diff options
author | Andrew Tridgell <tridge@samba.org> | 2007-04-16 00:18:54 +0000 |
---|---|---|
committer | Gerald (Jerry) Carter <jerry@samba.org> | 2007-10-10 14:50:40 -0500 |
commit | c9f04d8648cfdd573d45d47467bc964ef01f754d (patch) | |
tree | 115acf98b7b136f07dd8b16bbd50c9f7cbcdd3bb /source4/cluster/ctdb/common/ctdb_client.c | |
parent | bb36705c8d360a2ba865a3d8118c52afa1e46f4e (diff) | |
download | samba-c9f04d8648cfdd573d45d47467bc964ef01f754d.tar.gz samba-c9f04d8648cfdd573d45d47467bc964ef01f754d.tar.bz2 samba-c9f04d8648cfdd573d45d47467bc964ef01f754d.zip |
r22231: merge from bzr ctdb tree
(This used to be commit 807b959082d3b9a929c9f6597714e636638a940e)
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb_client.c')
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_client.c | 662 |
1 files changed, 662 insertions, 0 deletions
diff --git a/source4/cluster/ctdb/common/ctdb_client.c b/source4/cluster/ctdb/common/ctdb_client.c new file mode 100644 index 0000000000..3cb27a1165 --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_client.c @@ -0,0 +1,662 @@ +/* + ctdb daemon code + + Copyright (C) Andrew Tridgell 2007 + Copyright (C) Ronnie Sahlberg 2007 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "db_wrap.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb.h" +#include "../include/ctdb_private.h" + +/* + queue a packet for sending from client to daemon +*/ +static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length); +} + + +/* + handle a connect wait reply packet + */ +static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, + struct ctdb_req_header *hdr) +{ + struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr; + ctdb->num_connected = r->num_connected; +} + +/* + called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon + + This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It + contains any reply data from the call +*/ +void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_fetch_lock *c = (struct ctdb_reply_fetch_lock *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + if (state == NULL) return; + + state->call.reply_data.dptr = c->data; + state->call.reply_data.dsize = c->datalen; + state->call.status = c->state; + + talloc_steal(state, c); + + /* get an extra reference here - this prevents the free in ctdb_recv_pkt() + from freeing the data */ + (void)talloc_reference(state, c); + + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } +} + +/* + called in the client when we receive a CTDB_REPLY_STORE_UNLOCK from the daemon + + This packet comes in response to a CTDB_REQ_STORE_UNLOCK request packet. It + contains any reply data from the call +*/ +void ctdb_reply_store_unlock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_store_unlock *c = (struct ctdb_reply_store_unlock *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + if (state == NULL) return; + + state->call.status = c->state; + + talloc_steal(state, c); + + /* get an extra reference here - this prevents the free in ctdb_recv_pkt() + from freeing the data */ + (void)talloc_reference(state, c); + + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } +} +/* + this is called in the client, when data comes in from the daemon + */ +static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) +{ + struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context); + struct ctdb_req_header *hdr; + + if (cnt < sizeof(*hdr)) { + ctdb_set_error(ctdb, "Bad packet length %d\n", cnt); + return; + } + hdr = (struct ctdb_req_header *)data; + if (cnt != hdr->length) { + ctdb_set_error(ctdb, "Bad header length %d expected %d\n", + hdr->length, cnt); + return; + } + + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(ctdb, "Non CTDB packet rejected\n"); + return; + } + + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + return; + } + + switch (hdr->operation) { + case CTDB_REPLY_CALL: + ctdb_reply_call(ctdb, hdr); + break; + + case CTDB_REQ_MESSAGE: + ctdb_request_message(ctdb, hdr); + break; + + case CTDB_REPLY_CONNECT_WAIT: + ctdb_reply_connect_wait(ctdb, hdr); + break; + + case CTDB_REPLY_FETCH_LOCK: + ctdb_reply_fetch_lock(ctdb, hdr); + break; + + case CTDB_REPLY_STORE_UNLOCK: + ctdb_reply_store_unlock(ctdb, hdr); + break; + + default: + printf("bogus operation code:%d\n",hdr->operation); + } +} + +/* + connect to a unix domain socket +*/ +static int ux_socket_connect(struct ctdb_context *ctdb) +{ + struct sockaddr_un addr; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)); + + ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0); + if (ctdb->daemon.sd == -1) { + return -1; + } + + if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + close(ctdb->daemon.sd); + ctdb->daemon.sd = -1; + return -1; + } + + ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, + CTDB_DS_ALIGNMENT, + ctdb_client_read_cb, ctdb); + return 0; +} + + + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_call to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +{ + struct ctdb_record_handle *rec; + + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->node->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + talloc_free(state); + return -1; + } + + rec = state->fetch_private; + + /* ugly hack to manage forced migration */ + if (rec != NULL) { + rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr); + rec->data->dsize = state->call.reply_data.dsize; + talloc_free(state); + return 0; + } + + if (state->call.reply_data.dsize) { + call->reply_data.dptr = talloc_memdup(state->node->ctdb, + state->call.reply_data.dptr, + state->call.reply_data.dsize); + call->reply_data.dsize = state->call.reply_data.dsize; + } else { + call->reply_data.dptr = NULL; + call->reply_data.dsize = 0; + } + call->status = state->call.status; + talloc_free(state); + + return 0; +} + + + + +/* + destroy a ctdb_call in client +*/ +static int ctdb_client_call_destructor(struct ctdb_call_state *state) +{ + idr_remove(state->node->ctdb->idr, state->c->hdr.reqid); + return 0; +} + + + +/* + make a ctdb call to the local daemon - async send. Called from client context. + + This constructs a ctdb_call request and queues it for processing. + This call never blocks. +*/ +struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call) +{ + struct ctdb_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_ltdb_header header; + TDB_DATA data; + int ret; + size_t len; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + ret = ctdb_ltdb_lock(ctdb_db, call->key); + if (ret != 0) { + printf("failed to lock ltdb record\n"); + return NULL; + } + + ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + +#if 0 + if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + state = ctdb_call_local_send(ctdb_db, call, &header, &data); + ctdb_ltdb_unlock(ctdb_db, call->key); + return state; + } +#endif + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + + talloc_steal(state, data.dptr); + + len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + talloc_set_name_const(state->c, "ctdbd req_call packet"); + talloc_steal(state, state->c); + + state->c->hdr.length = len; + state->c->hdr.ctdb_magic = CTDB_MAGIC; + state->c->hdr.ctdb_version = CTDB_VERSION; + state->c->hdr.operation = CTDB_REQ_CALL; + state->c->hdr.destnode = header.dmaster; + state->c->hdr.srcnode = ctdb->vnn; + /* this limits us to 16k outstanding messages - not unreasonable */ + state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + state->c->flags = call->flags; + state->c->db_id = ctdb_db->db_id; + state->c->callid = call->call_id; + state->c->keylen = call->key.dsize; + state->c->calldatalen = call->call_data.dsize; + memcpy(&state->c->data[0], call->key.dptr, call->key.dsize); + memcpy(&state->c->data[call->key.dsize], + call->call_data.dptr, call->call_data.dsize); + state->call = *call; + state->call.call_data.dptr = &state->c->data[call->key.dsize]; + state->call.key.dptr = &state->c->data[0]; + + state->node = ctdb->nodes[header.dmaster]; + state->state = CTDB_CALL_WAIT; + state->header = header; + state->ctdb_db = ctdb_db; + + talloc_set_destructor(state, ctdb_client_call_destructor); + + ctdb_client_queue_pkt(ctdb, &state->c->hdr); + +/*XXX set up timeout to cleanup if server doesnt respond + event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), + ctdb_call_timeout, state); +*/ + + ctdb_ltdb_unlock(ctdb_db, call->key); + return state; +} + + + +/* + tell the daemon what messaging srvid we will use, and register the message + handler function in the client +*/ +int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) + +{ + struct ctdb_req_register c; + int res; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + ZERO_STRUCT(c); + + c.hdr.length = sizeof(c); + c.hdr.ctdb_magic = CTDB_MAGIC; + c.hdr.ctdb_version = CTDB_VERSION; + c.hdr.operation = CTDB_REQ_REGISTER; + c.srvid = srvid; + + res = ctdb_client_queue_pkt(ctdb, &c.hdr); + if (res != 0) { + return res; + } + + /* also need to register the handler with our ctdb structure */ + return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data); +} + + + +/* + setup handler for receipt of ctdb messages from ctdb_send_message() +*/ +int ctdb_set_message_handler(struct ctdb_context *ctdb, + uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) +{ + if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_set_message_handler(ctdb, srvid, handler, private_data); + } + return ctdb_daemon_set_message_handler(ctdb, srvid, handler, private_data); +} + + +/* + send a message - from client context + */ +int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, + uint32_t srvid, TDB_DATA data) +{ + struct ctdb_req_message *r; + int len, res; + + len = offsetof(struct ctdb_req_message, data) + data.dsize; + r = ctdb->methods->allocate_pkt(ctdb, len); + CTDB_NO_MEMORY(ctdb, r); + talloc_set_name_const(r, "req_message packet"); + + r->hdr.length = len; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REQ_MESSAGE; + r->hdr.destnode = vnn; + r->hdr.srcnode = ctdb->vnn; + r->hdr.reqid = 0; + r->srvid = srvid; + r->datalen = data.dsize; + memcpy(&r->data[0], data.dptr, data.dsize); + + res = ctdb_client_queue_pkt(ctdb, &r->hdr); + if (res != 0) { + return res; + } + + talloc_free(r); + return 0; +} + +/* + wait for all nodes to be connected - from client + */ +static void ctdb_client_connect_wait(struct ctdb_context *ctdb) +{ + struct ctdb_req_connect_wait r; + int res; + + ZERO_STRUCT(r); + + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_CONNECT_WAIT; + + res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length); + if (res != 0) { + printf("Failed to queue a connect wait request\n"); + return; + } + + /* now we can go into the normal wait routine, as the reply packet + will update the ctdb->num_connected variable */ + ctdb_daemon_connect_wait(ctdb); +} + +/* + wait for all nodes to be connected +*/ +void ctdb_connect_wait(struct ctdb_context *ctdb) +{ + if (!(ctdb->flags & CTDB_FLAG_DAEMON_MODE)) { + ctdb_daemon_connect_wait(ctdb); + return; + } + + ctdb_client_connect_wait(ctdb); +} + + +struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key) +{ + struct ctdb_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_req_fetch_lock *req; + int len, res; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + return NULL; + } + state->state = CTDB_CALL_WAIT; + state->ctdb_db = ctdb_db; + len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + return NULL; + } + ZERO_STRUCT(*state->c); + talloc_set_name_const(state->c, "ctdbd req_fetch_lock packet"); + talloc_steal(state, state->c); + + req = (struct ctdb_req_fetch_lock *)state->c; + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_FETCH_LOCK; + req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + req->db_id = ctdb_db->db_id; + req->keylen = key.dsize; + memcpy(&req->key[0], key.dptr, key.dsize); + + res = ctdb_client_queue_pkt(ctdb, &req->hdr); + if (res != 0) { + return NULL; + } + + talloc_free(req); + + return state; +} + + +struct ctdb_call_state *ctdb_client_store_unlock_send( + struct ctdb_record_handle *rh, + TALLOC_CTX *mem_ctx, + TDB_DATA data) +{ + struct ctdb_call_state *state; + struct ctdb_db_context *ctdb_db = talloc_get_type(rh->ctdb_db, struct ctdb_db_context); + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_req_store_unlock *req; + int len, res; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + return NULL; + } + state->state = CTDB_CALL_WAIT; + state->ctdb_db = ctdb_db; + len = offsetof(struct ctdb_req_store_unlock, data) + rh->key.dsize + data.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + return NULL; + } + ZERO_STRUCT(*state->c); + talloc_set_name_const(state->c, "ctdbd req_store_unlock packet"); + talloc_steal(state, state->c); + + req = (struct ctdb_req_store_unlock *)state->c; + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_STORE_UNLOCK; + req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + req->db_id = ctdb_db->db_id; + req->keylen = rh->key.dsize; + req->datalen = data.dsize; + memcpy(&req->data[0], rh->key.dptr, rh->key.dsize); + memcpy(&req->data[req->keylen], data.dptr, data.dsize); + + res = ctdb_client_queue_pkt(ctdb, &req->hdr); + if (res != 0) { + return NULL; + } + + talloc_free(req); + + return state; +} + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the + results. This call will block unless the call has already completed. +*/ +struct ctdb_record_handle *ctdb_client_fetch_lock_recv(struct ctdb_call_state *state, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data) +{ + struct ctdb_record_handle *rec; + + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->ctdb_db->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + talloc_free(state); + return NULL; + } + + rec = talloc(mem_ctx, struct ctdb_record_handle); + CTDB_NO_MEMORY_NULL(state->ctdb_db->ctdb, rec); + + rec->ctdb_db = state->ctdb_db; + rec->key = key; + rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); + rec->data = talloc(rec, TDB_DATA); + rec->data->dsize = state->call.reply_data.dsize; + rec->data->dptr = talloc_memdup(rec, state->call.reply_data.dptr, rec->data->dsize); + + if (data) { + *data = *rec->data; + } + return rec; +} + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_store_unlock to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_client_store_unlock_recv(struct ctdb_call_state *state, struct ctdb_record_handle *rec) +{ + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->ctdb_db->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + } + + talloc_free(state); + return state->state; +} + +struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key, + TDB_DATA *data) +{ + struct ctdb_call_state *state; + struct ctdb_record_handle *rec; + + state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key); + rec = ctdb_client_fetch_lock_recv(state, mem_ctx, key, data); + + return rec; +} + +int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) +{ + struct ctdb_call_state *state; + int res; + + state = ctdb_client_store_unlock_send(rec, rec, data); + res = ctdb_client_store_unlock_recv(state, rec); + + talloc_free(rec); + + return res; +} |