From 650d81b252cc669ef848448afad7e9bb79c4f20e Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Sat, 21 Apr 2007 07:23:42 +0000 Subject: r22421: merged in latest ctdb changes from bzr (This used to be commit 3633f862b966866819c9a0a6ad0238a858e15e62) --- source4/cluster/ctdb/common/cmdline.c | 142 +++++++ source4/cluster/ctdb/common/ctdb.c | 136 +++++-- source4/cluster/ctdb/common/ctdb_call.c | 265 ++++-------- source4/cluster/ctdb/common/ctdb_client.c | 612 ++++++++++++++-------------- source4/cluster/ctdb/common/ctdb_daemon.c | 500 +++++++++++++---------- source4/cluster/ctdb/common/ctdb_lockwait.c | 139 +++++++ source4/cluster/ctdb/common/ctdb_ltdb.c | 144 ++++++- source4/cluster/ctdb/common/ctdb_message.c | 17 +- source4/cluster/ctdb/common/ctdb_util.c | 28 ++ 9 files changed, 1241 insertions(+), 742 deletions(-) create mode 100644 source4/cluster/ctdb/common/cmdline.c create mode 100644 source4/cluster/ctdb/common/ctdb_lockwait.c (limited to 'source4/cluster/ctdb/common') diff --git a/source4/cluster/ctdb/common/cmdline.c b/source4/cluster/ctdb/common/cmdline.c new file mode 100644 index 0000000000..699cb8fb22 --- /dev/null +++ b/source4/cluster/ctdb/common/cmdline.c @@ -0,0 +1,142 @@ +/* + common commandline code to ctdb test tools + + Copyright (C) Andrew Tridgell 2007 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "popt.h" +#include "../include/ctdb.h" +#include "../include/ctdb_private.h" + +/* Handle common command line options for ctdb test progs + */ + +static struct { + const char *nlist; + const char *transport; + const char *myaddress; + int self_connect; + const char *db_dir; + int torture; +} ctdb_cmdline = { + .nlist = NULL, + .transport = "tcp", + .myaddress = NULL, + .self_connect = 0, + .db_dir = NULL, + .torture = 0 +}; + + +struct poptOption popt_ctdb_cmdline[] = { + { "nlist", 0, POPT_ARG_STRING, &ctdb_cmdline.nlist, 0, "node list file", "filename" }, + { "listen", 0, POPT_ARG_STRING, &ctdb_cmdline.myaddress, 0, "address to listen on", "address" }, + { "transport", 0, POPT_ARG_STRING, &ctdb_cmdline.transport, 0, "protocol transport", NULL }, + { "self-connect", 0, POPT_ARG_NONE, &ctdb_cmdline.self_connect, 0, "enable self connect", "boolean" }, + { "debug", 'd', POPT_ARG_INT, &LogLevel, 0, "debug level"}, + { "dbdir", 0, POPT_ARG_STRING, &ctdb_cmdline.db_dir, 0, "directory for the tdb files", NULL }, + { "torture", 0, POPT_ARG_NONE, &ctdb_cmdline.torture, 0, "enable nastiness in library", NULL }, + { NULL } +}; + + +/* + startup daemon side of ctdb according to command line options + */ +struct ctdb_context *ctdb_cmdline_init(struct event_context *ev) +{ + struct ctdb_context *ctdb; + int ret; + + if (ctdb_cmdline.nlist == NULL || ctdb_cmdline.myaddress == NULL) { + printf("You must provide a node list with --nlist and an address with --listen\n"); + exit(1); + } + + /* initialise ctdb */ + ctdb = ctdb_init(ev); + if (ctdb == NULL) { + printf("Failed to init ctdb\n"); + exit(1); + } + + if (ctdb_cmdline.self_connect) { + ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT); + } + if (ctdb_cmdline.torture) { + ctdb_set_flags(ctdb, CTDB_FLAG_TORTURE); + } + + ret = ctdb_set_transport(ctdb, ctdb_cmdline.transport); + if (ret == -1) { + printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what address to listen on */ + ret = ctdb_set_address(ctdb, ctdb_cmdline.myaddress); + if (ret == -1) { + printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what nodes are available */ + ret = ctdb_set_nlist(ctdb, ctdb_cmdline.nlist); + if (ret == -1) { + printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + ret = ctdb_set_tdb_dir(ctdb, ctdb_cmdline.db_dir); + if (ret == -1) { + printf("ctdb_set_tdb_dir failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + return ctdb; +} + + +/* + startup a client only ctdb context + */ +struct ctdb_context *ctdb_cmdline_client(struct event_context *ev, const char *ctdb_socket) +{ + struct ctdb_context *ctdb; + int ret; + + /* initialise ctdb */ + ctdb = ctdb_init(ev); + if (ctdb == NULL) { + printf("Failed to init ctdb\n"); + exit(1); + } + + ctdb->daemon.name = talloc_strdup(ctdb, ctdb_socket); + + ret = ctdb_socket_connect(ctdb); + if (ret != 0) { + DEBUG(0,(__location__ " Failed to connect to daemon\n")); + talloc_free(ctdb); + return NULL; + } + + return ctdb; +} diff --git a/source4/cluster/ctdb/common/ctdb.c b/source4/cluster/ctdb/common/ctdb.c index 8a8d52f3f1..6bd2fda529 100644 --- a/source4/cluster/ctdb/common/ctdb.c +++ b/source4/cluster/ctdb/common/ctdb.c @@ -73,6 +73,22 @@ void ctdb_set_max_lacount(struct ctdb_context *ctdb, unsigned count) ctdb->max_lacount = count; } +/* + set the directory for the local databases +*/ +int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir) +{ + if (dir == NULL) { + ctdb->db_directory = talloc_asprintf(ctdb, "ctdb-%u", ctdb_get_vnn(ctdb)); + } else { + ctdb->db_directory = talloc_strdup(ctdb, dir); + } + if (ctdb->db_directory == NULL) { + return -1; + } + return 0; +} + /* add a node to the list of active nodes */ @@ -190,65 +206,102 @@ uint32_t ctdb_get_num_nodes(struct ctdb_context *ctdb) /* called by the transport layer when a packet comes in */ -static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length) +void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length) { - struct ctdb_req_header *hdr; + struct ctdb_req_header *hdr = (struct ctdb_req_header *)data; + TALLOC_CTX *tmp_ctx; + + ctdb->status.node_packets_recv++; + + /* place the packet as a child of the tmp_ctx. We then use + talloc_free() below to free it. If any of the calls want + to keep it, then they will steal it somewhere else, and the + talloc_free() will only free the tmp_ctx */ + tmp_ctx = talloc_new(ctdb); + talloc_steal(tmp_ctx, hdr); if (length < sizeof(*hdr)) { ctdb_set_error(ctdb, "Bad packet length %d\n", length); - return; + goto done; } - hdr = (struct ctdb_req_header *)data; if (length != hdr->length) { ctdb_set_error(ctdb, "Bad header length %d expected %d\n", hdr->length, length); - return; + goto done; } if (hdr->ctdb_magic != CTDB_MAGIC) { ctdb_set_error(ctdb, "Non CTDB packet rejected\n"); - return; + goto done; } if (hdr->ctdb_version != CTDB_VERSION) { ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); - return; + goto done; } + DEBUG(3,(__location__ " ctdb request %d of type %d length %d from " + "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length, + hdr->srcnode, hdr->destnode)); + switch (hdr->operation) { case CTDB_REQ_CALL: + ctdb->status.count.req_call++; ctdb_request_call(ctdb, hdr); break; case CTDB_REPLY_CALL: + ctdb->status.count.reply_call++; ctdb_reply_call(ctdb, hdr); break; case CTDB_REPLY_ERROR: + ctdb->status.count.reply_error++; ctdb_reply_error(ctdb, hdr); break; case CTDB_REPLY_REDIRECT: + ctdb->status.count.reply_redirect++; ctdb_reply_redirect(ctdb, hdr); break; case CTDB_REQ_DMASTER: + ctdb->status.count.req_dmaster++; ctdb_request_dmaster(ctdb, hdr); break; case CTDB_REPLY_DMASTER: + ctdb->status.count.reply_dmaster++; ctdb_reply_dmaster(ctdb, hdr); break; case CTDB_REQ_MESSAGE: + ctdb->status.count.req_message++; ctdb_request_message(ctdb, hdr); break; + case CTDB_REQ_FINISHED: + ctdb->status.count.req_finished++; + ctdb_request_finished(ctdb, hdr); + break; + default: - printf("Packet with unknown operation %d\n", hdr->operation); + DEBUG(0,("%s: Packet with unknown operation %d\n", + __location__, hdr->operation)); break; } - talloc_free(hdr); + +done: + talloc_free(tmp_ctx); +} + +/* + called by the transport layer when a packet comes in +*/ +void ctdb_recv_raw_pkt(void *p, uint8_t *data, uint32_t length) +{ + struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context); + ctdb_recv_pkt(ctdb, data, length); } /* @@ -257,8 +310,8 @@ static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t len static void ctdb_node_dead(struct ctdb_node *node) { node->ctdb->num_connected--; - printf("%s: node %s is dead: %d connected\n", - node->ctdb->name, node->name, node->ctdb->num_connected); + DEBUG(1,("%s: node %s is dead: %d connected\n", + node->ctdb->name, node->name, node->ctdb->num_connected)); } /* @@ -267,8 +320,8 @@ static void ctdb_node_dead(struct ctdb_node *node) static void ctdb_node_connected(struct ctdb_node *node) { node->ctdb->num_connected++; - printf("%s: connected to %s - %d connected\n", - node->ctdb->name, node->name, node->ctdb->num_connected); + DEBUG(1,("%s: connected to %s - %d connected\n", + node->ctdb->name, node->name, node->ctdb->num_connected)); } /* @@ -281,33 +334,62 @@ void ctdb_daemon_connect_wait(struct ctdb_context *ctdb) expected++; } while (ctdb->num_connected != expected) { + DEBUG(3,("ctdb_connect_wait: waiting for %d nodes (have %d)\n", + expected, ctdb->num_connected)); event_loop_once(ctdb->ev); } + DEBUG(3,("ctdb_connect_wait: got all %d nodes\n", expected)); } +struct queue_next { + struct ctdb_context *ctdb; + struct ctdb_req_header *hdr; +}; + + /* - wait until we're the only node left -*/ -void ctdb_wait_loop(struct ctdb_context *ctdb) + trigered when a deferred packet is due + */ +static void queue_next_trigger(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) { - int expected = 0; - if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) { - expected++; + struct queue_next *q = talloc_get_type(private_data, struct queue_next); + ctdb_recv_pkt(q->ctdb, (uint8_t *)q->hdr, q->hdr->length); + talloc_free(q); +} + +/* + defer a packet, so it is processed on the next event loop + this is used for sending packets to ourselves + */ +static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct queue_next *q; + q = talloc(ctdb, struct queue_next); + if (q == NULL) { + DEBUG(0,(__location__ " Failed to allocate deferred packet\n")); + return; } - while (ctdb->num_connected > expected) { - event_loop_once(ctdb->ev); + q->ctdb = ctdb; + q->hdr = talloc_memdup(ctdb, hdr, hdr->length); + if (q->hdr == NULL) { + DEBUG(0,("Error copying deferred packet to self\n")); + return; } + event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q); } - /* queue a packet or die */ void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { struct ctdb_node *node; + ctdb->status.node_packets_sent++; node = ctdb->nodes[hdr->destnode]; - if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) { + if (hdr->destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + ctdb_defer_packet(ctdb, hdr); + } else if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) { ctdb_fatal(ctdb, "Unable to queue packet\n"); } } @@ -338,11 +420,3 @@ struct ctdb_context *ctdb_init(struct event_context *ev) return ctdb; } -int ctdb_start(struct ctdb_context *ctdb) -{ - if (ctdb->flags&CTDB_FLAG_DAEMON_MODE) { - return ctdbd_start(ctdb); - } - - return ctdb->methods->start(ctdb); -} diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c index ab5c2cce3b..76a7e97a87 100644 --- a/source4/cluster/ctdb/common/ctdb_call.c +++ b/source4/cluster/ctdb/common/ctdb_call.c @@ -47,9 +47,9 @@ /* local version of ctdb_call */ -static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, - struct ctdb_ltdb_header *header, TDB_DATA *data, - uint32_t caller) +int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, + struct ctdb_ltdb_header *header, TDB_DATA *data, + uint32_t caller) { struct ctdb_call_info *c; struct ctdb_registered_call *fn; @@ -105,6 +105,7 @@ static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *ca if (c->reply_data) { call->reply_data = *c->reply_data; talloc_steal(ctdb, call->reply_data.dptr); + talloc_set_name_const(call->reply_data.dptr, __location__); } else { call->reply_data.dptr = NULL; call->reply_data.dsize = 0; @@ -140,7 +141,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb, msglen = strlen(msg)+1; len = offsetof(struct ctdb_reply_error, msg); - r = ctdb->methods->allocate_pkt(ctdb, len + msglen); + r = ctdb->methods->allocate_pkt(msg, len + msglen); CTDB_NO_MEMORY_FATAL(ctdb, r); talloc_set_name_const(r, "send_error packet"); @@ -155,11 +156,9 @@ static void ctdb_send_error(struct ctdb_context *ctdb, r->msglen = msglen; memcpy(&r->msg[0], msg, msglen); - talloc_free(msg); - ctdb_queue_packet(ctdb, &r->hdr); - talloc_free(r); + talloc_free(msg); } @@ -223,16 +222,12 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, memcpy(&r->data[0], key->dptr, key->dsize); memcpy(&r->data[key->dsize], data->dptr, data->dsize); - if (r->hdr.destnode == ctdb->vnn) { - /* we are the lmaster - don't send to ourselves */ - ctdb_request_dmaster(ctdb, &r->hdr); - } else { - ctdb_queue_packet(ctdb, &r->hdr); - - /* update the ltdb to record the new dmaster */ - header->dmaster = r->hdr.destnode; - ctdb_ltdb_store(ctdb_db, *key, header, *data); - } + /* XXX - probably not necessary when lmaster==dmaster + update the ltdb to record the new dmaster */ + header->dmaster = r->hdr.destnode; + ctdb_ltdb_store(ctdb_db, *key, header, *data); + + ctdb_queue_packet(ctdb, &r->hdr); talloc_free(r); } @@ -252,6 +247,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr struct ctdb_ltdb_header header; struct ctdb_db_context *ctdb_db; int ret, len; + TALLOC_CTX *tmp_ctx; key.dptr = c->data; key.dsize = c->keylen; @@ -267,28 +263,41 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr } /* fetch the current record */ - ret = ctdb_ltdb_fetch(ctdb_db, key, &header, hdr, &data2); - if (ret != 0) { + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2, + ctdb_recv_raw_pkt, ctdb); + if (ret == -1) { ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record"); return; } - + if (ret == -2) { + DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n")); + return; + } + /* its a protocol error if the sending node is not the current dmaster */ - if (header.dmaster != hdr->srcnode) { + if (header.dmaster != hdr->srcnode && + hdr->srcnode != ctdb_lmaster(ctdb_db->ctdb, &key)) { ctdb_fatal(ctdb, "dmaster request from non-master"); return; } - + header.dmaster = c->dmaster; - if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) { + ret = ctdb_ltdb_store(ctdb_db, key, &header, data); + ctdb_ltdb_unlock(ctdb_db, key); + if (ret != 0) { ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster"); return; } + /* put the packet on a temporary context, allowing us to safely free + it below even if ctdb_reply_dmaster() has freed it already */ + tmp_ctx = talloc_new(ctdb); + /* send the CTDB_REPLY_DMASTER */ len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize; - r = ctdb->methods->allocate_pkt(ctdb, len); + r = ctdb->methods->allocate_pkt(tmp_ctx, len); CTDB_NO_MEMORY_FATAL(ctdb, r); + talloc_set_name_const(r, "reply_dmaster packet"); r->hdr.length = len; r->hdr.ctdb_magic = CTDB_MAGIC; @@ -300,13 +309,9 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr r->datalen = data.dsize; memcpy(&r->data[0], data.dptr, data.dsize); - if (r->hdr.destnode == r->hdr.srcnode) { - ctdb_reply_dmaster(ctdb, &r->hdr); - } else { - ctdb_queue_packet(ctdb, &r->hdr); - } + ctdb_queue_packet(ctdb, &r->hdr); - talloc_free(r); + talloc_free(tmp_ctx); } @@ -341,17 +346,23 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) fetches the record data (if any), thus avoiding a 2nd fetch of the data if the call will be answered locally */ - ret = ctdb_ltdb_fetch(ctdb_db, call.key, &header, hdr, &data); - if (ret != 0) { + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data, + ctdb_recv_raw_pkt, ctdb); + if (ret == -1) { ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call"); return; } + if (ret == -2) { + DEBUG(2,(__location__ " deferred ctdb_request_call\n")); + return; + } /* if we are not the dmaster, then send a redirect to the requesting node */ if (header.dmaster != ctdb->vnn) { ctdb_call_send_redirect(ctdb, c, &header); talloc_free(data.dptr); + ctdb_ltdb_unlock(ctdb_db, call.key); return; } @@ -364,11 +375,14 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) || c->flags&CTDB_IMMEDIATE_MIGRATION ) { ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data); talloc_free(data.dptr); + ctdb_ltdb_unlock(ctdb_db, call.key); return; } ctdb_call_local(ctdb_db, &call, &header, &data, c->hdr.srcnode); + ctdb_ltdb_unlock(ctdb_db, call.key); + len = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize; r = ctdb->methods->allocate_pkt(ctdb, len); CTDB_NO_MEMORY_FATAL(ctdb, r); @@ -396,15 +410,18 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) called when a CTDB_REPLY_CALL packet comes in This packet comes in response to a CTDB_REQ_CALL request packet. It - contains any reply data freom the call + contains any reply data from the call */ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr; struct ctdb_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); - if (state == NULL) return; + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); + if (state == NULL) { + DEBUG(0, ("reqid %d not found\n", hdr->reqid)); + return; + } state->call.reply_data.dptr = c->data; state->call.reply_data.dsize = c->datalen; @@ -412,10 +429,6 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) talloc_steal(state, c); - /* get an extra reference here - this prevents the free in ctdb_recv_pkt() - from freeing the data */ - (void)talloc_reference(state, c); - state->state = CTDB_CALL_DONE; if (state->async.fn) { state->async.fn(state); @@ -435,13 +448,25 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_call_state *state; struct ctdb_db_context *ctdb_db; TDB_DATA data; + int ret; - state = idr_find(ctdb->idr, hdr->reqid); + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state == NULL) { return; } + ctdb_db = state->ctdb_db; + ret = ctdb_ltdb_lock_requeue(ctdb_db, state->call.key, hdr, + ctdb_recv_raw_pkt, ctdb); + if (ret == -2) { + return; + } + if (ret != 0) { + DEBUG(0,(__location__ " Failed to get lock in ctdb_reply_dmaster\n")); + return; + } + data.dptr = c->data; data.dsize = c->datalen; @@ -452,12 +477,17 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) state->header.dmaster = ctdb->vnn; if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) { + ctdb_ltdb_unlock(ctdb_db, state->call.key); ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n"); return; } ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn); + ctdb_ltdb_unlock(ctdb_db, state->call.key); + + talloc_steal(state, state->call.reply_data.dptr); + state->state = CTDB_CALL_DONE; if (state->async.fn) { state->async.fn(state); @@ -473,7 +503,7 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr; struct ctdb_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state == NULL) return; talloc_steal(state, c); @@ -498,7 +528,7 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr; struct ctdb_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state == NULL) return; talloc_steal(state, c); @@ -510,6 +540,7 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) /* send it off again */ state->node = ctdb->nodes[c->dmaster]; + state->c->hdr.destnode = c->dmaster; ctdb_queue_packet(ctdb, &state->c->hdr); } @@ -578,6 +609,7 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, state->ctdb_db = ctdb_db; ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn); + talloc_steal(state, state->call.reply_data.dptr); event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state); @@ -591,45 +623,27 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, This constructs a ctdb_call request and queues it for processing. This call never blocks. */ -static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctdb_db, - struct ctdb_call *call) +struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call, + struct ctdb_ltdb_header *header) { uint32_t len; struct ctdb_call_state *state; - int ret; - struct ctdb_ltdb_header header; - TDB_DATA data; struct ctdb_context *ctdb = ctdb_db->ctdb; - /* - if we are the dmaster for this key then we don't need to - send it off at all, we can bypass the network and handle it - locally. To find out if we are the dmaster we need to look - in our ltdb - */ - ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); - if (ret != 0) return NULL; - - if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { - return ctdb_call_local_send(ctdb_db, call, &header, &data); - } - state = talloc_zero(ctdb_db, struct ctdb_call_state); CTDB_NO_MEMORY_NULL(ctdb, state); - talloc_steal(state, data.dptr); - len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; - state->c = ctdb->methods->allocate_pkt(ctdb, len); + state->c = ctdb->methods->allocate_pkt(state, len); CTDB_NO_MEMORY_NULL(ctdb, state->c); talloc_set_name_const(state->c, "req_call packet"); - talloc_steal(state, state->c); state->c->hdr.length = len; state->c->hdr.ctdb_magic = CTDB_MAGIC; state->c->hdr.ctdb_version = CTDB_VERSION; state->c->hdr.operation = CTDB_REQ_CALL; - state->c->hdr.destnode = header.dmaster; + state->c->hdr.destnode = header->dmaster; state->c->hdr.srcnode = ctdb->vnn; /* this limits us to 16k outstanding messages - not unreasonable */ state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); @@ -645,9 +659,9 @@ static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctd state->call.call_data.dptr = &state->c->data[call->key.dsize]; state->call.key.dptr = &state->c->data[0]; - state->node = ctdb->nodes[header.dmaster]; + state->node = ctdb->nodes[header->dmaster]; state->state = CTDB_CALL_WAIT; - state->header = header; + state->header = *header; state->ctdb_db = ctdb_db; talloc_set_destructor(state, ctdb_call_destructor); @@ -659,30 +673,14 @@ static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctd return state; } -/* - make a remote ctdb call - async send - - This constructs a ctdb_call request and queues it for processing. - This call never blocks. -*/ -struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) -{ - if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_call_send(ctdb_db, call); - } - return ctdb_daemon_call_send(ctdb_db, call); -} - /* make a remote ctdb call - async recv - called in daemon context This is called when the program wants to wait for a ctdb_call to complete and get the results. This call will block unless the call has already completed. */ -static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) { - struct ctdb_record_handle *rec; - while (state->state < CTDB_CALL_DONE) { event_loop_once(state->node->ctdb->ev); } @@ -692,16 +690,6 @@ static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call return -1; } - rec = state->fetch_private; - - /* ugly hack to manage forced migration */ - if (rec != NULL) { - rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr); - rec->data->dsize = state->call.reply_data.dsize; - talloc_free(state); - return 0; - } - if (state->call.reply_data.dsize) { call->reply_data.dptr = talloc_memdup(state->node->ctdb, state->call.reply_data.dptr, @@ -717,92 +705,3 @@ static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call } -/* - make a remote ctdb call - async recv. - - This is called when the program wants to wait for a ctdb_call to complete and get the - results. This call will block unless the call has already completed. -*/ -int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) -{ - if (state->ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_call_recv(state, call); - } - return ctdb_daemon_call_recv(state, call); -} - -/* - full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() -*/ -int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) -{ - struct ctdb_call_state *state; - - state = ctdb_call_send(ctdb_db, call); - return ctdb_call_recv(state, call); -} - - - -struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, - TDB_DATA key, TDB_DATA *data) -{ - struct ctdb_call call; - struct ctdb_record_handle *rec; - struct ctdb_call_state *state; - int ret; - - if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_fetch_lock(ctdb_db, mem_ctx, key, data); - } - - ZERO_STRUCT(call); - call.call_id = CTDB_FETCH_FUNC; - call.key = key; - call.flags = CTDB_IMMEDIATE_MIGRATION; - - rec = talloc(mem_ctx, struct ctdb_record_handle); - CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec); - - rec->ctdb_db = ctdb_db; - rec->key = key; - rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); - rec->data = data; - - state = ctdb_call_send(ctdb_db, &call); - state->fetch_private = rec; - - ret = ctdb_call_recv(state, &call); - if (ret != 0) { - talloc_free(rec); - return NULL; - } - - return rec; -} - - -int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) -{ - int ret; - struct ctdb_ltdb_header header; - struct ctdb_db_context *ctdb_db = talloc_get_type(rec->ctdb_db, struct ctdb_db_context); - - if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_store_unlock(rec, data); - } - - /* should be avoided if possible hang header off rec ? */ - ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL, NULL); - if (ret) { - ctdb_set_error(rec->ctdb_db->ctdb, "Fetch of locally held record failed"); - talloc_free(rec); - return ret; - } - - ret = ctdb_ltdb_store(rec->ctdb_db, rec->key, &header, data); - - talloc_free(rec); - - return ret; -} diff --git a/source4/cluster/ctdb/common/ctdb_client.c b/source4/cluster/ctdb/common/ctdb_client.c index 3cb27a1165..dbed8d3585 100644 --- a/source4/cluster/ctdb/common/ctdb_client.c +++ b/source4/cluster/ctdb/common/ctdb_client.c @@ -49,94 +49,87 @@ static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, } /* - called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon + state of a in-progress ctdb call in client +*/ +struct ctdb_client_call_state { + enum call_state state; + uint32_t reqid; + struct ctdb_db_context *ctdb_db; + struct ctdb_call call; +}; + +/* + called when a CTDB_REPLY_CALL packet comes in in the client - This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It + This packet comes in response to a CTDB_REQ_CALL request packet. It contains any reply data from the call */ -void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - struct ctdb_reply_fetch_lock *c = (struct ctdb_reply_fetch_lock *)hdr; - struct ctdb_call_state *state; + struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr; + struct ctdb_client_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); - if (state == NULL) return; + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_client_call_state); + if (state == NULL) { + DEBUG(0, ("reqid %d not found\n", hdr->reqid)); + return; + } state->call.reply_data.dptr = c->data; state->call.reply_data.dsize = c->datalen; - state->call.status = c->state; + state->call.status = c->status; talloc_steal(state, c); - /* get an extra reference here - this prevents the free in ctdb_recv_pkt() - from freeing the data */ - (void)talloc_reference(state, c); - state->state = CTDB_CALL_DONE; - if (state->async.fn) { - state->async.fn(state); - } } -/* - called in the client when we receive a CTDB_REPLY_STORE_UNLOCK from the daemon - - This packet comes in response to a CTDB_REQ_STORE_UNLOCK request packet. It - contains any reply data from the call -*/ -void ctdb_reply_store_unlock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) -{ - struct ctdb_reply_store_unlock *c = (struct ctdb_reply_store_unlock *)hdr; - struct ctdb_call_state *state; - - state = idr_find(ctdb->idr, hdr->reqid); - if (state == NULL) return; - - state->call.status = c->state; - - talloc_steal(state, c); - - /* get an extra reference here - this prevents the free in ctdb_recv_pkt() - from freeing the data */ - (void)talloc_reference(state, c); +static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); - state->state = CTDB_CALL_DONE; - if (state->async.fn) { - state->async.fn(state); - } -} /* this is called in the client, when data comes in from the daemon */ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) { struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context); - struct ctdb_req_header *hdr; + struct ctdb_req_header *hdr = (struct ctdb_req_header *)data; + TALLOC_CTX *tmp_ctx; + + /* place the packet as a child of a tmp_ctx. We then use + talloc_free() below to free it. If any of the calls want + to keep it, then they will steal it somewhere else, and the + talloc_free() will be a no-op */ + tmp_ctx = talloc_new(ctdb); + talloc_steal(tmp_ctx, hdr); + + if (cnt == 0) { + DEBUG(2,("Daemon has exited - shutting down client\n")); + exit(0); + } if (cnt < sizeof(*hdr)) { - ctdb_set_error(ctdb, "Bad packet length %d\n", cnt); - return; + DEBUG(0,("Bad packet length %d in client\n", cnt)); + goto done; } - hdr = (struct ctdb_req_header *)data; if (cnt != hdr->length) { - ctdb_set_error(ctdb, "Bad header length %d expected %d\n", + ctdb_set_error(ctdb, "Bad header length %d expected %d in client\n", hdr->length, cnt); - return; + goto done; } if (hdr->ctdb_magic != CTDB_MAGIC) { - ctdb_set_error(ctdb, "Non CTDB packet rejected\n"); - return; + ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n"); + goto done; } if (hdr->ctdb_version != CTDB_VERSION) { - ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); - return; + ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version); + goto done; } switch (hdr->operation) { case CTDB_REPLY_CALL: - ctdb_reply_call(ctdb, hdr); + ctdb_client_reply_call(ctdb, hdr); break; case CTDB_REQ_MESSAGE: @@ -147,23 +140,22 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) ctdb_reply_connect_wait(ctdb, hdr); break; - case CTDB_REPLY_FETCH_LOCK: - ctdb_reply_fetch_lock(ctdb, hdr); - break; - - case CTDB_REPLY_STORE_UNLOCK: - ctdb_reply_store_unlock(ctdb, hdr); + case CTDB_REPLY_STATUS: + ctdb_reply_status(ctdb, hdr); break; default: - printf("bogus operation code:%d\n",hdr->operation); + DEBUG(0,("bogus operation code:%d\n",hdr->operation)); } + +done: + talloc_free(tmp_ctx); } /* connect to a unix domain socket */ -static int ux_socket_connect(struct ctdb_context *ctdb) +int ctdb_socket_connect(struct ctdb_context *ctdb) { struct sockaddr_un addr; @@ -189,6 +181,13 @@ static int ux_socket_connect(struct ctdb_context *ctdb) } +struct ctdb_record_handle { + struct ctdb_db_context *ctdb_db; + TDB_DATA key; + TDB_DATA *data; + struct ctdb_ltdb_header header; +}; + /* make a recv call to the local ctdb daemon - called from client context @@ -196,31 +195,19 @@ static int ux_socket_connect(struct ctdb_context *ctdb) This is called when the program wants to wait for a ctdb_call to complete and get the results. This call will block unless the call has already completed. */ -int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call) { - struct ctdb_record_handle *rec; - while (state->state < CTDB_CALL_DONE) { - event_loop_once(state->node->ctdb->ev); + event_loop_once(state->ctdb_db->ctdb->ev); } if (state->state != CTDB_CALL_DONE) { - ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + DEBUG(0,(__location__ " ctdb_call_recv failed\n")); talloc_free(state); return -1; } - rec = state->fetch_private; - - /* ugly hack to manage forced migration */ - if (rec != NULL) { - rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr); - rec->data->dsize = state->call.reply_data.dsize; - talloc_free(state); - return 0; - } - if (state->call.reply_data.dsize) { - call->reply_data.dptr = talloc_memdup(state->node->ctdb, + call->reply_data.dptr = talloc_memdup(state->ctdb_db, state->call.reply_data.dptr, state->call.reply_data.dsize); call->reply_data.dsize = state->call.reply_data.dsize; @@ -240,13 +227,41 @@ int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) /* destroy a ctdb_call in client */ -static int ctdb_client_call_destructor(struct ctdb_call_state *state) +static int ctdb_client_call_destructor(struct ctdb_client_call_state *state) { - idr_remove(state->node->ctdb->idr, state->c->hdr.reqid); + idr_remove(state->ctdb_db->ctdb->idr, state->reqid); return 0; } +/* + construct an event driven local ctdb_call + + this is used so that locally processed ctdb_call requests are processed + in an event driven manner +*/ +static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call, + struct ctdb_ltdb_header *header, + TDB_DATA *data) +{ + struct ctdb_client_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + int ret; + + state = talloc_zero(ctdb_db, struct ctdb_client_call_state); + CTDB_NO_MEMORY_NULL(ctdb, state); + talloc_steal(state, data->dptr); + + state->state = CTDB_CALL_DONE; + state->call = *call; + state->ctdb_db = ctdb_db; + + ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn); + talloc_steal(state, state->call.reply_data.dptr); + + return state; +} /* make a ctdb call to the local daemon - async send. Called from client context. @@ -254,107 +269,109 @@ static int ctdb_client_call_destructor(struct ctdb_call_state *state) This constructs a ctdb_call request and queues it for processing. This call never blocks. */ -struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db, - struct ctdb_call *call) +struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call) { - struct ctdb_call_state *state; + struct ctdb_client_call_state *state; struct ctdb_context *ctdb = ctdb_db->ctdb; struct ctdb_ltdb_header header; TDB_DATA data; int ret; size_t len; + struct ctdb_req_call *c; /* if the domain socket is not yet open, open it */ if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); + ctdb_socket_connect(ctdb); } ret = ctdb_ltdb_lock(ctdb_db, call->key); if (ret != 0) { - printf("failed to lock ltdb record\n"); + DEBUG(0,(__location__ " Failed to get chainlock\n")); return NULL; } ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); if (ret != 0) { ctdb_ltdb_unlock(ctdb_db, call->key); + DEBUG(0,(__location__ " Failed to fetch record\n")); return NULL; } -#if 0 if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { - state = ctdb_call_local_send(ctdb_db, call, &header, &data); + state = ctdb_client_call_local_send(ctdb_db, call, &header, &data); + talloc_free(data.dptr); ctdb_ltdb_unlock(ctdb_db, call->key); return state; } -#endif - state = talloc_zero(ctdb_db, struct ctdb_call_state); + ctdb_ltdb_unlock(ctdb_db, call->key); + talloc_free(data.dptr); + + state = talloc_zero(ctdb_db, struct ctdb_client_call_state); if (state == NULL) { - printf("failed to allocate state\n"); - ctdb_ltdb_unlock(ctdb_db, call->key); + DEBUG(0, (__location__ " failed to allocate state\n")); return NULL; } - talloc_steal(state, data.dptr); - len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; - state->c = ctdbd_allocate_pkt(ctdb, len); - if (state->c == NULL) { - printf("failed to allocate packet\n"); - ctdb_ltdb_unlock(ctdb_db, call->key); + c = ctdbd_allocate_pkt(state, len); + if (c == NULL) { + DEBUG(0, (__location__ " failed to allocate packet\n")); return NULL; } - talloc_set_name_const(state->c, "ctdbd req_call packet"); - talloc_steal(state, state->c); + talloc_set_name_const(c, "ctdb client req_call packet"); + memset(c, 0, offsetof(struct ctdb_req_call, data)); - state->c->hdr.length = len; - state->c->hdr.ctdb_magic = CTDB_MAGIC; - state->c->hdr.ctdb_version = CTDB_VERSION; - state->c->hdr.operation = CTDB_REQ_CALL; - state->c->hdr.destnode = header.dmaster; - state->c->hdr.srcnode = ctdb->vnn; + c->hdr.length = len; + c->hdr.ctdb_magic = CTDB_MAGIC; + c->hdr.ctdb_version = CTDB_VERSION; + c->hdr.operation = CTDB_REQ_CALL; /* this limits us to 16k outstanding messages - not unreasonable */ - state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); - state->c->flags = call->flags; - state->c->db_id = ctdb_db->db_id; - state->c->callid = call->call_id; - state->c->keylen = call->key.dsize; - state->c->calldatalen = call->call_data.dsize; - memcpy(&state->c->data[0], call->key.dptr, call->key.dsize); - memcpy(&state->c->data[call->key.dsize], + c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + c->flags = call->flags; + c->db_id = ctdb_db->db_id; + c->callid = call->call_id; + c->keylen = call->key.dsize; + c->calldatalen = call->call_data.dsize; + memcpy(&c->data[0], call->key.dptr, call->key.dsize); + memcpy(&c->data[call->key.dsize], call->call_data.dptr, call->call_data.dsize); state->call = *call; - state->call.call_data.dptr = &state->c->data[call->key.dsize]; - state->call.key.dptr = &state->c->data[0]; + state->call.call_data.dptr = &c->data[call->key.dsize]; + state->call.key.dptr = &c->data[0]; - state->node = ctdb->nodes[header.dmaster]; state->state = CTDB_CALL_WAIT; - state->header = header; state->ctdb_db = ctdb_db; + state->reqid = c->hdr.reqid; talloc_set_destructor(state, ctdb_client_call_destructor); - ctdb_client_queue_pkt(ctdb, &state->c->hdr); - -/*XXX set up timeout to cleanup if server doesnt respond - event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), - ctdb_call_timeout, state); -*/ + ctdb_client_queue_pkt(ctdb, &c->hdr); - ctdb_ltdb_unlock(ctdb_db, call->key); return state; } +/* + full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() +*/ +int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) +{ + struct ctdb_client_call_state *state; + + state = ctdb_call_send(ctdb_db, call); + return ctdb_call_recv(state, call); +} + /* tell the daemon what messaging srvid we will use, and register the message handler function in the client */ -int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, - ctdb_message_fn_t handler, - void *private_data) +int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) { struct ctdb_req_register c; @@ -362,7 +379,7 @@ int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, /* if the domain socket is not yet open, open it */ if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); + ctdb_socket_connect(ctdb); } ZERO_STRUCT(c); @@ -383,26 +400,10 @@ int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, } - -/* - setup handler for receipt of ctdb messages from ctdb_send_message() -*/ -int ctdb_set_message_handler(struct ctdb_context *ctdb, - uint32_t srvid, - ctdb_message_fn_t handler, - void *private_data) -{ - if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_set_message_handler(ctdb, srvid, handler, private_data); - } - return ctdb_daemon_set_message_handler(ctdb, srvid, handler, private_data); -} - - /* send a message - from client context */ -int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, +int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, uint32_t srvid, TDB_DATA data) { struct ctdb_req_message *r; @@ -436,7 +437,7 @@ int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, /* wait for all nodes to be connected - from client */ -static void ctdb_client_connect_wait(struct ctdb_context *ctdb) +void ctdb_connect_wait(struct ctdb_context *ctdb) { struct ctdb_req_connect_wait r; int res; @@ -447,216 +448,235 @@ static void ctdb_client_connect_wait(struct ctdb_context *ctdb) r.hdr.ctdb_magic = CTDB_MAGIC; r.hdr.ctdb_version = CTDB_VERSION; r.hdr.operation = CTDB_REQ_CONNECT_WAIT; + + DEBUG(3,("ctdb_connect_wait: sending to ctdbd\n")); + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ctdb_socket_connect(ctdb); + } res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length); if (res != 0) { - printf("Failed to queue a connect wait request\n"); + DEBUG(0,(__location__ " Failed to queue a connect wait request\n")); return; } + DEBUG(3,("ctdb_connect_wait: waiting\n")); + /* now we can go into the normal wait routine, as the reply packet will update the ctdb->num_connected variable */ ctdb_daemon_connect_wait(ctdb); } /* - wait for all nodes to be connected -*/ -void ctdb_connect_wait(struct ctdb_context *ctdb) + cancel a ctdb_fetch_lock operation, releasing the lock + */ +static int fetch_lock_destructor(struct ctdb_record_handle *h) { - if (!(ctdb->flags & CTDB_FLAG_DAEMON_MODE)) { - ctdb_daemon_connect_wait(ctdb); - return; - } - - ctdb_client_connect_wait(ctdb); + ctdb_ltdb_unlock(h->ctdb_db, h->key); + return 0; } +/* + force the migration of a record to this node + */ +static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key) +{ + struct ctdb_call call; + ZERO_STRUCT(call); + call.call_id = CTDB_NULL_FUNC; + call.key = key; + call.flags = CTDB_IMMEDIATE_MIGRATION; + return ctdb_call(ctdb_db, &call); +} -struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key) +/* + get a lock on a record, and return the records data. Blocks until it gets the lock + */ +struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) { - struct ctdb_call_state *state; - struct ctdb_context *ctdb = ctdb_db->ctdb; - struct ctdb_req_fetch_lock *req; - int len, res; + int ret; + struct ctdb_record_handle *h; - /* if the domain socket is not yet open, open it */ - if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); - } + /* + procedure is as follows: - state = talloc_zero(ctdb_db, struct ctdb_call_state); - if (state == NULL) { - printf("failed to allocate state\n"); - return NULL; - } - state->state = CTDB_CALL_WAIT; - state->ctdb_db = ctdb_db; - len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize; - state->c = ctdbd_allocate_pkt(ctdb, len); - if (state->c == NULL) { - printf("failed to allocate packet\n"); + 1) get the chain lock. + 2) check if we are dmaster + 3) if we are the dmaster then return handle + 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for + reply from ctdbd + 5) when we get the reply, goto (1) + */ + + h = talloc_zero(mem_ctx, struct ctdb_record_handle); + if (h == NULL) { return NULL; } - ZERO_STRUCT(*state->c); - talloc_set_name_const(state->c, "ctdbd req_fetch_lock packet"); - talloc_steal(state, state->c); - - req = (struct ctdb_req_fetch_lock *)state->c; - req->hdr.length = len; - req->hdr.ctdb_magic = CTDB_MAGIC; - req->hdr.ctdb_version = CTDB_VERSION; - req->hdr.operation = CTDB_REQ_FETCH_LOCK; - req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); - req->db_id = ctdb_db->db_id; - req->keylen = key.dsize; - memcpy(&req->key[0], key.dptr, key.dsize); - - res = ctdb_client_queue_pkt(ctdb, &req->hdr); - if (res != 0) { + + h->ctdb_db = ctdb_db; + h->key = key; + h->key.dptr = talloc_memdup(h, key.dptr, key.dsize); + if (h->key.dptr == NULL) { + talloc_free(h); return NULL; } + h->data = data; - talloc_free(req); - - return state; -} + DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", key.dsize, key.dsize, + (const char *)key.dptr)); +again: + /* step 1 - get the chain lock */ + ret = ctdb_ltdb_lock(ctdb_db, key); + if (ret != 0) { + DEBUG(0, (__location__ " failed to lock ltdb record\n")); + talloc_free(h); + return NULL; + } -struct ctdb_call_state *ctdb_client_store_unlock_send( - struct ctdb_record_handle *rh, - TALLOC_CTX *mem_ctx, - TDB_DATA data) -{ - struct ctdb_call_state *state; - struct ctdb_db_context *ctdb_db = talloc_get_type(rh->ctdb_db, struct ctdb_db_context); - struct ctdb_context *ctdb = ctdb_db->ctdb; - struct ctdb_req_store_unlock *req; - int len, res; + DEBUG(4,("ctdb_fetch_lock: got chain lock\n")); - /* if the domain socket is not yet open, open it */ - if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); - } + talloc_set_destructor(h, fetch_lock_destructor); - state = talloc_zero(ctdb_db, struct ctdb_call_state); - if (state == NULL) { - printf("failed to allocate state\n"); - return NULL; - } - state->state = CTDB_CALL_WAIT; - state->ctdb_db = ctdb_db; - len = offsetof(struct ctdb_req_store_unlock, data) + rh->key.dsize + data.dsize; - state->c = ctdbd_allocate_pkt(ctdb, len); - if (state->c == NULL) { - printf("failed to allocate packet\n"); + ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + talloc_free(h); return NULL; } - ZERO_STRUCT(*state->c); - talloc_set_name_const(state->c, "ctdbd req_store_unlock packet"); - talloc_steal(state, state->c); - - req = (struct ctdb_req_store_unlock *)state->c; - req->hdr.length = len; - req->hdr.ctdb_magic = CTDB_MAGIC; - req->hdr.ctdb_version = CTDB_VERSION; - req->hdr.operation = CTDB_REQ_STORE_UNLOCK; - req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); - req->db_id = ctdb_db->db_id; - req->keylen = rh->key.dsize; - req->datalen = data.dsize; - memcpy(&req->data[0], rh->key.dptr, rh->key.dsize); - memcpy(&req->data[req->keylen], data.dptr, data.dsize); - - res = ctdb_client_queue_pkt(ctdb, &req->hdr); - if (res != 0) { - return NULL; + + /* when torturing, ensure we test the remote path */ + if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) && + random() % 5 == 0) { + h->header.dmaster = (uint32_t)-1; } - talloc_free(req); - return state; + DEBUG(4,("ctdb_fetch_lock: done local fetch\n")); + + if (h->header.dmaster != ctdb_db->ctdb->vnn) { + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(4,("ctdb_fetch_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + goto again; + } + + DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n")); + return h; } /* - make a recv call to the local ctdb daemon - called from client context + store some data to the record that was locked with ctdb_fetch_lock() +*/ +int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data) +{ + return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data); +} - This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the - results. This call will block unless the call has already completed. +/* + wait until we're the only node left. + this function never returns */ -struct ctdb_record_handle *ctdb_client_fetch_lock_recv(struct ctdb_call_state *state, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data) +void ctdb_shutdown(struct ctdb_context *ctdb) { - struct ctdb_record_handle *rec; + struct ctdb_req_shutdown r; + int len; - while (state->state < CTDB_CALL_DONE) { - event_loop_once(state->ctdb_db->ctdb->ev); - } - if (state->state != CTDB_CALL_DONE) { - ctdb_set_error(state->node->ctdb, "%s", state->errmsg); - talloc_free(state); - return NULL; + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ctdb_socket_connect(ctdb); } - rec = talloc(mem_ctx, struct ctdb_record_handle); - CTDB_NO_MEMORY_NULL(state->ctdb_db->ctdb, rec); + len = sizeof(struct ctdb_req_shutdown); + ZERO_STRUCT(r); + r.hdr.length = len; + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_SHUTDOWN; + r.hdr.reqid = 0; - rec->ctdb_db = state->ctdb_db; - rec->key = key; - rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); - rec->data = talloc(rec, TDB_DATA); - rec->data->dsize = state->call.reply_data.dsize; - rec->data->dptr = talloc_memdup(rec, state->call.reply_data.dptr, rec->data->dsize); + ctdb_client_queue_pkt(ctdb, &(r.hdr)); - if (data) { - *data = *rec->data; + /* this event loop will terminate once we receive the reply */ + while (1) { + event_loop_once(ctdb->ev); } - return rec; } -/* - make a recv call to the local ctdb daemon - called from client context +enum ctdb_status_states {CTDB_STATUS_WAIT, CTDB_STATUS_DONE}; - This is called when the program wants to wait for a ctdb_store_unlock to complete and get the - results. This call will block unless the call has already completed. -*/ -int ctdb_client_store_unlock_recv(struct ctdb_call_state *state, struct ctdb_record_handle *rec) +struct ctdb_status_state { + uint32_t reqid; + struct ctdb_status *status; + enum ctdb_status_states state; +}; + +/* + handle a ctdb_reply_status reply + */ +static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - while (state->state < CTDB_CALL_DONE) { - event_loop_once(state->ctdb_db->ctdb->ev); - } - if (state->state != CTDB_CALL_DONE) { - ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + struct ctdb_reply_status *r = (struct ctdb_reply_status *)hdr; + struct ctdb_status_state *state; + + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_status_state); + if (state == NULL) { + DEBUG(0, ("reqid %d not found\n", hdr->reqid)); + return; } - talloc_free(state); - return state->state; + *state->status = r->status; + state->state = CTDB_STATUS_DONE; } -struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key, - TDB_DATA *data) +/* + wait until we're the only node left. + this function never returns +*/ +int ctdb_status(struct ctdb_context *ctdb, struct ctdb_status *status) { - struct ctdb_call_state *state; - struct ctdb_record_handle *rec; + struct ctdb_req_status r; + int ret; + struct ctdb_status_state *state; - state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key); - rec = ctdb_client_fetch_lock_recv(state, mem_ctx, key, data); + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ctdb_socket_connect(ctdb); + } - return rec; -} + state = talloc(ctdb, struct ctdb_status_state); + CTDB_NO_MEMORY(ctdb, state); -int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) -{ - struct ctdb_call_state *state; - int res; + state->reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + state->status = status; + state->state = CTDB_STATUS_WAIT; + + ZERO_STRUCT(r); + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_STATUS; + r.hdr.reqid = state->reqid; - state = ctdb_client_store_unlock_send(rec, rec, data); - res = ctdb_client_store_unlock_recv(state, rec); + ret = ctdb_client_queue_pkt(ctdb, &(r.hdr)); + if (ret != 0) { + talloc_free(state); + return -1; + } + + while (state->state == CTDB_STATUS_WAIT) { + event_loop_once(ctdb->ev); + } - talloc_free(rec); + talloc_free(state); - return res; + return 0; } + diff --git a/source4/cluster/ctdb/common/ctdb_daemon.c b/source4/cluster/ctdb/common/ctdb_daemon.c index 945030d77e..ff3431a392 100644 --- a/source4/cluster/ctdb/common/ctdb_daemon.c +++ b/source4/cluster/ctdb/common/ctdb_daemon.c @@ -25,9 +25,23 @@ #include "lib/util/dlinklist.h" #include "system/network.h" #include "system/filesys.h" +#include "system/wait.h" #include "../include/ctdb.h" #include "../include/ctdb_private.h" +/* + structure describing a connected client in the daemon + */ +struct ctdb_client { + struct ctdb_context *ctdb; + int fd; + struct ctdb_queue *queue; +}; + + + +static void daemon_incoming_packet(void *, uint8_t *, uint32_t ); + static void ctdb_main_loop(struct ctdb_context *ctdb) { ctdb->methods->start(ctdb); @@ -35,7 +49,7 @@ static void ctdb_main_loop(struct ctdb_context *ctdb) /* go into a wait loop to allow other nodes to complete */ event_loop_wait(ctdb->ev); - printf("event_loop_wait() returned. this should not happen\n"); + DEBUG(0,("event_loop_wait() returned. this should not happen\n")); exit(1); } @@ -47,16 +61,27 @@ static void set_non_blocking(int fd) fcntl(fd, F_SETFL, v | O_NONBLOCK); } +static void block_signal(int signum) +{ + struct sigaction act; + + memset(&act, 0, sizeof(act)); + + act.sa_handler = SIG_IGN; + sigemptyset(&act.sa_mask); + sigaddset(&act.sa_mask, signum); + sigaction(signum, &act, NULL); +} + /* - structure describing a connected client in the daemon + send a packet to a client */ -struct ctdb_client { - struct ctdb_context *ctdb; - int fd; - struct ctdb_queue *queue; -}; - +static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr) +{ + client->ctdb->status.client_packets_sent++; + return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length); +} /* message handler for when we are in daemon mode. This redirects the message @@ -73,10 +98,9 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, len = offsetof(struct ctdb_req_message, data) + data.dsize; r = ctdbd_allocate_pkt(ctdb, len); -/*XXX cant use this since it returns an int CTDB_NO_MEMORY(ctdb, r);*/ talloc_set_name_const(r, "req_message packet"); - ZERO_STRUCT(*r); + memset(r, 0, offsetof(struct ctdb_req_message, data)); r->hdr.length = len; r->hdr.ctdb_magic = CTDB_MAGIC; @@ -85,11 +109,10 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, r->srvid = srvid; r->datalen = data.dsize; memcpy(&r->data[0], data.dptr, data.dsize); - - ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len); + + daemon_queue_send(client, &r->hdr); talloc_free(r); - return; } @@ -105,187 +128,122 @@ static void daemon_request_register_message_handler(struct ctdb_client *client, c->srvid, daemon_message_handler, client); if (res != 0) { - printf("Failed to register handler %u in daemon\n", c->srvid); + DEBUG(0,(__location__ " Failed to register handler %u in daemon\n", + c->srvid)); + } else { + DEBUG(2,(__location__ " Registered message handler for srvid=%u\n", + c->srvid)); } } -static struct ctdb_call_state *ctdb_fetch_lock_send(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key, TDB_DATA *data) +/* + called when the daemon gets a shutdown request from a client + */ +static void daemon_request_shutdown(struct ctdb_client *client, + struct ctdb_req_shutdown *f) { - struct ctdb_call *call; - struct ctdb_record_handle *rec; - struct ctdb_call_state *state; + struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context); + int len; + uint32_t node; - rec = talloc(mem_ctx, struct ctdb_record_handle); - CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec); + /* we dont send to ourself so we can already count one daemon as + exiting */ + ctdb->num_finished++; - - call = talloc(rec, struct ctdb_call); - ZERO_STRUCT(*call); - call->call_id = CTDB_FETCH_FUNC; - call->key = key; - call->flags = CTDB_IMMEDIATE_MIGRATION; + /* loop over all nodes of the cluster */ + for (node=0; nodenum_nodes;node++) { + struct ctdb_req_finished *rf; - rec->ctdb_db = ctdb_db; - rec->key = key; - rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); - rec->data = data; + /* dont send a message to ourself */ + if (ctdb->vnn == node) { + continue; + } - state = ctdb_call_send(ctdb_db, call); - state->fetch_private = rec; + len = sizeof(struct ctdb_req_finished); + rf = ctdb->methods->allocate_pkt(ctdb, len); + CTDB_NO_MEMORY_FATAL(ctdb, rf); + talloc_set_name_const(rf, "ctdb_req_finished packet"); - return state; -} + ZERO_STRUCT(*rf); + rf->hdr.length = len; + rf->hdr.ctdb_magic = CTDB_MAGIC; + rf->hdr.ctdb_version = CTDB_VERSION; + rf->hdr.operation = CTDB_REQ_FINISHED; + rf->hdr.destnode = node; + rf->hdr.srcnode = ctdb->vnn; + rf->hdr.reqid = 0; -struct client_fetch_lock_data { - struct ctdb_client *client; - uint32_t reqid; -}; -static void daemon_fetch_lock_complete(struct ctdb_call_state *state) -{ - struct ctdb_reply_fetch_lock *r; - struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data, struct client_fetch_lock_data); - struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client); - int length, res; + ctdb_queue_packet(ctdb, &(rf->hdr)); - length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize; - r = ctdbd_allocate_pkt(client->ctdb, length); - if (r == NULL) { - printf("Failed to allocate reply_call in ctdb daemon\n"); - return; + talloc_free(rf); } - ZERO_STRUCT(*r); - r->hdr.length = length; - r->hdr.ctdb_magic = CTDB_MAGIC; - r->hdr.ctdb_version = CTDB_VERSION; - r->hdr.operation = CTDB_REPLY_FETCH_LOCK; - r->hdr.reqid = data->reqid; - r->state = state->state; - r->datalen = state->call.reply_data.dsize; - memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen); - res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length); - if (res != 0) { - printf("Failed to queue packet from daemon to client\n"); + /* wait until all nodes have are prepared to shutdown */ + while (ctdb->num_finished != ctdb->num_nodes) { + event_loop_once(ctdb->ev); } - talloc_free(r); -} -/* - called when the daemon gets a fetch lock request from a client - */ -static void daemon_request_fetch_lock(struct ctdb_client *client, - struct ctdb_req_fetch_lock *f) -{ - struct ctdb_call_state *state; - TDB_DATA key, *data; - struct ctdb_db_context *ctdb_db; - struct client_fetch_lock_data *fl_data; - - ctdb_db = find_ctdb_db(client->ctdb, f->db_id); - - key.dsize = f->keylen; - key.dptr = &f->key[0]; - - data = talloc(client, TDB_DATA); - data->dptr = NULL; - data->dsize = 0; + /* all daemons have requested to finish - we now exit */ + DEBUG(1,("All daemons finished - exiting\n")); + _exit(0); +} - state = ctdb_fetch_lock_send(ctdb_db, client, key, data); - talloc_steal(state, data); - fl_data = talloc(state, struct client_fetch_lock_data); - fl_data->client = client; - fl_data->reqid = f->hdr.reqid; - state->async.fn = daemon_fetch_lock_complete; - state->async.private_data = fl_data; -} /* - called when the daemon gets a store unlock request from a client - - this would never block? + called when the daemon gets a connect wait request from a client */ -static void daemon_request_store_unlock(struct ctdb_client *client, - struct ctdb_req_store_unlock *f) +static void daemon_request_connect_wait(struct ctdb_client *client, + struct ctdb_req_connect_wait *c) { - struct ctdb_db_context *ctdb_db; - struct ctdb_reply_store_unlock r; - uint32_t caller = ctdb_get_vnn(client->ctdb); - struct ctdb_ltdb_header header; - TDB_DATA key, data; + struct ctdb_reply_connect_wait r; int res; - ctdb_db = find_ctdb_db(client->ctdb, f->db_id); - - /* write the data to ltdb */ - key.dsize = f->keylen; - key.dptr = &f->data[0]; - res = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL); - if (res) { - ctdb_set_error(ctdb_db->ctdb, "Fetch of locally held record failed"); - res = -1; - goto done; - } - if (header.laccessor != caller) { - header.lacount = 0; - } - header.laccessor = caller; - header.lacount++; - data.dsize = f->datalen; - data.dptr = &f->data[f->keylen]; - res = ctdb_ltdb_store(ctdb_db, key, &header, data); - if ( res != 0) { - ctdb_set_error(ctdb_db->ctdb, "ctdb_call tdb_store failed\n"); - } - + /* first wait - in the daemon */ + ctdb_daemon_connect_wait(client->ctdb); -done: /* now send the reply */ ZERO_STRUCT(r); r.hdr.length = sizeof(r); r.hdr.ctdb_magic = CTDB_MAGIC; r.hdr.ctdb_version = CTDB_VERSION; - r.hdr.operation = CTDB_REPLY_STORE_UNLOCK; - r.hdr.reqid = f->hdr.reqid; - r.state = res; + r.hdr.operation = CTDB_REPLY_CONNECT_WAIT; + r.vnn = ctdb_get_vnn(client->ctdb); + r.num_connected = client->ctdb->num_connected; - res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length); + res = daemon_queue_send(client, &r.hdr); if (res != 0) { - printf("Failed to queue a store unlock response\n"); + DEBUG(0,(__location__ " Failed to queue a connect wait response\n")); return; } } + /* - called when the daemon gets a connect wait request from a client + called when the daemon gets a status request from a client */ -static void daemon_request_connect_wait(struct ctdb_client *client, - struct ctdb_req_connect_wait *c) +static void daemon_request_status(struct ctdb_client *client, + struct ctdb_req_status *c) { - struct ctdb_reply_connect_wait r; + struct ctdb_reply_status r; int res; - /* first wait - in the daemon */ - ctdb_daemon_connect_wait(client->ctdb); - /* now send the reply */ ZERO_STRUCT(r); r.hdr.length = sizeof(r); r.hdr.ctdb_magic = CTDB_MAGIC; r.hdr.ctdb_version = CTDB_VERSION; - r.hdr.operation = CTDB_REPLY_CONNECT_WAIT; - r.vnn = ctdb_get_vnn(client->ctdb); - r.num_connected = client->ctdb->num_connected; + r.hdr.operation = CTDB_REPLY_STATUS; + r.hdr.reqid = c->hdr.reqid; + r.status = client->ctdb->status; - res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length); + res = daemon_queue_send(client, &r.hdr); if (res != 0) { - printf("Failed to queue a connect wait response\n"); + DEBUG(0,(__location__ " Failed to queue a connect wait response\n")); return; } } @@ -323,11 +281,69 @@ static void daemon_request_message_from_client(struct ctdb_client *client, res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode, c->srvid, data); if (res != 0) { - printf("Failed to send message to remote node %u\n", - c->hdr.destnode); + DEBUG(0,(__location__ " Failed to send message to remote node %u\n", + c->hdr.destnode)); + } +} + + +struct daemon_call_state { + struct ctdb_client *client; + uint32_t reqid; + struct ctdb_call *call; + struct timeval start_time; +}; + +/* + complete a call from a client +*/ +static void daemon_call_from_client_callback(struct ctdb_call_state *state) +{ + struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, + struct daemon_call_state); + struct ctdb_reply_call *r; + int res; + uint32_t length; + struct ctdb_client *client = dstate->client; + + talloc_steal(client, dstate); + talloc_steal(dstate, dstate->call); + + res = ctdb_daemon_call_recv(state, dstate->call); + if (res != 0) { + DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n")); + client->ctdb->status.pending_calls--; + ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); + return; } + + length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize; + r = ctdbd_allocate_pkt(dstate, length); + if (r == NULL) { + DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n")); + client->ctdb->status.pending_calls--; + ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); + return; + } + memset(r, 0, offsetof(struct ctdb_reply_call, data)); + r->hdr.length = length; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REPLY_CALL; + r->hdr.reqid = dstate->reqid; + r->datalen = dstate->call->reply_data.dsize; + memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen); + + res = daemon_queue_send(client, &r->hdr); + if (res != 0) { + DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n")); + } + ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); + talloc_free(dstate); + client->ctdb->status.pending_calls--; } + /* this is called when the ctdb daemon received a ctdb request call from a local client over the unix domain socket @@ -337,103 +353,159 @@ static void daemon_request_call_from_client(struct ctdb_client *client, { struct ctdb_call_state *state; struct ctdb_db_context *ctdb_db; - struct ctdb_call call; - struct ctdb_reply_call *r; - int res; - uint32_t length; + struct daemon_call_state *dstate; + struct ctdb_call *call; + struct ctdb_ltdb_header header; + TDB_DATA key, data; + int ret; + struct ctdb_context *ctdb = client->ctdb; + + ctdb->status.total_calls++; + ctdb->status.pending_calls++; ctdb_db = find_ctdb_db(client->ctdb, c->db_id); if (!ctdb_db) { - printf("Unknown database in request. db_id==0x%08x",c->db_id); + DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x", + c->db_id)); + ctdb->status.pending_calls--; return; } - ZERO_STRUCT(call); - call.call_id = c->callid; - call.key.dptr = c->data; - call.key.dsize = c->keylen; - call.call_data.dptr = c->data + c->keylen; - call.call_data.dsize = c->calldatalen; + key.dptr = c->data; + key.dsize = c->keylen; - state = ctdb_call_send(ctdb_db, &call); + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, + (struct ctdb_req_header *)c, &data, + daemon_incoming_packet, client); + if (ret == -2) { + /* will retry later */ + ctdb->status.pending_calls--; + return; + } -/* XXX this must be converted to fully async */ - res = ctdb_call_recv(state, &call); - if (res != 0) { - printf("ctdbd_call_recv() returned error\n"); - exit(1); + if (ret != 0) { + DEBUG(0,(__location__ " Unable to fetch record\n")); + ctdb->status.pending_calls--; + return; } - length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize; - r = ctdbd_allocate_pkt(client->ctdb, length); - if (r == NULL) { - printf("Failed to allocate reply_call in ctdb daemon\n"); + dstate = talloc(client, struct daemon_call_state); + if (dstate == NULL) { + ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(0,(__location__ " Unable to allocate dstate\n")); + ctdb->status.pending_calls--; + return; + } + dstate->start_time = timeval_current(); + dstate->client = client; + dstate->reqid = c->hdr.reqid; + talloc_steal(dstate, data.dptr); + + call = dstate->call = talloc_zero(dstate, struct ctdb_call); + if (call == NULL) { + ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(0,(__location__ " Unable to allocate call\n")); + ctdb->status.pending_calls--; + ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time); return; } - ZERO_STRUCT(*r); - r->hdr.length = length; - r->hdr.ctdb_magic = CTDB_MAGIC; - r->hdr.ctdb_version = CTDB_VERSION; - r->hdr.operation = CTDB_REPLY_CALL; - r->hdr.reqid = c->hdr.reqid; - r->datalen = call.reply_data.dsize; - memcpy(&r->data[0], call.reply_data.dptr, r->datalen); - res = ctdb_queue_send(client->queue, (uint8_t *)&r, r->hdr.length); - if (res != 0) { - printf("Failed to queue packet from daemon to client\n"); + call->call_id = c->callid; + call->key = key; + call->call_data.dptr = c->data + c->keylen; + call->call_data.dsize = c->calldatalen; + call->flags = c->flags; + + if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + state = ctdb_call_local_send(ctdb_db, call, &header, &data); + } else { + state = ctdb_daemon_call_send_remote(ctdb_db, call, &header); } - talloc_free(r); -} + ctdb_ltdb_unlock(ctdb_db, key); + + if (state == NULL) { + DEBUG(0,(__location__ " Unable to setup call send\n")); + ctdb->status.pending_calls--; + ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time); + return; + } + talloc_steal(state, dstate); + talloc_steal(client, state); + + state->async.fn = daemon_call_from_client_callback; + state->async.private_data = dstate; +} /* data contains a packet from the client */ -static void client_incoming_packet(struct ctdb_client *client, void *data, size_t nread) +static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread) { - struct ctdb_req_header *hdr = data; + struct ctdb_req_header *hdr = (struct ctdb_req_header *)data; + struct ctdb_client *client = talloc_get_type(p, struct ctdb_client); + TALLOC_CTX *tmp_ctx; + struct ctdb_context *ctdb = client->ctdb; + + /* place the packet as a child of a tmp_ctx. We then use + talloc_free() below to free it. If any of the calls want + to keep it, then they will steal it somewhere else, and the + talloc_free() will be a no-op */ + tmp_ctx = talloc_new(client); + talloc_steal(tmp_ctx, hdr); if (hdr->ctdb_magic != CTDB_MAGIC) { - ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n"); + ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n"); goto done; } if (hdr->ctdb_version != CTDB_VERSION) { - ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); goto done; } switch (hdr->operation) { case CTDB_REQ_CALL: + ctdb->status.client.req_call++; daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr); break; case CTDB_REQ_REGISTER: + ctdb->status.client.req_register++; daemon_request_register_message_handler(client, (struct ctdb_req_register *)hdr); break; case CTDB_REQ_MESSAGE: + ctdb->status.client.req_message++; daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr); break; case CTDB_REQ_CONNECT_WAIT: + ctdb->status.client.req_connect_wait++; daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr); break; - case CTDB_REQ_FETCH_LOCK: - daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr); + + case CTDB_REQ_SHUTDOWN: + ctdb->status.client.req_shutdown++; + daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr); break; - case CTDB_REQ_STORE_UNLOCK: - daemon_request_store_unlock(client, (struct ctdb_req_store_unlock *)hdr); + + case CTDB_REQ_STATUS: + ctdb->status.client.req_status++; + daemon_request_status(client, (struct ctdb_req_status *)hdr); break; + default: - printf("daemon: unrecognized operation:%d\n",hdr->operation); + DEBUG(0,(__location__ " daemon: unrecognized operation %d\n", + hdr->operation)); } done: - talloc_free(data); + talloc_free(tmp_ctx); } - -static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) +/* + called when the daemon gets a incoming packet + */ +static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args) { struct ctdb_client *client = talloc_get_type(args, struct ctdb_client); struct ctdb_req_header *hdr; @@ -443,13 +515,15 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) return; } + client->ctdb->status.client_packets_recv++; + if (cnt < sizeof(*hdr)) { - ctdb_set_error(client->ctdb, "Bad packet length %d\n", cnt); + ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt); return; } hdr = (struct ctdb_req_header *)data; if (cnt != hdr->length) { - ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n", + ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n in daemon", hdr->length, cnt); return; } @@ -460,12 +534,16 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) } if (hdr->ctdb_version != CTDB_VERSION) { - ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); return; } + DEBUG(3,(__location__ " client request %d of type %d length %d from " + "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length, + hdr->srcnode, hdr->destnode)); + /* it is the responsibility of the incoming packet function to free 'data' */ - client_incoming_packet(client, data, cnt); + daemon_incoming_packet(client, data, cnt); } static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, @@ -490,7 +568,7 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, client->fd = fd; client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, - ctdb_client_read_cb, client); + ctdb_daemon_read_cb, client); talloc_set_destructor(client, ctdb_client_destructor); } @@ -507,10 +585,10 @@ static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde /* XXX this is a good place to try doing some cleaning up before exiting */ cnt = read(*fd, &buf, 1); if (cnt==0) { - printf("parent process exited. filedescriptor dissappeared\n"); + DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n")); exit(1); } else { - printf("ctdb: did not expect data from parent process\n"); + DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n")); exit(1); } } @@ -559,7 +637,7 @@ static int unlink_destructor(const char *name) /* start the protocol going */ -int ctdbd_start(struct ctdb_context *ctdb) +int ctdb_start(struct ctdb_context *ctdb) { pid_t pid; static int fd[2]; @@ -575,18 +653,18 @@ int ctdbd_start(struct ctdb_context *ctdb) /* create a unix domain stream socket to listen to */ res = ux_socket_bind(ctdb); if (res!=0) { - printf("Failed to open CTDB unix domain socket\n"); + DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n")); exit(10); } res = pipe(&fd[0]); if (res) { - printf("Failed to open pipe for CTDB\n"); + DEBUG(0,(__location__ " Failed to open pipe for CTDB\n")); exit(1); } pid = fork(); if (pid==-1) { - printf("Failed to fork CTDB daemon\n"); + DEBUG(0,(__location__ " Failed to fork CTDB daemon\n")); exit(1); } @@ -597,12 +675,14 @@ int ctdbd_start(struct ctdb_context *ctdb) return 0; } + block_signal(SIGPIPE); + /* ensure the socket is deleted on exit of the daemon */ domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name); talloc_set_destructor(domain_socket_name, unlink_destructor); close(fd[1]); - ctdb_clear_flags(ctdb, CTDB_FLAG_DAEMON_MODE); + ctdb->ev = event_context_init(NULL); fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]); fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb); @@ -614,18 +694,18 @@ int ctdbd_start(struct ctdb_context *ctdb) /* allocate a packet for use in client<->daemon communication */ -void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len) +void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len) { int size; size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); - return talloc_size(ctdb, size); + return talloc_size(mem_ctx, size); } -int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, - ctdb_message_fn_t handler, - void *private_data) +/* + called when a CTDB_REQ_FINISHED packet comes in +*/ +void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data); + ctdb->num_finished++; } - diff --git a/source4/cluster/ctdb/common/ctdb_lockwait.c b/source4/cluster/ctdb/common/ctdb_lockwait.c new file mode 100644 index 0000000000..36b08796be --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_lockwait.c @@ -0,0 +1,139 @@ +/* + wait for a tdb chain lock + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "system/wait.h" +#include "popt.h" +#include "db_wrap.h" +#include "lib/tdb/include/tdb.h" +#include "../include/ctdb_private.h" + + +struct lockwait_handle { + struct ctdb_context *ctdb; + struct fd_event *fde; + int fd[2]; + pid_t child; + void *private_data; + void (*callback)(void *); + struct timeval start_time; +}; + +static void lockwait_handler(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + struct lockwait_handle *h = talloc_get_type(private_data, + struct lockwait_handle); + void (*callback)(void *) = h->callback; + void *p = h->private_data; + pid_t child = h->child; + talloc_set_destructor(h, NULL); + close(h->fd[0]); + ctdb_latency(&h->ctdb->status.max_lockwait_latency, h->start_time); + h->ctdb->status.pending_lockwait_calls--; + talloc_free(h); + callback(p); + waitpid(child, NULL, 0); +} + +static int lockwait_destructor(struct lockwait_handle *h) +{ + h->ctdb->status.pending_lockwait_calls--; + close(h->fd[0]); + kill(h->child, SIGKILL); + waitpid(h->child, NULL, 0); + return 0; +} + +/* + setup a non-blocking chainlock on a tdb record. If this function + returns NULL then it could not get the chainlock. Otherwise it + returns a opaque handle, and will call callback() once it has + managed to get the chainlock. You can cancel it by using talloc_free + on the returned handle. + + It is the callers responsibility to unlock the chainlock once + acquired + */ +struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db, + TDB_DATA key, + void (*callback)(void *private_data), + void *private_data) +{ + struct lockwait_handle *result; + int ret; + + ctdb_db->ctdb->status.lockwait_calls++; + ctdb_db->ctdb->status.pending_lockwait_calls++; + + if (!(result = talloc_zero(ctdb_db, struct lockwait_handle))) { + ctdb_db->ctdb->status.pending_lockwait_calls--; + return NULL; + } + + ret = pipe(result->fd); + + if (ret != 0) { + talloc_free(result); + ctdb_db->ctdb->status.pending_lockwait_calls--; + return NULL; + } + + result->child = fork(); + + if (result->child == (pid_t)-1) { + close(result->fd[0]); + close(result->fd[1]); + talloc_free(result); + ctdb_db->ctdb->status.pending_lockwait_calls--; + return NULL; + } + + result->callback = callback; + result->private_data = private_data; + result->ctdb = ctdb_db->ctdb; + + if (result->child == 0) { + close(result->fd[0]); + /* + * Do we need a tdb_reopen here? + */ + tdb_chainlock(ctdb_db->ltdb->tdb, key); + _exit(0); + } + + close(result->fd[1]); + talloc_set_destructor(result, lockwait_destructor); + + result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0], + EVENT_FD_READ, lockwait_handler, + (void *)result); + if (result->fde == NULL) { + talloc_free(result); + ctdb_db->ctdb->status.pending_lockwait_calls--; + return NULL; + } + + result->start_time = timeval_current(); + + return result; +} diff --git a/source4/cluster/ctdb/common/ctdb_ltdb.c b/source4/cluster/ctdb/common/ctdb_ltdb.c index 785ccad9b3..cb07a72375 100644 --- a/source4/cluster/ctdb/common/ctdb_ltdb.c +++ b/source4/cluster/ctdb/common/ctdb_ltdb.c @@ -45,9 +45,8 @@ struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *na /* this is the dummy null procedure that all databases support */ -static int ctdb_fetch_func(struct ctdb_call_info *call) +static int ctdb_null_func(struct ctdb_call_info *call) { - call->reply_data = &call->record_data; return 0; } @@ -82,10 +81,21 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, } } + if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) { + DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", + ctdb->db_directory)); + talloc_free(ctdb_db); + return NULL; + } + + /* add the node id to the database name, so when we run on loopback + we don't conflict in the local filesystem */ + name = talloc_asprintf(ctdb_db, "%s/%s", ctdb->db_directory, name); + /* when we have a separate daemon this will need to be a real file, not a TDB_INTERNAL, so the parent can access it to for ltdb bypass */ - ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_INTERNAL, open_flags, mode); + ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_CLEAR_IF_FIRST, open_flags, mode); if (ctdb_db->ltdb == NULL) { ctdb_set_error(ctdb, "Failed to open tdb %s\n", name); talloc_free(ctdb_db); @@ -94,9 +104,10 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, /* - all databases support the "fetch" function. we need this in order to do forced migration of records + all databases support the "null" function. we need this in + order to do forced migration of records */ - ret = ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC); + ret = ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC); if (ret != 0) { talloc_free(ctdb_db); return NULL; @@ -145,13 +156,15 @@ int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, rec = tdb_fetch(ctdb_db->ltdb->tdb, key); if (rec.dsize < sizeof(*header)) { + TDB_DATA d2; /* return an initial header */ - free(rec.dptr); + if (rec.dptr) free(rec.dptr); ltdb_initial_header(ctdb_db, key, header); + ZERO_STRUCT(d2); if (data) { - data->dptr = NULL; - data->dsize = 0; + *data = d2; } + ctdb_ltdb_store(ctdb_db, key, header, d2); return 0; } @@ -215,3 +228,118 @@ int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key) return tdb_chainunlock(ctdb_db->ltdb->tdb, key); } +struct lock_fetch_state { + struct ctdb_context *ctdb; + void (*recv_pkt)(void *, uint8_t *, uint32_t); + void *recv_context; + struct ctdb_req_header *hdr; +}; + +/* + called when we should retry the operation + */ +static void lock_fetch_callback(void *p) +{ + struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state); + state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length); + talloc_free(state); + DEBUG(2,(__location__ " PACKET REQUEUED\n")); +} + + +/* + do a non-blocking ltdb_lock, deferring this ctdb request until we + have the chainlock + + It does the following: + + 1) tries to get the chainlock. If it succeeds, then it returns 0 + + 2) if it fails to get a chainlock immediately then it sets up a + non-blocking chainlock via ctdb_lockwait, and when it gets the + chainlock it re-submits this ctdb request to the main packet + receive function + + This effectively queues all ctdb requests that cannot be + immediately satisfied until it can get the lock. This means that + the main ctdb daemon will not block waiting for a chainlock held by + a client + + There are 3 possible return values: + + 0: means that it got the lock immediately. + -1: means that it failed to get the lock, and won't retry + -2: means that it failed to get the lock immediately, but will retry + */ +int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, + TDB_DATA key, struct ctdb_req_header *hdr, + void (*recv_pkt)(void *, uint8_t *, uint32_t ), + void *recv_context) +{ + int ret; + struct tdb_context *tdb = ctdb_db->ltdb->tdb; + struct lockwait_handle *h; + struct lock_fetch_state *state; + + ret = tdb_chainlock_nonblock(tdb, key); + + if (ret != 0 && + !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) { + /* a hard failure - don't try again */ + return -1; + } + + /* when torturing, ensure we test the contended path */ + if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) && + random() % 5 == 0) { + ret = -1; + tdb_chainunlock(tdb, key); + } + + /* first the non-contended path */ + if (ret == 0) { + return 0; + } + + state = talloc(ctdb_db, struct lock_fetch_state); + state->ctdb = ctdb_db->ctdb; + state->hdr = hdr; + state->recv_pkt = recv_pkt; + state->recv_context = recv_context; + + /* now the contended path */ + h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state); + if (h == NULL) { + tdb_chainunlock(tdb, key); + return -1; + } + + /* we need to move the packet off the temporary context in ctdb_recv_pkt(), + so it won't be freed yet */ + talloc_steal(state, hdr); + talloc_steal(state, h); + + /* now tell the caller than we will retry asynchronously */ + return -2; +} + +/* + a varient of ctdb_ltdb_lock_requeue that also fetches the record + */ +int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, + TDB_DATA key, struct ctdb_ltdb_header *header, + struct ctdb_req_header *hdr, TDB_DATA *data, + void (*recv_pkt)(void *, uint8_t *, uint32_t ), + void *recv_context) +{ + int ret; + + ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context); + if (ret == 0) { + ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + } + } + return ret; +} diff --git a/source4/cluster/ctdb/common/ctdb_message.c b/source4/cluster/ctdb/common/ctdb_message.c index ad88ec22d2..70fcf00c4d 100644 --- a/source4/cluster/ctdb/common/ctdb_message.c +++ b/source4/cluster/ctdb/common/ctdb_message.c @@ -42,7 +42,8 @@ static int ctdb_dispatch_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_ if (ml->srvid == srvid || ml->srvid == CTDB_SRVID_ALL) break; } if (ml == NULL) { - printf("daemon vnn:%d no msg handler for srvid=%u\n", ctdb_get_vnn(ctdb), srvid); + DEBUG(1,(__location__ " daemon vnn:%d no msg handler for srvid=%u\n", + ctdb_get_vnn(ctdb), srvid)); /* no registered message handler */ return -1; } @@ -86,7 +87,7 @@ static void ctdb_local_message_trigger(struct event_context *ev, struct timed_ev res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data); if (res != 0) { - printf("Failed to dispatch message for srvid=%u\n", m->srvid); + DEBUG(0, (__location__ " Failed to dispatch message for srvid=%u\n", m->srvid)); } talloc_free(m); } @@ -147,18 +148,6 @@ int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn, return 0; } -/* - send a ctdb message -*/ -int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, - uint32_t srvid, TDB_DATA data) -{ - if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_send_message(ctdb, vnn, srvid, data); - } - return ctdb_daemon_send_message(ctdb, vnn, srvid, data); -} - /* when a client goes away, we need to remove its srvid handler from the list diff --git a/source4/cluster/ctdb/common/ctdb_util.c b/source4/cluster/ctdb/common/ctdb_util.c index cf0c72a58b..9a5e51bfa0 100644 --- a/source4/cluster/ctdb/common/ctdb_util.c +++ b/source4/cluster/ctdb/common/ctdb_util.c @@ -25,6 +25,8 @@ #include "system/filesys.h" #include "../include/ctdb_private.h" +int LogLevel; + /* return error string for last error */ @@ -100,3 +102,29 @@ uint32_t ctdb_hash(const TDB_DATA *key) return (1103515243 * value + 12345); } + +/* + a type checking varient of idr_find + */ +void *_idr_find_type(struct idr_context *idp, int id, const char *type, const char *location) +{ + void *p = idr_find(idp, id); + if (p && talloc_check_name(p, type) == NULL) { + DEBUG(0,("%s idr_find_type expected type %s but got %s\n", + location, type, talloc_get_name(p))); + return NULL; + } + return p; +} + + +/* + update a max latency number + */ +void ctdb_latency(double *latency, struct timeval t) +{ + double l = timeval_elapsed(&t); + if (l > *latency) { + *latency = l; + } +} -- cgit