From 650d81b252cc669ef848448afad7e9bb79c4f20e Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Sat, 21 Apr 2007 07:23:42 +0000 Subject: r22421: merged in latest ctdb changes from bzr (This used to be commit 3633f862b966866819c9a0a6ad0238a858e15e62) --- source4/cluster/ctdb/Makefile.in | 36 +- source4/cluster/ctdb/brlock_ctdb.c | 2 + source4/cluster/ctdb/common/cmdline.c | 142 +++++++ source4/cluster/ctdb/common/ctdb.c | 136 +++++-- source4/cluster/ctdb/common/ctdb_call.c | 265 ++++-------- source4/cluster/ctdb/common/ctdb_client.c | 612 ++++++++++++++-------------- source4/cluster/ctdb/common/ctdb_daemon.c | 500 +++++++++++++---------- source4/cluster/ctdb/common/ctdb_lockwait.c | 139 +++++++ source4/cluster/ctdb/common/ctdb_ltdb.c | 144 ++++++- source4/cluster/ctdb/common/ctdb_message.c | 17 +- source4/cluster/ctdb/common/ctdb_util.c | 28 ++ source4/cluster/ctdb/config.mk | 3 +- source4/cluster/ctdb/ctdb_cluster.c | 69 +++- source4/cluster/ctdb/direct/ctdbd.c | 51 +-- source4/cluster/ctdb/direct/ctdbd.sh | 4 +- source4/cluster/ctdb/direct/ctdbd_test.c | 92 +---- source4/cluster/ctdb/ib/ibw_ctdb_init.c | 4 +- source4/cluster/ctdb/include/cmdline.h | 7 + source4/cluster/ctdb/include/ctdb.h | 51 ++- source4/cluster/ctdb/include/ctdb_private.h | 147 +++++-- source4/cluster/ctdb/include/includes.h | 7 +- source4/cluster/ctdb/opendb_ctdb.c | 6 +- source4/cluster/ctdb/tcp/tcp_connect.c | 4 + source4/cluster/ctdb/tcp/tcp_init.c | 10 +- source4/cluster/ctdb/tests/bench.sh | 6 +- source4/cluster/ctdb/tests/bench1.sh | 3 +- source4/cluster/ctdb/tests/ctdb_bench.c | 51 +-- source4/cluster/ctdb/tests/ctdb_fetch.c | 26 +- source4/cluster/ctdb/tests/ctdb_fetch1.c | 97 ++++- source4/cluster/ctdb/tests/ctdb_messaging.c | 7 +- source4/cluster/ctdb/tests/ctdb_test.c | 13 +- source4/cluster/ctdb/tests/fetch.sh | 12 +- source4/cluster/ctdb/tests/fetch1.sh | 5 +- source4/cluster/ctdb/tests/lockwait.c | 245 +++++++++++ source4/cluster/ctdb/tests/messaging.sh | 3 +- 
source4/cluster/ctdb/tests/test.sh | 40 +- source4/cluster/ctdb/tests/test1.sh | 7 +- source4/cluster/ctdb/tools/ctdb_status.c | 133 ++++++ 38 files changed, 2020 insertions(+), 1104 deletions(-) create mode 100644 source4/cluster/ctdb/common/cmdline.c create mode 100644 source4/cluster/ctdb/common/ctdb_lockwait.c create mode 100644 source4/cluster/ctdb/include/cmdline.h create mode 100644 source4/cluster/ctdb/tests/lockwait.c create mode 100644 source4/cluster/ctdb/tools/ctdb_status.c (limited to 'source4/cluster') diff --git a/source4/cluster/ctdb/Makefile.in b/source4/cluster/ctdb/Makefile.in index 175418aa99..adc1f92d0a 100644 --- a/source4/cluster/ctdb/Makefile.in +++ b/source4/cluster/ctdb/Makefile.in @@ -21,8 +21,8 @@ LIB_FLAGS=@LDFLAGS@ -Llib @LIBS@ -lpopt @INFINIBAND_LIBS@ EVENTS_OBJ = lib/events/events.o lib/events/events_standard.o CTDB_COMMON_OBJ = common/ctdb.o common/ctdb_daemon.o common/ctdb_client.o common/ctdb_io.o common/util.o common/ctdb_util.o \ - common/ctdb_call.o common/ctdb_ltdb.o common/ctdb_message.o \ - lib/util/idtree.o lib/util/db_wrap.o + common/ctdb_call.o common/ctdb_ltdb.o common/ctdb_lockwait.o common/ctdb_message.o \ + common/cmdline.o lib/util/idtree.o lib/util/db_wrap.o lib/util/debug.o CTDB_TCP_OBJ = tcp/tcp_connect.o tcp/tcp_io.o tcp/tcp_init.o @@ -30,7 +30,7 @@ CTDB_OBJ = $(CTDB_COMMON_OBJ) $(CTDB_TCP_OBJ) OBJS = @TDBOBJ@ @TALLOCOBJ@ @LIBREPLACEOBJ@ @INFINIBAND_WRAPPER_OBJ@ $(EXTRA_OBJ) $(EVENTS_OBJ) $(CTDB_OBJ) -BINS = bin/ctdbd bin/ctdbd_test bin/ctdb_test bin/ctdb_bench bin/ctdb_messaging bin/ctdb_fetch bin/ctdb_fetch1 @INFINIBAND_BINS@ +BINS = bin/ctdbd bin/ctdbd_test bin/ctdb_test bin/ctdb_bench bin/ctdb_messaging bin/ctdb_fetch bin/ctdb_fetch1 bin/lockwait bin/ctdb_status @INFINIBAND_BINS@ DIRS = lib bin @@ -49,38 +49,46 @@ showflags: dirs: @mkdir -p $(DIRS) -bin/ctdb_test: $(OBJS) tests/ctdb_test.o tests/cmdline.o +bin/ctdb_test: $(OBJS) tests/ctdb_test.o @echo Linking $@ - @$(CC) $(CFLAGS) -o $@ tests/ctdb_test.o 
tests/cmdline.o $(OBJS) $(LIB_FLAGS) + @$(CC) $(CFLAGS) -o $@ tests/ctdb_test.o $(OBJS) $(LIB_FLAGS) -bin/ctdbd: $(OBJS) direct/ctdbd.o +bin/ctdbd: $(OBJS) direct/ctdbd.o @echo Linking $@ @$(CC) $(CFLAGS) -o $@ direct/ctdbd.o $(OBJS) $(LIB_FLAGS) +bin/ctdb_status: $(OBJS) tools/ctdb_status.o + @echo Linking $@ + @$(CC) $(CFLAGS) -o $@ tools/ctdb_status.o $(OBJS) $(LIB_FLAGS) + bin/ctdbd_test: $(OBJS) direct/ctdbd_test.o @echo Linking $@ @$(CC) $(CFLAGS) -o $@ direct/ctdbd_test.o -bin/ctdb_bench: $(OBJS) tests/ctdb_bench.o tests/cmdline.o +bin/ctdb_bench: $(OBJS) tests/ctdb_bench.o @echo Linking $@ - @$(CC) $(CFLAGS) -o $@ tests/ctdb_bench.o tests/cmdline.o $(OBJS) $(LIB_FLAGS) + @$(CC) $(CFLAGS) -o $@ tests/ctdb_bench.o $(OBJS) $(LIB_FLAGS) -bin/ctdb_fetch: $(OBJS) tests/ctdb_fetch.o tests/cmdline.o +bin/ctdb_fetch: $(OBJS) tests/ctdb_fetch.o @echo Linking $@ - @$(CC) $(CFLAGS) -o $@ tests/ctdb_fetch.o tests/cmdline.o $(OBJS) $(LIB_FLAGS) + @$(CC) $(CFLAGS) -o $@ tests/ctdb_fetch.o $(OBJS) $(LIB_FLAGS) -bin/ctdb_fetch1: $(OBJS) tests/ctdb_fetch1.o tests/cmdline.o +bin/ctdb_fetch1: $(OBJS) tests/ctdb_fetch1.o @echo Linking $@ - @$(CC) $(CFLAGS) -o $@ tests/ctdb_fetch1.o tests/cmdline.o $(OBJS) $(LIB_FLAGS) + @$(CC) $(CFLAGS) -o $@ tests/ctdb_fetch1.o $(OBJS) $(LIB_FLAGS) -bin/ctdb_messaging: $(OBJS) tests/ctdb_messaging.o tests/cmdline.o +bin/ctdb_messaging: $(OBJS) tests/ctdb_messaging.o @echo Linking $@ - @$(CC) $(CFLAGS) -o $@ tests/ctdb_messaging.o tests/cmdline.o $(OBJS) $(LIB_FLAGS) + @$(CC) $(CFLAGS) -o $@ tests/ctdb_messaging.o $(OBJS) $(LIB_FLAGS) bin/ibwrapper_test: $(OBJS) ib/ibwrapper_test.o @echo Linking $@ @$(CC) $(CFLAGS) -o $@ ib/ibwrapper_test.o $(OBJS) $(LIB_FLAGS) +bin/lockwait: $(OBJS) tests/lockwait.o + @echo Linking $@ + @$(CC) $(CFLAGS) -o $@ tests/lockwait.o $(OBJS) $(LIB_FLAGS) + clean: rm -f *.o */*.o */*/*.o rm -f $(BINS) diff --git a/source4/cluster/ctdb/brlock_ctdb.c b/source4/cluster/ctdb/brlock_ctdb.c index b1e8e64d40..2773827b59 
100644 --- a/source4/cluster/ctdb/brlock_ctdb.c +++ b/source4/cluster/ctdb/brlock_ctdb.c @@ -389,6 +389,8 @@ static NTSTATUS brl_ctdb_lock(struct brl_context *brl, call.key.dsize = brlh->key.length; call.call_data.dptr = (uint8_t *)&req; call.call_data.dsize = sizeof(req); + call.flags = 0; + call.status = 0; ZERO_STRUCT(req); req.smbpid = smbpid; diff --git a/source4/cluster/ctdb/common/cmdline.c b/source4/cluster/ctdb/common/cmdline.c new file mode 100644 index 0000000000..699cb8fb22 --- /dev/null +++ b/source4/cluster/ctdb/common/cmdline.c @@ -0,0 +1,142 @@ +/* + common commandline code to ctdb test tools + + Copyright (C) Andrew Tridgell 2007 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. 
+ + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "popt.h" +#include "../include/ctdb.h" +#include "../include/ctdb_private.h" + +/* Handle common command line options for ctdb test progs + */ + +static struct { + const char *nlist; + const char *transport; + const char *myaddress; + int self_connect; + const char *db_dir; + int torture; +} ctdb_cmdline = { + .nlist = NULL, + .transport = "tcp", + .myaddress = NULL, + .self_connect = 0, + .db_dir = NULL, + .torture = 0 +}; + + +struct poptOption popt_ctdb_cmdline[] = { + { "nlist", 0, POPT_ARG_STRING, &ctdb_cmdline.nlist, 0, "node list file", "filename" }, + { "listen", 0, POPT_ARG_STRING, &ctdb_cmdline.myaddress, 0, "address to listen on", "address" }, + { "transport", 0, POPT_ARG_STRING, &ctdb_cmdline.transport, 0, "protocol transport", NULL }, + { "self-connect", 0, POPT_ARG_NONE, &ctdb_cmdline.self_connect, 0, "enable self connect", "boolean" }, + { "debug", 'd', POPT_ARG_INT, &LogLevel, 0, "debug level"}, + { "dbdir", 0, POPT_ARG_STRING, &ctdb_cmdline.db_dir, 0, "directory for the tdb files", NULL }, + { "torture", 0, POPT_ARG_NONE, &ctdb_cmdline.torture, 0, "enable nastiness in library", NULL }, + { NULL } +}; + + +/* + startup daemon side of ctdb according to command line options + */ +struct ctdb_context *ctdb_cmdline_init(struct event_context *ev) +{ + struct ctdb_context *ctdb; + int ret; + + if (ctdb_cmdline.nlist == NULL || ctdb_cmdline.myaddress == NULL) { + printf("You must provide a node list with --nlist and an address with --listen\n"); + exit(1); + } + + /* initialise ctdb */ + ctdb = ctdb_init(ev); + if (ctdb == NULL) { + printf("Failed to init ctdb\n"); + exit(1); + } + + if (ctdb_cmdline.self_connect) { + ctdb_set_flags(ctdb, 
CTDB_FLAG_SELF_CONNECT); + } + if (ctdb_cmdline.torture) { + ctdb_set_flags(ctdb, CTDB_FLAG_TORTURE); + } + + ret = ctdb_set_transport(ctdb, ctdb_cmdline.transport); + if (ret == -1) { + printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what address to listen on */ + ret = ctdb_set_address(ctdb, ctdb_cmdline.myaddress); + if (ret == -1) { + printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what nodes are available */ + ret = ctdb_set_nlist(ctdb, ctdb_cmdline.nlist); + if (ret == -1) { + printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + ret = ctdb_set_tdb_dir(ctdb, ctdb_cmdline.db_dir); + if (ret == -1) { + printf("ctdb_set_tdb_dir failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + return ctdb; +} + + +/* + startup a client only ctdb context + */ +struct ctdb_context *ctdb_cmdline_client(struct event_context *ev, const char *ctdb_socket) +{ + struct ctdb_context *ctdb; + int ret; + + /* initialise ctdb */ + ctdb = ctdb_init(ev); + if (ctdb == NULL) { + printf("Failed to init ctdb\n"); + exit(1); + } + + ctdb->daemon.name = talloc_strdup(ctdb, ctdb_socket); + + ret = ctdb_socket_connect(ctdb); + if (ret != 0) { + DEBUG(0,(__location__ " Failed to connect to daemon\n")); + talloc_free(ctdb); + return NULL; + } + + return ctdb; +} diff --git a/source4/cluster/ctdb/common/ctdb.c b/source4/cluster/ctdb/common/ctdb.c index 8a8d52f3f1..6bd2fda529 100644 --- a/source4/cluster/ctdb/common/ctdb.c +++ b/source4/cluster/ctdb/common/ctdb.c @@ -73,6 +73,22 @@ void ctdb_set_max_lacount(struct ctdb_context *ctdb, unsigned count) ctdb->max_lacount = count; } +/* + set the directory for the local databases +*/ +int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir) +{ + if (dir == NULL) { + ctdb->db_directory = talloc_asprintf(ctdb, "ctdb-%u", ctdb_get_vnn(ctdb)); + } else { + ctdb->db_directory = talloc_strdup(ctdb, dir); + } + if (ctdb->db_directory == 
NULL) { + return -1; + } + return 0; +} + /* add a node to the list of active nodes */ @@ -190,65 +206,102 @@ uint32_t ctdb_get_num_nodes(struct ctdb_context *ctdb) /* called by the transport layer when a packet comes in */ -static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length) +void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length) { - struct ctdb_req_header *hdr; + struct ctdb_req_header *hdr = (struct ctdb_req_header *)data; + TALLOC_CTX *tmp_ctx; + + ctdb->status.node_packets_recv++; + + /* place the packet as a child of the tmp_ctx. We then use + talloc_free() below to free it. If any of the calls want + to keep it, then they will steal it somewhere else, and the + talloc_free() will only free the tmp_ctx */ + tmp_ctx = talloc_new(ctdb); + talloc_steal(tmp_ctx, hdr); if (length < sizeof(*hdr)) { ctdb_set_error(ctdb, "Bad packet length %d\n", length); - return; + goto done; } - hdr = (struct ctdb_req_header *)data; if (length != hdr->length) { ctdb_set_error(ctdb, "Bad header length %d expected %d\n", hdr->length, length); - return; + goto done; } if (hdr->ctdb_magic != CTDB_MAGIC) { ctdb_set_error(ctdb, "Non CTDB packet rejected\n"); - return; + goto done; } if (hdr->ctdb_version != CTDB_VERSION) { ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); - return; + goto done; } + DEBUG(3,(__location__ " ctdb request %d of type %d length %d from " + "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length, + hdr->srcnode, hdr->destnode)); + switch (hdr->operation) { case CTDB_REQ_CALL: + ctdb->status.count.req_call++; ctdb_request_call(ctdb, hdr); break; case CTDB_REPLY_CALL: + ctdb->status.count.reply_call++; ctdb_reply_call(ctdb, hdr); break; case CTDB_REPLY_ERROR: + ctdb->status.count.reply_error++; ctdb_reply_error(ctdb, hdr); break; case CTDB_REPLY_REDIRECT: + ctdb->status.count.reply_redirect++; ctdb_reply_redirect(ctdb, hdr); break; case CTDB_REQ_DMASTER: + 
ctdb->status.count.req_dmaster++; ctdb_request_dmaster(ctdb, hdr); break; case CTDB_REPLY_DMASTER: + ctdb->status.count.reply_dmaster++; ctdb_reply_dmaster(ctdb, hdr); break; case CTDB_REQ_MESSAGE: + ctdb->status.count.req_message++; ctdb_request_message(ctdb, hdr); break; + case CTDB_REQ_FINISHED: + ctdb->status.count.req_finished++; + ctdb_request_finished(ctdb, hdr); + break; + default: - printf("Packet with unknown operation %d\n", hdr->operation); + DEBUG(0,("%s: Packet with unknown operation %d\n", + __location__, hdr->operation)); break; } - talloc_free(hdr); + +done: + talloc_free(tmp_ctx); +} + +/* + called by the transport layer when a packet comes in +*/ +void ctdb_recv_raw_pkt(void *p, uint8_t *data, uint32_t length) +{ + struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context); + ctdb_recv_pkt(ctdb, data, length); } /* @@ -257,8 +310,8 @@ static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t len static void ctdb_node_dead(struct ctdb_node *node) { node->ctdb->num_connected--; - printf("%s: node %s is dead: %d connected\n", - node->ctdb->name, node->name, node->ctdb->num_connected); + DEBUG(1,("%s: node %s is dead: %d connected\n", + node->ctdb->name, node->name, node->ctdb->num_connected)); } /* @@ -267,8 +320,8 @@ static void ctdb_node_dead(struct ctdb_node *node) static void ctdb_node_connected(struct ctdb_node *node) { node->ctdb->num_connected++; - printf("%s: connected to %s - %d connected\n", - node->ctdb->name, node->name, node->ctdb->num_connected); + DEBUG(1,("%s: connected to %s - %d connected\n", + node->ctdb->name, node->name, node->ctdb->num_connected)); } /* @@ -281,33 +334,62 @@ void ctdb_daemon_connect_wait(struct ctdb_context *ctdb) expected++; } while (ctdb->num_connected != expected) { + DEBUG(3,("ctdb_connect_wait: waiting for %d nodes (have %d)\n", + expected, ctdb->num_connected)); event_loop_once(ctdb->ev); } + DEBUG(3,("ctdb_connect_wait: got all %d nodes\n", expected)); } +struct queue_next { + 
struct ctdb_context *ctdb; + struct ctdb_req_header *hdr; +}; + + /* - wait until we're the only node left -*/ -void ctdb_wait_loop(struct ctdb_context *ctdb) + triggered when a deferred packet is due + */ +static void queue_next_trigger(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) { - int expected = 0; - if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) { - expected++; + struct queue_next *q = talloc_get_type(private_data, struct queue_next); + ctdb_recv_pkt(q->ctdb, (uint8_t *)q->hdr, q->hdr->length); + talloc_free(q); +} + +/* + defer a packet, so it is processed on the next event loop + this is used for sending packets to ourselves + */ +static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct queue_next *q; + q = talloc(ctdb, struct queue_next); + if (q == NULL) { + DEBUG(0,(__location__ " Failed to allocate deferred packet\n")); + return; } - while (ctdb->num_connected > expected) { - event_loop_once(ctdb->ev); + q->ctdb = ctdb; + q->hdr = talloc_memdup(ctdb, hdr, hdr->length); + if (q->hdr == NULL) { + DEBUG(0,("Error copying deferred packet to self\n")); + return; } + event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q); } - /* queue a packet or die */ void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { struct ctdb_node *node; + ctdb->status.node_packets_sent++; node = ctdb->nodes[hdr->destnode]; - if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) { + if (hdr->destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + ctdb_defer_packet(ctdb, hdr); + } else if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) { ctdb_fatal(ctdb, "Unable to queue packet\n"); } } @@ -338,11 +420,3 @@ struct ctdb_context *ctdb_init(struct event_context *ev) return ctdb; } -int ctdb_start(struct ctdb_context *ctdb) -{ - if (ctdb->flags&CTDB_FLAG_DAEMON_MODE) { - return ctdbd_start(ctdb); - } - - return 
ctdb->methods->start(ctdb); -} diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c index ab5c2cce3b..76a7e97a87 100644 --- a/source4/cluster/ctdb/common/ctdb_call.c +++ b/source4/cluster/ctdb/common/ctdb_call.c @@ -47,9 +47,9 @@ /* local version of ctdb_call */ -static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, - struct ctdb_ltdb_header *header, TDB_DATA *data, - uint32_t caller) +int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, + struct ctdb_ltdb_header *header, TDB_DATA *data, + uint32_t caller) { struct ctdb_call_info *c; struct ctdb_registered_call *fn; @@ -105,6 +105,7 @@ static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *ca if (c->reply_data) { call->reply_data = *c->reply_data; talloc_steal(ctdb, call->reply_data.dptr); + talloc_set_name_const(call->reply_data.dptr, __location__); } else { call->reply_data.dptr = NULL; call->reply_data.dsize = 0; @@ -140,7 +141,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb, msglen = strlen(msg)+1; len = offsetof(struct ctdb_reply_error, msg); - r = ctdb->methods->allocate_pkt(ctdb, len + msglen); + r = ctdb->methods->allocate_pkt(msg, len + msglen); CTDB_NO_MEMORY_FATAL(ctdb, r); talloc_set_name_const(r, "send_error packet"); @@ -155,11 +156,9 @@ static void ctdb_send_error(struct ctdb_context *ctdb, r->msglen = msglen; memcpy(&r->msg[0], msg, msglen); - talloc_free(msg); - ctdb_queue_packet(ctdb, &r->hdr); - talloc_free(r); + talloc_free(msg); } @@ -223,16 +222,12 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, memcpy(&r->data[0], key->dptr, key->dsize); memcpy(&r->data[key->dsize], data->dptr, data->dsize); - if (r->hdr.destnode == ctdb->vnn) { - /* we are the lmaster - don't send to ourselves */ - ctdb_request_dmaster(ctdb, &r->hdr); - } else { - ctdb_queue_packet(ctdb, &r->hdr); - - /* update the ltdb to record the new dmaster */ - header->dmaster = 
r->hdr.destnode; - ctdb_ltdb_store(ctdb_db, *key, header, *data); - } + /* XXX - probably not necessary when lmaster==dmaster + update the ltdb to record the new dmaster */ + header->dmaster = r->hdr.destnode; + ctdb_ltdb_store(ctdb_db, *key, header, *data); + + ctdb_queue_packet(ctdb, &r->hdr); talloc_free(r); } @@ -252,6 +247,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr struct ctdb_ltdb_header header; struct ctdb_db_context *ctdb_db; int ret, len; + TALLOC_CTX *tmp_ctx; key.dptr = c->data; key.dsize = c->keylen; @@ -267,28 +263,41 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr } /* fetch the current record */ - ret = ctdb_ltdb_fetch(ctdb_db, key, &header, hdr, &data2); - if (ret != 0) { + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2, + ctdb_recv_raw_pkt, ctdb); + if (ret == -1) { ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record"); return; } - + if (ret == -2) { + DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n")); + return; + } + /* its a protocol error if the sending node is not the current dmaster */ - if (header.dmaster != hdr->srcnode) { + if (header.dmaster != hdr->srcnode && + hdr->srcnode != ctdb_lmaster(ctdb_db->ctdb, &key)) { ctdb_fatal(ctdb, "dmaster request from non-master"); return; } - + header.dmaster = c->dmaster; - if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) { + ret = ctdb_ltdb_store(ctdb_db, key, &header, data); + ctdb_ltdb_unlock(ctdb_db, key); + if (ret != 0) { ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster"); return; } + /* put the packet on a temporary context, allowing us to safely free + it below even if ctdb_reply_dmaster() has freed it already */ + tmp_ctx = talloc_new(ctdb); + /* send the CTDB_REPLY_DMASTER */ len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize; - r = ctdb->methods->allocate_pkt(ctdb, len); + r = ctdb->methods->allocate_pkt(tmp_ctx, len); CTDB_NO_MEMORY_FATAL(ctdb, r); 
+ talloc_set_name_const(r, "reply_dmaster packet"); r->hdr.length = len; r->hdr.ctdb_magic = CTDB_MAGIC; @@ -300,13 +309,9 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr r->datalen = data.dsize; memcpy(&r->data[0], data.dptr, data.dsize); - if (r->hdr.destnode == r->hdr.srcnode) { - ctdb_reply_dmaster(ctdb, &r->hdr); - } else { - ctdb_queue_packet(ctdb, &r->hdr); - } + ctdb_queue_packet(ctdb, &r->hdr); - talloc_free(r); + talloc_free(tmp_ctx); } @@ -341,17 +346,23 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) fetches the record data (if any), thus avoiding a 2nd fetch of the data if the call will be answered locally */ - ret = ctdb_ltdb_fetch(ctdb_db, call.key, &header, hdr, &data); - if (ret != 0) { + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data, + ctdb_recv_raw_pkt, ctdb); + if (ret == -1) { ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call"); return; } + if (ret == -2) { + DEBUG(2,(__location__ " deferred ctdb_request_call\n")); + return; + } /* if we are not the dmaster, then send a redirect to the requesting node */ if (header.dmaster != ctdb->vnn) { ctdb_call_send_redirect(ctdb, c, &header); talloc_free(data.dptr); + ctdb_ltdb_unlock(ctdb_db, call.key); return; } @@ -364,11 +375,14 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) || c->flags&CTDB_IMMEDIATE_MIGRATION ) { ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data); talloc_free(data.dptr); + ctdb_ltdb_unlock(ctdb_db, call.key); return; } ctdb_call_local(ctdb_db, &call, &header, &data, c->hdr.srcnode); + ctdb_ltdb_unlock(ctdb_db, call.key); + len = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize; r = ctdb->methods->allocate_pkt(ctdb, len); CTDB_NO_MEMORY_FATAL(ctdb, r); @@ -396,15 +410,18 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) called when a CTDB_REPLY_CALL packet comes in This packet 
comes in response to a CTDB_REQ_CALL request packet. It - contains any reply data freom the call + contains any reply data from the call */ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr; struct ctdb_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); - if (state == NULL) return; + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); + if (state == NULL) { + DEBUG(0, ("reqid %d not found\n", hdr->reqid)); + return; + } state->call.reply_data.dptr = c->data; state->call.reply_data.dsize = c->datalen; @@ -412,10 +429,6 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) talloc_steal(state, c); - /* get an extra reference here - this prevents the free in ctdb_recv_pkt() - from freeing the data */ - (void)talloc_reference(state, c); - state->state = CTDB_CALL_DONE; if (state->async.fn) { state->async.fn(state); @@ -435,13 +448,25 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_call_state *state; struct ctdb_db_context *ctdb_db; TDB_DATA data; + int ret; - state = idr_find(ctdb->idr, hdr->reqid); + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state == NULL) { return; } + ctdb_db = state->ctdb_db; + ret = ctdb_ltdb_lock_requeue(ctdb_db, state->call.key, hdr, + ctdb_recv_raw_pkt, ctdb); + if (ret == -2) { + return; + } + if (ret != 0) { + DEBUG(0,(__location__ " Failed to get lock in ctdb_reply_dmaster\n")); + return; + } + data.dptr = c->data; data.dsize = c->datalen; @@ -452,12 +477,17 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) state->header.dmaster = ctdb->vnn; if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) { + ctdb_ltdb_unlock(ctdb_db, state->call.key); ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n"); return; } ctdb_call_local(ctdb_db, &state->call, &state->header, &data, 
ctdb->vnn); + ctdb_ltdb_unlock(ctdb_db, state->call.key); + + talloc_steal(state, state->call.reply_data.dptr); + state->state = CTDB_CALL_DONE; if (state->async.fn) { state->async.fn(state); @@ -473,7 +503,7 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr; struct ctdb_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state == NULL) return; talloc_steal(state, c); @@ -498,7 +528,7 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr; struct ctdb_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state == NULL) return; talloc_steal(state, c); @@ -510,6 +540,7 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) /* send it off again */ state->node = ctdb->nodes[c->dmaster]; + state->c->hdr.destnode = c->dmaster; ctdb_queue_packet(ctdb, &state->c->hdr); } @@ -578,6 +609,7 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, state->ctdb_db = ctdb_db; ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn); + talloc_steal(state, state->call.reply_data.dptr); event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state); @@ -591,45 +623,27 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, This constructs a ctdb_call request and queues it for processing. This call never blocks. 
*/ -static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctdb_db, - struct ctdb_call *call) +struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call, + struct ctdb_ltdb_header *header) { uint32_t len; struct ctdb_call_state *state; - int ret; - struct ctdb_ltdb_header header; - TDB_DATA data; struct ctdb_context *ctdb = ctdb_db->ctdb; - /* - if we are the dmaster for this key then we don't need to - send it off at all, we can bypass the network and handle it - locally. To find out if we are the dmaster we need to look - in our ltdb - */ - ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); - if (ret != 0) return NULL; - - if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { - return ctdb_call_local_send(ctdb_db, call, &header, &data); - } - state = talloc_zero(ctdb_db, struct ctdb_call_state); CTDB_NO_MEMORY_NULL(ctdb, state); - talloc_steal(state, data.dptr); - len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; - state->c = ctdb->methods->allocate_pkt(ctdb, len); + state->c = ctdb->methods->allocate_pkt(state, len); CTDB_NO_MEMORY_NULL(ctdb, state->c); talloc_set_name_const(state->c, "req_call packet"); - talloc_steal(state, state->c); state->c->hdr.length = len; state->c->hdr.ctdb_magic = CTDB_MAGIC; state->c->hdr.ctdb_version = CTDB_VERSION; state->c->hdr.operation = CTDB_REQ_CALL; - state->c->hdr.destnode = header.dmaster; + state->c->hdr.destnode = header->dmaster; state->c->hdr.srcnode = ctdb->vnn; /* this limits us to 16k outstanding messages - not unreasonable */ state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); @@ -645,9 +659,9 @@ static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctd state->call.call_data.dptr = &state->c->data[call->key.dsize]; state->call.key.dptr = &state->c->data[0]; - state->node = ctdb->nodes[header.dmaster]; + state->node = 
ctdb->nodes[header->dmaster]; state->state = CTDB_CALL_WAIT; - state->header = header; + state->header = *header; state->ctdb_db = ctdb_db; talloc_set_destructor(state, ctdb_call_destructor); @@ -659,30 +673,14 @@ static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctd return state; } -/* - make a remote ctdb call - async send - - This constructs a ctdb_call request and queues it for processing. - This call never blocks. -*/ -struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) -{ - if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_call_send(ctdb_db, call); - } - return ctdb_daemon_call_send(ctdb_db, call); -} - /* make a remote ctdb call - async recv - called in daemon context This is called when the program wants to wait for a ctdb_call to complete and get the results. This call will block unless the call has already completed. */ -static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) { - struct ctdb_record_handle *rec; - while (state->state < CTDB_CALL_DONE) { event_loop_once(state->node->ctdb->ev); } @@ -692,16 +690,6 @@ static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call return -1; } - rec = state->fetch_private; - - /* ugly hack to manage forced migration */ - if (rec != NULL) { - rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr); - rec->data->dsize = state->call.reply_data.dsize; - talloc_free(state); - return 0; - } - if (state->call.reply_data.dsize) { call->reply_data.dptr = talloc_memdup(state->node->ctdb, state->call.reply_data.dptr, @@ -717,92 +705,3 @@ static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call } -/* - make a remote ctdb call - async recv. - - This is called when the program wants to wait for a ctdb_call to complete and get the - results. 
This call will block unless the call has already completed. -*/ -int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) -{ - if (state->ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_call_recv(state, call); - } - return ctdb_daemon_call_recv(state, call); -} - -/* - full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() -*/ -int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) -{ - struct ctdb_call_state *state; - - state = ctdb_call_send(ctdb_db, call); - return ctdb_call_recv(state, call); -} - - - -struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, - TDB_DATA key, TDB_DATA *data) -{ - struct ctdb_call call; - struct ctdb_record_handle *rec; - struct ctdb_call_state *state; - int ret; - - if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_fetch_lock(ctdb_db, mem_ctx, key, data); - } - - ZERO_STRUCT(call); - call.call_id = CTDB_FETCH_FUNC; - call.key = key; - call.flags = CTDB_IMMEDIATE_MIGRATION; - - rec = talloc(mem_ctx, struct ctdb_record_handle); - CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec); - - rec->ctdb_db = ctdb_db; - rec->key = key; - rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); - rec->data = data; - - state = ctdb_call_send(ctdb_db, &call); - state->fetch_private = rec; - - ret = ctdb_call_recv(state, &call); - if (ret != 0) { - talloc_free(rec); - return NULL; - } - - return rec; -} - - -int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) -{ - int ret; - struct ctdb_ltdb_header header; - struct ctdb_db_context *ctdb_db = talloc_get_type(rec->ctdb_db, struct ctdb_db_context); - - if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_store_unlock(rec, data); - } - - /* should be avoided if possible hang header off rec ? 
*/ - ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL, NULL); - if (ret) { - ctdb_set_error(rec->ctdb_db->ctdb, "Fetch of locally held record failed"); - talloc_free(rec); - return ret; - } - - ret = ctdb_ltdb_store(rec->ctdb_db, rec->key, &header, data); - - talloc_free(rec); - - return ret; -} diff --git a/source4/cluster/ctdb/common/ctdb_client.c b/source4/cluster/ctdb/common/ctdb_client.c index 3cb27a1165..dbed8d3585 100644 --- a/source4/cluster/ctdb/common/ctdb_client.c +++ b/source4/cluster/ctdb/common/ctdb_client.c @@ -49,94 +49,87 @@ static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, } /* - called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon + state of a in-progress ctdb call in client +*/ +struct ctdb_client_call_state { + enum call_state state; + uint32_t reqid; + struct ctdb_db_context *ctdb_db; + struct ctdb_call call; +}; + +/* + called when a CTDB_REPLY_CALL packet comes in in the client - This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It + This packet comes in response to a CTDB_REQ_CALL request packet. 
It contains any reply data from the call */ -void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - struct ctdb_reply_fetch_lock *c = (struct ctdb_reply_fetch_lock *)hdr; - struct ctdb_call_state *state; + struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr; + struct ctdb_client_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); - if (state == NULL) return; + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_client_call_state); + if (state == NULL) { + DEBUG(0, ("reqid %d not found\n", hdr->reqid)); + return; + } state->call.reply_data.dptr = c->data; state->call.reply_data.dsize = c->datalen; - state->call.status = c->state; + state->call.status = c->status; talloc_steal(state, c); - /* get an extra reference here - this prevents the free in ctdb_recv_pkt() - from freeing the data */ - (void)talloc_reference(state, c); - state->state = CTDB_CALL_DONE; - if (state->async.fn) { - state->async.fn(state); - } } -/* - called in the client when we receive a CTDB_REPLY_STORE_UNLOCK from the daemon - - This packet comes in response to a CTDB_REQ_STORE_UNLOCK request packet. 
It - contains any reply data from the call -*/ -void ctdb_reply_store_unlock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) -{ - struct ctdb_reply_store_unlock *c = (struct ctdb_reply_store_unlock *)hdr; - struct ctdb_call_state *state; - - state = idr_find(ctdb->idr, hdr->reqid); - if (state == NULL) return; - - state->call.status = c->state; - - talloc_steal(state, c); - - /* get an extra reference here - this prevents the free in ctdb_recv_pkt() - from freeing the data */ - (void)talloc_reference(state, c); +static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); - state->state = CTDB_CALL_DONE; - if (state->async.fn) { - state->async.fn(state); - } -} /* this is called in the client, when data comes in from the daemon */ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) { struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context); - struct ctdb_req_header *hdr; + struct ctdb_req_header *hdr = (struct ctdb_req_header *)data; + TALLOC_CTX *tmp_ctx; + + /* place the packet as a child of a tmp_ctx. We then use + talloc_free() below to free it. 
If any of the calls want + to keep it, then they will steal it somewhere else, and the + talloc_free() will be a no-op */ + tmp_ctx = talloc_new(ctdb); + talloc_steal(tmp_ctx, hdr); + + if (cnt == 0) { + DEBUG(2,("Daemon has exited - shutting down client\n")); + exit(0); + } if (cnt < sizeof(*hdr)) { - ctdb_set_error(ctdb, "Bad packet length %d\n", cnt); - return; + DEBUG(0,("Bad packet length %d in client\n", cnt)); + goto done; } - hdr = (struct ctdb_req_header *)data; if (cnt != hdr->length) { - ctdb_set_error(ctdb, "Bad header length %d expected %d\n", + ctdb_set_error(ctdb, "Bad header length %d expected %d in client\n", hdr->length, cnt); - return; + goto done; } if (hdr->ctdb_magic != CTDB_MAGIC) { - ctdb_set_error(ctdb, "Non CTDB packet rejected\n"); - return; + ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n"); + goto done; } if (hdr->ctdb_version != CTDB_VERSION) { - ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); - return; + ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version); + goto done; } switch (hdr->operation) { case CTDB_REPLY_CALL: - ctdb_reply_call(ctdb, hdr); + ctdb_client_reply_call(ctdb, hdr); break; case CTDB_REQ_MESSAGE: @@ -147,23 +140,22 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) ctdb_reply_connect_wait(ctdb, hdr); break; - case CTDB_REPLY_FETCH_LOCK: - ctdb_reply_fetch_lock(ctdb, hdr); - break; - - case CTDB_REPLY_STORE_UNLOCK: - ctdb_reply_store_unlock(ctdb, hdr); + case CTDB_REPLY_STATUS: + ctdb_reply_status(ctdb, hdr); break; default: - printf("bogus operation code:%d\n",hdr->operation); + DEBUG(0,("bogus operation code:%d\n",hdr->operation)); } + +done: + talloc_free(tmp_ctx); } /* connect to a unix domain socket */ -static int ux_socket_connect(struct ctdb_context *ctdb) +int ctdb_socket_connect(struct ctdb_context *ctdb) { struct sockaddr_un addr; @@ -189,6 +181,13 @@ static int ux_socket_connect(struct ctdb_context *ctdb) } 
+struct ctdb_record_handle { + struct ctdb_db_context *ctdb_db; + TDB_DATA key; + TDB_DATA *data; + struct ctdb_ltdb_header header; +}; + /* make a recv call to the local ctdb daemon - called from client context @@ -196,31 +195,19 @@ static int ux_socket_connect(struct ctdb_context *ctdb) This is called when the program wants to wait for a ctdb_call to complete and get the results. This call will block unless the call has already completed. */ -int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call) { - struct ctdb_record_handle *rec; - while (state->state < CTDB_CALL_DONE) { - event_loop_once(state->node->ctdb->ev); + event_loop_once(state->ctdb_db->ctdb->ev); } if (state->state != CTDB_CALL_DONE) { - ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + DEBUG(0,(__location__ " ctdb_call_recv failed\n")); talloc_free(state); return -1; } - rec = state->fetch_private; - - /* ugly hack to manage forced migration */ - if (rec != NULL) { - rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr); - rec->data->dsize = state->call.reply_data.dsize; - talloc_free(state); - return 0; - } - if (state->call.reply_data.dsize) { - call->reply_data.dptr = talloc_memdup(state->node->ctdb, + call->reply_data.dptr = talloc_memdup(state->ctdb_db, state->call.reply_data.dptr, state->call.reply_data.dsize); call->reply_data.dsize = state->call.reply_data.dsize; @@ -240,13 +227,41 @@ int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) /* destroy a ctdb_call in client */ -static int ctdb_client_call_destructor(struct ctdb_call_state *state) +static int ctdb_client_call_destructor(struct ctdb_client_call_state *state) { - idr_remove(state->node->ctdb->idr, state->c->hdr.reqid); + idr_remove(state->ctdb_db->ctdb->idr, state->reqid); return 0; } +/* + construct an event driven local ctdb_call + + this is used so that locally processed 
ctdb_call requests are processed + in an event driven manner +*/ +static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call, + struct ctdb_ltdb_header *header, + TDB_DATA *data) +{ + struct ctdb_client_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + int ret; + + state = talloc_zero(ctdb_db, struct ctdb_client_call_state); + CTDB_NO_MEMORY_NULL(ctdb, state); + talloc_steal(state, data->dptr); + + state->state = CTDB_CALL_DONE; + state->call = *call; + state->ctdb_db = ctdb_db; + + ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn); + talloc_steal(state, state->call.reply_data.dptr); + + return state; +} /* make a ctdb call to the local daemon - async send. Called from client context. @@ -254,107 +269,109 @@ static int ctdb_client_call_destructor(struct ctdb_call_state *state) This constructs a ctdb_call request and queues it for processing. This call never blocks. */ -struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db, - struct ctdb_call *call) +struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call) { - struct ctdb_call_state *state; + struct ctdb_client_call_state *state; struct ctdb_context *ctdb = ctdb_db->ctdb; struct ctdb_ltdb_header header; TDB_DATA data; int ret; size_t len; + struct ctdb_req_call *c; /* if the domain socket is not yet open, open it */ if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); + ctdb_socket_connect(ctdb); } ret = ctdb_ltdb_lock(ctdb_db, call->key); if (ret != 0) { - printf("failed to lock ltdb record\n"); + DEBUG(0,(__location__ " Failed to get chainlock\n")); return NULL; } ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); if (ret != 0) { ctdb_ltdb_unlock(ctdb_db, call->key); + DEBUG(0,(__location__ " Failed to fetch record\n")); return NULL; } -#if 0 if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { - state 
= ctdb_call_local_send(ctdb_db, call, &header, &data); + state = ctdb_client_call_local_send(ctdb_db, call, &header, &data); + talloc_free(data.dptr); ctdb_ltdb_unlock(ctdb_db, call->key); return state; } -#endif - state = talloc_zero(ctdb_db, struct ctdb_call_state); + ctdb_ltdb_unlock(ctdb_db, call->key); + talloc_free(data.dptr); + + state = talloc_zero(ctdb_db, struct ctdb_client_call_state); if (state == NULL) { - printf("failed to allocate state\n"); - ctdb_ltdb_unlock(ctdb_db, call->key); + DEBUG(0, (__location__ " failed to allocate state\n")); return NULL; } - talloc_steal(state, data.dptr); - len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; - state->c = ctdbd_allocate_pkt(ctdb, len); - if (state->c == NULL) { - printf("failed to allocate packet\n"); - ctdb_ltdb_unlock(ctdb_db, call->key); + c = ctdbd_allocate_pkt(state, len); + if (c == NULL) { + DEBUG(0, (__location__ " failed to allocate packet\n")); return NULL; } - talloc_set_name_const(state->c, "ctdbd req_call packet"); - talloc_steal(state, state->c); + talloc_set_name_const(c, "ctdb client req_call packet"); + memset(c, 0, offsetof(struct ctdb_req_call, data)); - state->c->hdr.length = len; - state->c->hdr.ctdb_magic = CTDB_MAGIC; - state->c->hdr.ctdb_version = CTDB_VERSION; - state->c->hdr.operation = CTDB_REQ_CALL; - state->c->hdr.destnode = header.dmaster; - state->c->hdr.srcnode = ctdb->vnn; + c->hdr.length = len; + c->hdr.ctdb_magic = CTDB_MAGIC; + c->hdr.ctdb_version = CTDB_VERSION; + c->hdr.operation = CTDB_REQ_CALL; /* this limits us to 16k outstanding messages - not unreasonable */ - state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); - state->c->flags = call->flags; - state->c->db_id = ctdb_db->db_id; - state->c->callid = call->call_id; - state->c->keylen = call->key.dsize; - state->c->calldatalen = call->call_data.dsize; - memcpy(&state->c->data[0], call->key.dptr, call->key.dsize); - memcpy(&state->c->data[call->key.dsize], + c->hdr.reqid = 
idr_get_new(ctdb->idr, state, 0xFFFF); + c->flags = call->flags; + c->db_id = ctdb_db->db_id; + c->callid = call->call_id; + c->keylen = call->key.dsize; + c->calldatalen = call->call_data.dsize; + memcpy(&c->data[0], call->key.dptr, call->key.dsize); + memcpy(&c->data[call->key.dsize], call->call_data.dptr, call->call_data.dsize); state->call = *call; - state->call.call_data.dptr = &state->c->data[call->key.dsize]; - state->call.key.dptr = &state->c->data[0]; + state->call.call_data.dptr = &c->data[call->key.dsize]; + state->call.key.dptr = &c->data[0]; - state->node = ctdb->nodes[header.dmaster]; state->state = CTDB_CALL_WAIT; - state->header = header; state->ctdb_db = ctdb_db; + state->reqid = c->hdr.reqid; talloc_set_destructor(state, ctdb_client_call_destructor); - ctdb_client_queue_pkt(ctdb, &state->c->hdr); - -/*XXX set up timeout to cleanup if server doesnt respond - event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), - ctdb_call_timeout, state); -*/ + ctdb_client_queue_pkt(ctdb, &c->hdr); - ctdb_ltdb_unlock(ctdb_db, call->key); return state; } +/* + full ctdb_call. 
Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() +*/ +int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) +{ + struct ctdb_client_call_state *state; + + state = ctdb_call_send(ctdb_db, call); + return ctdb_call_recv(state, call); +} + /* tell the daemon what messaging srvid we will use, and register the message handler function in the client */ -int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, - ctdb_message_fn_t handler, - void *private_data) +int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) { struct ctdb_req_register c; @@ -362,7 +379,7 @@ int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, /* if the domain socket is not yet open, open it */ if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); + ctdb_socket_connect(ctdb); } ZERO_STRUCT(c); @@ -383,26 +400,10 @@ int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, } - -/* - setup handler for receipt of ctdb messages from ctdb_send_message() -*/ -int ctdb_set_message_handler(struct ctdb_context *ctdb, - uint32_t srvid, - ctdb_message_fn_t handler, - void *private_data) -{ - if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_set_message_handler(ctdb, srvid, handler, private_data); - } - return ctdb_daemon_set_message_handler(ctdb, srvid, handler, private_data); -} - - /* send a message - from client context */ -int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, +int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, uint32_t srvid, TDB_DATA data) { struct ctdb_req_message *r; @@ -436,7 +437,7 @@ int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, /* wait for all nodes to be connected - from client */ -static void ctdb_client_connect_wait(struct ctdb_context *ctdb) +void ctdb_connect_wait(struct ctdb_context *ctdb) { struct ctdb_req_connect_wait r; int res; @@ -447,216 
+448,235 @@ static void ctdb_client_connect_wait(struct ctdb_context *ctdb) r.hdr.ctdb_magic = CTDB_MAGIC; r.hdr.ctdb_version = CTDB_VERSION; r.hdr.operation = CTDB_REQ_CONNECT_WAIT; + + DEBUG(3,("ctdb_connect_wait: sending to ctdbd\n")); + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ctdb_socket_connect(ctdb); + } res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length); if (res != 0) { - printf("Failed to queue a connect wait request\n"); + DEBUG(0,(__location__ " Failed to queue a connect wait request\n")); return; } + DEBUG(3,("ctdb_connect_wait: waiting\n")); + /* now we can go into the normal wait routine, as the reply packet will update the ctdb->num_connected variable */ ctdb_daemon_connect_wait(ctdb); } /* - wait for all nodes to be connected -*/ -void ctdb_connect_wait(struct ctdb_context *ctdb) + cancel a ctdb_fetch_lock operation, releasing the lock + */ +static int fetch_lock_destructor(struct ctdb_record_handle *h) { - if (!(ctdb->flags & CTDB_FLAG_DAEMON_MODE)) { - ctdb_daemon_connect_wait(ctdb); - return; - } - - ctdb_client_connect_wait(ctdb); + ctdb_ltdb_unlock(h->ctdb_db, h->key); + return 0; } +/* + force the migration of a record to this node + */ +static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key) +{ + struct ctdb_call call; + ZERO_STRUCT(call); + call.call_id = CTDB_NULL_FUNC; + call.key = key; + call.flags = CTDB_IMMEDIATE_MIGRATION; + return ctdb_call(ctdb_db, &call); +} -struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key) +/* + get a lock on a record, and return the records data. 
Blocks until it gets the lock + */ +struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) { - struct ctdb_call_state *state; - struct ctdb_context *ctdb = ctdb_db->ctdb; - struct ctdb_req_fetch_lock *req; - int len, res; + int ret; + struct ctdb_record_handle *h; - /* if the domain socket is not yet open, open it */ - if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); - } + /* + procedure is as follows: - state = talloc_zero(ctdb_db, struct ctdb_call_state); - if (state == NULL) { - printf("failed to allocate state\n"); - return NULL; - } - state->state = CTDB_CALL_WAIT; - state->ctdb_db = ctdb_db; - len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize; - state->c = ctdbd_allocate_pkt(ctdb, len); - if (state->c == NULL) { - printf("failed to allocate packet\n"); + 1) get the chain lock. + 2) check if we are dmaster + 3) if we are the dmaster then return handle + 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for + reply from ctdbd + 5) when we get the reply, goto (1) + */ + + h = talloc_zero(mem_ctx, struct ctdb_record_handle); + if (h == NULL) { return NULL; } - ZERO_STRUCT(*state->c); - talloc_set_name_const(state->c, "ctdbd req_fetch_lock packet"); - talloc_steal(state, state->c); - - req = (struct ctdb_req_fetch_lock *)state->c; - req->hdr.length = len; - req->hdr.ctdb_magic = CTDB_MAGIC; - req->hdr.ctdb_version = CTDB_VERSION; - req->hdr.operation = CTDB_REQ_FETCH_LOCK; - req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); - req->db_id = ctdb_db->db_id; - req->keylen = key.dsize; - memcpy(&req->key[0], key.dptr, key.dsize); - - res = ctdb_client_queue_pkt(ctdb, &req->hdr); - if (res != 0) { + + h->ctdb_db = ctdb_db; + h->key = key; + h->key.dptr = talloc_memdup(h, key.dptr, key.dsize); + if (h->key.dptr == NULL) { + talloc_free(h); return NULL; } + h->data = data; - talloc_free(req); - - return state; -} + DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", 
key.dsize, key.dsize, + (const char *)key.dptr)); +again: + /* step 1 - get the chain lock */ + ret = ctdb_ltdb_lock(ctdb_db, key); + if (ret != 0) { + DEBUG(0, (__location__ " failed to lock ltdb record\n")); + talloc_free(h); + return NULL; + } -struct ctdb_call_state *ctdb_client_store_unlock_send( - struct ctdb_record_handle *rh, - TALLOC_CTX *mem_ctx, - TDB_DATA data) -{ - struct ctdb_call_state *state; - struct ctdb_db_context *ctdb_db = talloc_get_type(rh->ctdb_db, struct ctdb_db_context); - struct ctdb_context *ctdb = ctdb_db->ctdb; - struct ctdb_req_store_unlock *req; - int len, res; + DEBUG(4,("ctdb_fetch_lock: got chain lock\n")); - /* if the domain socket is not yet open, open it */ - if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); - } + talloc_set_destructor(h, fetch_lock_destructor); - state = talloc_zero(ctdb_db, struct ctdb_call_state); - if (state == NULL) { - printf("failed to allocate state\n"); - return NULL; - } - state->state = CTDB_CALL_WAIT; - state->ctdb_db = ctdb_db; - len = offsetof(struct ctdb_req_store_unlock, data) + rh->key.dsize + data.dsize; - state->c = ctdbd_allocate_pkt(ctdb, len); - if (state->c == NULL) { - printf("failed to allocate packet\n"); + ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + talloc_free(h); return NULL; } - ZERO_STRUCT(*state->c); - talloc_set_name_const(state->c, "ctdbd req_store_unlock packet"); - talloc_steal(state, state->c); - - req = (struct ctdb_req_store_unlock *)state->c; - req->hdr.length = len; - req->hdr.ctdb_magic = CTDB_MAGIC; - req->hdr.ctdb_version = CTDB_VERSION; - req->hdr.operation = CTDB_REQ_STORE_UNLOCK; - req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); - req->db_id = ctdb_db->db_id; - req->keylen = rh->key.dsize; - req->datalen = data.dsize; - memcpy(&req->data[0], rh->key.dptr, rh->key.dsize); - memcpy(&req->data[req->keylen], data.dptr, data.dsize); - - res = ctdb_client_queue_pkt(ctdb, &req->hdr); - if (res 
!= 0) { - return NULL; + + /* when torturing, ensure we test the remote path */ + if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) && + random() % 5 == 0) { + h->header.dmaster = (uint32_t)-1; } - talloc_free(req); - return state; + DEBUG(4,("ctdb_fetch_lock: done local fetch\n")); + + if (h->header.dmaster != ctdb_db->ctdb->vnn) { + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(4,("ctdb_fetch_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + goto again; + } + + DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n")); + return h; } /* - make a recv call to the local ctdb daemon - called from client context + store some data to the record that was locked with ctdb_fetch_lock() +*/ +int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data) +{ + return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data); +} - This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the - results. This call will block unless the call has already completed. +/* + wait until we're the only node left. 
+ this function never returns */ -struct ctdb_record_handle *ctdb_client_fetch_lock_recv(struct ctdb_call_state *state, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data) +void ctdb_shutdown(struct ctdb_context *ctdb) { - struct ctdb_record_handle *rec; + struct ctdb_req_shutdown r; + int len; - while (state->state < CTDB_CALL_DONE) { - event_loop_once(state->ctdb_db->ctdb->ev); - } - if (state->state != CTDB_CALL_DONE) { - ctdb_set_error(state->node->ctdb, "%s", state->errmsg); - talloc_free(state); - return NULL; + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ctdb_socket_connect(ctdb); } - rec = talloc(mem_ctx, struct ctdb_record_handle); - CTDB_NO_MEMORY_NULL(state->ctdb_db->ctdb, rec); + len = sizeof(struct ctdb_req_shutdown); + ZERO_STRUCT(r); + r.hdr.length = len; + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_SHUTDOWN; + r.hdr.reqid = 0; - rec->ctdb_db = state->ctdb_db; - rec->key = key; - rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); - rec->data = talloc(rec, TDB_DATA); - rec->data->dsize = state->call.reply_data.dsize; - rec->data->dptr = talloc_memdup(rec, state->call.reply_data.dptr, rec->data->dsize); + ctdb_client_queue_pkt(ctdb, &(r.hdr)); - if (data) { - *data = *rec->data; + /* this event loop will terminate once we receive the reply */ + while (1) { + event_loop_once(ctdb->ev); } - return rec; } -/* - make a recv call to the local ctdb daemon - called from client context +enum ctdb_status_states {CTDB_STATUS_WAIT, CTDB_STATUS_DONE}; - This is called when the program wants to wait for a ctdb_store_unlock to complete and get the - results. This call will block unless the call has already completed. 
-*/ -int ctdb_client_store_unlock_recv(struct ctdb_call_state *state, struct ctdb_record_handle *rec) +struct ctdb_status_state { + uint32_t reqid; + struct ctdb_status *status; + enum ctdb_status_states state; +}; + +/* + handle a ctdb_reply_status reply + */ +static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - while (state->state < CTDB_CALL_DONE) { - event_loop_once(state->ctdb_db->ctdb->ev); - } - if (state->state != CTDB_CALL_DONE) { - ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + struct ctdb_reply_status *r = (struct ctdb_reply_status *)hdr; + struct ctdb_status_state *state; + + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_status_state); + if (state == NULL) { + DEBUG(0, ("reqid %d not found\n", hdr->reqid)); + return; } - talloc_free(state); - return state->state; + *state->status = r->status; + state->state = CTDB_STATUS_DONE; } -struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key, - TDB_DATA *data) +/* + ask the ctdb daemon for its status.
+ blocks until the reply has been received; returns 0 on success, -1 if the request could not be queued +*/ +int ctdb_status(struct ctdb_context *ctdb, struct ctdb_status *status) { - struct ctdb_call_state *state; - struct ctdb_record_handle *rec; + struct ctdb_req_status r; + int ret; + struct ctdb_status_state *state; - state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key); - rec = ctdb_client_fetch_lock_recv(state, mem_ctx, key, data); + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ctdb_socket_connect(ctdb); + } - return rec; -} + state = talloc(ctdb, struct ctdb_status_state); + CTDB_NO_MEMORY(ctdb, state); -int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) -{ - struct ctdb_call_state *state; - int res; + state->reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + state->status = status; + state->state = CTDB_STATUS_WAIT; + + ZERO_STRUCT(r); + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_STATUS; + r.hdr.reqid = state->reqid; - state = ctdb_client_store_unlock_send(rec, rec, data); - res = ctdb_client_store_unlock_recv(state, rec); + ret = ctdb_client_queue_pkt(ctdb, &(r.hdr)); + if (ret != 0) { + talloc_free(state); + return -1; + } + + while (state->state == CTDB_STATUS_WAIT) { + event_loop_once(ctdb->ev); + } - talloc_free(rec); + talloc_free(state); - return res; + return 0; } + diff --git a/source4/cluster/ctdb/common/ctdb_daemon.c b/source4/cluster/ctdb/common/ctdb_daemon.c index 945030d77e..ff3431a392 100644 --- a/source4/cluster/ctdb/common/ctdb_daemon.c +++ b/source4/cluster/ctdb/common/ctdb_daemon.c @@ -25,9 +25,23 @@ #include "lib/util/dlinklist.h" #include "system/network.h" #include "system/filesys.h" +#include "system/wait.h" #include "../include/ctdb.h" #include "../include/ctdb_private.h" +/* + structure describing a connected client in the daemon + */ +struct ctdb_client { + struct ctdb_context *ctdb; + int 
fd; + struct ctdb_queue *queue; +}; + + + +static void 
daemon_incoming_packet(void *, uint8_t *, uint32_t ); + static void ctdb_main_loop(struct ctdb_context *ctdb) { ctdb->methods->start(ctdb); @@ -35,7 +49,7 @@ static void ctdb_main_loop(struct ctdb_context *ctdb) /* go into a wait loop to allow other nodes to complete */ event_loop_wait(ctdb->ev); - printf("event_loop_wait() returned. this should not happen\n"); + DEBUG(0,("event_loop_wait() returned. this should not happen\n")); exit(1); } @@ -47,16 +61,27 @@ static void set_non_blocking(int fd) fcntl(fd, F_SETFL, v | O_NONBLOCK); } +static void block_signal(int signum) +{ + struct sigaction act; + + memset(&act, 0, sizeof(act)); + + act.sa_handler = SIG_IGN; + sigemptyset(&act.sa_mask); + sigaddset(&act.sa_mask, signum); + sigaction(signum, &act, NULL); +} + /* - structure describing a connected client in the daemon + send a packet to a client */ -struct ctdb_client { - struct ctdb_context *ctdb; - int fd; - struct ctdb_queue *queue; -}; - +static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr) +{ + client->ctdb->status.client_packets_sent++; + return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length); +} /* message handler for when we are in daemon mode. 
This redirects the message @@ -73,10 +98,9 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, len = offsetof(struct ctdb_req_message, data) + data.dsize; r = ctdbd_allocate_pkt(ctdb, len); -/*XXX cant use this since it returns an int CTDB_NO_MEMORY(ctdb, r);*/ talloc_set_name_const(r, "req_message packet"); - ZERO_STRUCT(*r); + memset(r, 0, offsetof(struct ctdb_req_message, data)); r->hdr.length = len; r->hdr.ctdb_magic = CTDB_MAGIC; @@ -85,11 +109,10 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, r->srvid = srvid; r->datalen = data.dsize; memcpy(&r->data[0], data.dptr, data.dsize); - - ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len); + + daemon_queue_send(client, &r->hdr); talloc_free(r); - return; } @@ -105,187 +128,122 @@ static void daemon_request_register_message_handler(struct ctdb_client *client, c->srvid, daemon_message_handler, client); if (res != 0) { - printf("Failed to register handler %u in daemon\n", c->srvid); + DEBUG(0,(__location__ " Failed to register handler %u in daemon\n", + c->srvid)); + } else { + DEBUG(2,(__location__ " Registered message handler for srvid=%u\n", + c->srvid)); } } -static struct ctdb_call_state *ctdb_fetch_lock_send(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key, TDB_DATA *data) +/* + called when the daemon gets a shutdown request from a client + */ +static void daemon_request_shutdown(struct ctdb_client *client, + struct ctdb_req_shutdown *f) { - struct ctdb_call *call; - struct ctdb_record_handle *rec; - struct ctdb_call_state *state; + struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context); + int len; + uint32_t node; - rec = talloc(mem_ctx, struct ctdb_record_handle); - CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec); + /* we dont send to ourself so we can already count one daemon as + exiting */ + ctdb->num_finished++; - - call = talloc(rec, struct ctdb_call); - ZERO_STRUCT(*call); - call->call_id = 
CTDB_FETCH_FUNC; - call->key = key; - call->flags = CTDB_IMMEDIATE_MIGRATION; + /* loop over all nodes of the cluster */ + for (node=0; node<ctdb->num_nodes;node++) { + struct ctdb_req_finished *rf; - rec->ctdb_db = ctdb_db; - rec->key = key; - rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); - rec->data = data; + /* dont send a message to ourself */ + if (ctdb->vnn == node) { + continue; + } - state = ctdb_call_send(ctdb_db, call); - state->fetch_private = rec; + len = sizeof(struct ctdb_req_finished); + rf = ctdb->methods->allocate_pkt(ctdb, len); + CTDB_NO_MEMORY_FATAL(ctdb, rf); + talloc_set_name_const(rf, "ctdb_req_finished packet"); - return state; -} + ZERO_STRUCT(*rf); + rf->hdr.length = len; + rf->hdr.ctdb_magic = CTDB_MAGIC; + rf->hdr.ctdb_version = CTDB_VERSION; + rf->hdr.operation = CTDB_REQ_FINISHED; + rf->hdr.destnode = node; + rf->hdr.srcnode = ctdb->vnn; + rf->hdr.reqid = 0; -struct client_fetch_lock_data { - struct ctdb_client *client; - uint32_t reqid; -}; -static void daemon_fetch_lock_complete(struct ctdb_call_state *state) -{ - struct ctdb_reply_fetch_lock *r; - struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data, struct client_fetch_lock_data); - struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client); - int length, res; + ctdb_queue_packet(ctdb, &(rf->hdr)); - length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize; - r = ctdbd_allocate_pkt(client->ctdb, length); - if (r == NULL) { - printf("Failed to allocate reply_call in ctdb daemon\n"); - return; + talloc_free(rf); } - ZERO_STRUCT(*r); - r->hdr.length = length; - r->hdr.ctdb_magic = CTDB_MAGIC; - r->hdr.ctdb_version = CTDB_VERSION; - r->hdr.operation = CTDB_REPLY_FETCH_LOCK; - r->hdr.reqid = data->reqid; - r->state = state->state; - r->datalen = state->call.reply_data.dsize; - memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen); - res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, 
r->hdr.length); - if (res != 0) { - printf("Failed to queue packet from daemon to client\n"); + /* wait until all nodes are prepared to shut down */ + while (ctdb->num_finished != ctdb->num_nodes) { + event_loop_once(ctdb->ev); } - talloc_free(r); -} -/* - called when the daemon gets a fetch lock request from a client - */ -static void daemon_request_fetch_lock(struct ctdb_client *client, - struct ctdb_req_fetch_lock *f) -{ - struct ctdb_call_state *state; - TDB_DATA key, *data; - struct ctdb_db_context *ctdb_db; - struct client_fetch_lock_data *fl_data; - - ctdb_db = find_ctdb_db(client->ctdb, f->db_id); - - key.dsize = f->keylen; - key.dptr = &f->key[0]; - - data = talloc(client, TDB_DATA); - data->dptr = NULL; - data->dsize = 0; + /* all daemons have requested to finish - we now exit */ + DEBUG(1,("All daemons finished - exiting\n")); + _exit(0); +} - state = ctdb_fetch_lock_send(ctdb_db, client, key, data); - talloc_steal(state, data); - fl_data = talloc(state, struct client_fetch_lock_data); - fl_data->client = client; - fl_data->reqid = f->hdr.reqid; - state->async.fn = daemon_fetch_lock_complete; - state->async.private_data = fl_data; -} /* - called when the daemon gets a store unlock request from a client - - this would never block? 
+ called when the daemon gets a connect wait request from a client */ -static void daemon_request_store_unlock(struct ctdb_client *client, - struct ctdb_req_store_unlock *f) +static void daemon_request_connect_wait(struct ctdb_client *client, + struct ctdb_req_connect_wait *c) { - struct ctdb_db_context *ctdb_db; - struct ctdb_reply_store_unlock r; - uint32_t caller = ctdb_get_vnn(client->ctdb); - struct ctdb_ltdb_header header; - TDB_DATA key, data; + struct ctdb_reply_connect_wait r; int res; - ctdb_db = find_ctdb_db(client->ctdb, f->db_id); - - /* write the data to ltdb */ - key.dsize = f->keylen; - key.dptr = &f->data[0]; - res = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL); - if (res) { - ctdb_set_error(ctdb_db->ctdb, "Fetch of locally held record failed"); - res = -1; - goto done; - } - if (header.laccessor != caller) { - header.lacount = 0; - } - header.laccessor = caller; - header.lacount++; - data.dsize = f->datalen; - data.dptr = &f->data[f->keylen]; - res = ctdb_ltdb_store(ctdb_db, key, &header, data); - if ( res != 0) { - ctdb_set_error(ctdb_db->ctdb, "ctdb_call tdb_store failed\n"); - } - + /* first wait - in the daemon */ + ctdb_daemon_connect_wait(client->ctdb); -done: /* now send the reply */ ZERO_STRUCT(r); r.hdr.length = sizeof(r); r.hdr.ctdb_magic = CTDB_MAGIC; r.hdr.ctdb_version = CTDB_VERSION; - r.hdr.operation = CTDB_REPLY_STORE_UNLOCK; - r.hdr.reqid = f->hdr.reqid; - r.state = res; + r.hdr.operation = CTDB_REPLY_CONNECT_WAIT; + r.vnn = ctdb_get_vnn(client->ctdb); + r.num_connected = client->ctdb->num_connected; - res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length); + res = daemon_queue_send(client, &r.hdr); if (res != 0) { - printf("Failed to queue a store unlock response\n"); + DEBUG(0,(__location__ " Failed to queue a connect wait response\n")); return; } } + /* - called when the daemon gets a connect wait request from a client + called when the daemon gets a status request from a client */ -static void 
daemon_request_connect_wait(struct ctdb_client *client, - struct ctdb_req_connect_wait *c) +static void daemon_request_status(struct ctdb_client *client, + struct ctdb_req_status *c) { - struct ctdb_reply_connect_wait r; + struct ctdb_reply_status r; int res; - /* first wait - in the daemon */ - ctdb_daemon_connect_wait(client->ctdb); - /* now send the reply */ ZERO_STRUCT(r); r.hdr.length = sizeof(r); r.hdr.ctdb_magic = CTDB_MAGIC; r.hdr.ctdb_version = CTDB_VERSION; - r.hdr.operation = CTDB_REPLY_CONNECT_WAIT; - r.vnn = ctdb_get_vnn(client->ctdb); - r.num_connected = client->ctdb->num_connected; + r.hdr.operation = CTDB_REPLY_STATUS; + r.hdr.reqid = c->hdr.reqid; + r.status = client->ctdb->status; - res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length); + res = daemon_queue_send(client, &r.hdr); if (res != 0) { - printf("Failed to queue a connect wait response\n"); + DEBUG(0,(__location__ " Failed to queue a connect wait response\n")); return; } } @@ -323,11 +281,69 @@ static void daemon_request_message_from_client(struct ctdb_client *client, res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode, c->srvid, data); if (res != 0) { - printf("Failed to send message to remote node %u\n", - c->hdr.destnode); + DEBUG(0,(__location__ " Failed to send message to remote node %u\n", + c->hdr.destnode)); + } +} + + +struct daemon_call_state { + struct ctdb_client *client; + uint32_t reqid; + struct ctdb_call *call; + struct timeval start_time; +}; + +/* + complete a call from a client +*/ +static void daemon_call_from_client_callback(struct ctdb_call_state *state) +{ + struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, + struct daemon_call_state); + struct ctdb_reply_call *r; + int res; + uint32_t length; + struct ctdb_client *client = dstate->client; + + talloc_steal(client, dstate); + talloc_steal(dstate, dstate->call); + + res = ctdb_daemon_call_recv(state, dstate->call); + if (res != 0) { + DEBUG(0, (__location__ " 
ctdbd_call_recv() returned error\n")); + client->ctdb->status.pending_calls--; + ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); + return; } + + length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize; + r = ctdbd_allocate_pkt(dstate, length); + if (r == NULL) { + DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n")); + client->ctdb->status.pending_calls--; + ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); + return; + } + memset(r, 0, offsetof(struct ctdb_reply_call, data)); + r->hdr.length = length; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REPLY_CALL; + r->hdr.reqid = dstate->reqid; + r->datalen = dstate->call->reply_data.dsize; + memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen); + + res = daemon_queue_send(client, &r->hdr); + if (res != 0) { + DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n")); + } + ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); + talloc_free(dstate); + client->ctdb->status.pending_calls--; } + /* this is called when the ctdb daemon received a ctdb request call from a local client over the unix domain socket @@ -337,103 +353,159 @@ static void daemon_request_call_from_client(struct ctdb_client *client, { struct ctdb_call_state *state; struct ctdb_db_context *ctdb_db; - struct ctdb_call call; - struct ctdb_reply_call *r; - int res; - uint32_t length; + struct daemon_call_state *dstate; + struct ctdb_call *call; + struct ctdb_ltdb_header header; + TDB_DATA key, data; + int ret; + struct ctdb_context *ctdb = client->ctdb; + + ctdb->status.total_calls++; + ctdb->status.pending_calls++; ctdb_db = find_ctdb_db(client->ctdb, c->db_id); if (!ctdb_db) { - printf("Unknown database in request. db_id==0x%08x",c->db_id); + DEBUG(0, (__location__ " Unknown database in request. 
db_id==0x%08x", + c->db_id)); + ctdb->status.pending_calls--; return; } - ZERO_STRUCT(call); - call.call_id = c->callid; - call.key.dptr = c->data; - call.key.dsize = c->keylen; - call.call_data.dptr = c->data + c->keylen; - call.call_data.dsize = c->calldatalen; + key.dptr = c->data; + key.dsize = c->keylen; - state = ctdb_call_send(ctdb_db, &call); + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, + (struct ctdb_req_header *)c, &data, + daemon_incoming_packet, client); + if (ret == -2) { + /* will retry later */ + ctdb->status.pending_calls--; + return; + } -/* XXX this must be converted to fully async */ - res = ctdb_call_recv(state, &call); - if (res != 0) { - printf("ctdbd_call_recv() returned error\n"); - exit(1); + if (ret != 0) { + DEBUG(0,(__location__ " Unable to fetch record\n")); + ctdb->status.pending_calls--; + return; } - length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize; - r = ctdbd_allocate_pkt(client->ctdb, length); - if (r == NULL) { - printf("Failed to allocate reply_call in ctdb daemon\n"); + dstate = talloc(client, struct daemon_call_state); + if (dstate == NULL) { + ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(0,(__location__ " Unable to allocate dstate\n")); + ctdb->status.pending_calls--; + return; + } + dstate->start_time = timeval_current(); + dstate->client = client; + dstate->reqid = c->hdr.reqid; + talloc_steal(dstate, data.dptr); + + call = dstate->call = talloc_zero(dstate, struct ctdb_call); + if (call == NULL) { + ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(0,(__location__ " Unable to allocate call\n")); + ctdb->status.pending_calls--; + ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time); return; } - ZERO_STRUCT(*r); - r->hdr.length = length; - r->hdr.ctdb_magic = CTDB_MAGIC; - r->hdr.ctdb_version = CTDB_VERSION; - r->hdr.operation = CTDB_REPLY_CALL; - r->hdr.reqid = c->hdr.reqid; - r->datalen = call.reply_data.dsize; - memcpy(&r->data[0], call.reply_data.dptr, r->datalen); - res = 
ctdb_queue_send(client->queue, (uint8_t *)&r, r->hdr.length); - if (res != 0) { - printf("Failed to queue packet from daemon to client\n"); + call->call_id = c->callid; + call->key = key; + call->call_data.dptr = c->data + c->keylen; + call->call_data.dsize = c->calldatalen; + call->flags = c->flags; + + if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + state = ctdb_call_local_send(ctdb_db, call, &header, &data); + } else { + state = ctdb_daemon_call_send_remote(ctdb_db, call, &header); } - talloc_free(r); -} + ctdb_ltdb_unlock(ctdb_db, key); + + if (state == NULL) { + DEBUG(0,(__location__ " Unable to setup call send\n")); + ctdb->status.pending_calls--; + ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time); + return; + } + talloc_steal(state, dstate); + talloc_steal(client, state); + + state->async.fn = daemon_call_from_client_callback; + state->async.private_data = dstate; +} /* data contains a packet from the client */ -static void client_incoming_packet(struct ctdb_client *client, void *data, size_t nread) +static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread) { - struct ctdb_req_header *hdr = data; + struct ctdb_req_header *hdr = (struct ctdb_req_header *)data; + struct ctdb_client *client = talloc_get_type(p, struct ctdb_client); + TALLOC_CTX *tmp_ctx; + struct ctdb_context *ctdb = client->ctdb; + + /* place the packet as a child of a tmp_ctx. We then use + talloc_free() below to free it. 
If any of the calls want + to keep it, then they will steal it somewhere else, and the + talloc_free() will be a no-op */ + tmp_ctx = talloc_new(client); + talloc_steal(tmp_ctx, hdr); if (hdr->ctdb_magic != CTDB_MAGIC) { - ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n"); + ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n"); goto done; } if (hdr->ctdb_version != CTDB_VERSION) { - ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); goto done; } switch (hdr->operation) { case CTDB_REQ_CALL: + ctdb->status.client.req_call++; daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr); break; case CTDB_REQ_REGISTER: + ctdb->status.client.req_register++; daemon_request_register_message_handler(client, (struct ctdb_req_register *)hdr); break; case CTDB_REQ_MESSAGE: + ctdb->status.client.req_message++; daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr); break; case CTDB_REQ_CONNECT_WAIT: + ctdb->status.client.req_connect_wait++; daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr); break; - case CTDB_REQ_FETCH_LOCK: - daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr); + + case CTDB_REQ_SHUTDOWN: + ctdb->status.client.req_shutdown++; + daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr); break; - case CTDB_REQ_STORE_UNLOCK: - daemon_request_store_unlock(client, (struct ctdb_req_store_unlock *)hdr); + + case CTDB_REQ_STATUS: + ctdb->status.client.req_status++; + daemon_request_status(client, (struct ctdb_req_status *)hdr); break; + default: - printf("daemon: unrecognized operation:%d\n",hdr->operation); + DEBUG(0,(__location__ " daemon: unrecognized operation %d\n", + hdr->operation)); } done: - talloc_free(data); + talloc_free(tmp_ctx); } - -static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) +/* + called when the 
daemon gets a incoming packet + */ +static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args) { struct ctdb_client *client = talloc_get_type(args, struct ctdb_client); struct ctdb_req_header *hdr; @@ -443,13 +515,15 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) return; } + client->ctdb->status.client_packets_recv++; + if (cnt < sizeof(*hdr)) { - ctdb_set_error(client->ctdb, "Bad packet length %d\n", cnt); + ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt); return; } hdr = (struct ctdb_req_header *)data; if (cnt != hdr->length) { - ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n", + ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n in daemon", hdr->length, cnt); return; } @@ -460,12 +534,16 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) } if (hdr->ctdb_version != CTDB_VERSION) { - ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); return; } + DEBUG(3,(__location__ " client request %d of type %d length %d from " + "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length, + hdr->srcnode, hdr->destnode)); + /* it is the responsibility of the incoming packet function to free 'data' */ - client_incoming_packet(client, data, cnt); + daemon_incoming_packet(client, data, cnt); } static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, @@ -490,7 +568,7 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, client->fd = fd; client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, - ctdb_client_read_cb, client); + ctdb_daemon_read_cb, client); talloc_set_destructor(client, ctdb_client_destructor); } @@ -507,10 +585,10 @@ static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde /* XXX this is a good place to try doing some cleaning up before 
exiting */ cnt = read(*fd, &buf, 1); if (cnt==0) { - printf("parent process exited. filedescriptor dissappeared\n"); + DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n")); exit(1); } else { - printf("ctdb: did not expect data from parent process\n"); + DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n")); exit(1); } } @@ -559,7 +637,7 @@ static int unlink_destructor(const char *name) /* start the protocol going */ -int ctdbd_start(struct ctdb_context *ctdb) +int ctdb_start(struct ctdb_context *ctdb) { pid_t pid; static int fd[2]; @@ -575,18 +653,18 @@ int ctdbd_start(struct ctdb_context *ctdb) /* create a unix domain stream socket to listen to */ res = ux_socket_bind(ctdb); if (res!=0) { - printf("Failed to open CTDB unix domain socket\n"); + DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n")); exit(10); } res = pipe(&fd[0]); if (res) { - printf("Failed to open pipe for CTDB\n"); + DEBUG(0,(__location__ " Failed to open pipe for CTDB\n")); exit(1); } pid = fork(); if (pid==-1) { - printf("Failed to fork CTDB daemon\n"); + DEBUG(0,(__location__ " Failed to fork CTDB daemon\n")); exit(1); } @@ -597,12 +675,14 @@ int ctdbd_start(struct ctdb_context *ctdb) return 0; } + block_signal(SIGPIPE); + /* ensure the socket is deleted on exit of the daemon */ domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name); talloc_set_destructor(domain_socket_name, unlink_destructor); close(fd[1]); - ctdb_clear_flags(ctdb, CTDB_FLAG_DAEMON_MODE); + ctdb->ev = event_context_init(NULL); fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]); fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb); @@ -614,18 +694,18 @@ int ctdbd_start(struct ctdb_context *ctdb) /* allocate a packet for use in client<->daemon communication */ -void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len) +void *ctdbd_allocate_pkt(TALLOC_CTX 
*mem_ctx, size_t len) { int size; size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); - return talloc_size(ctdb, size); + return talloc_size(mem_ctx, size); } -int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, - ctdb_message_fn_t handler, - void *private_data) +/* + called when a CTDB_REQ_FINISHED packet comes in +*/ +void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data); + ctdb->num_finished++; } - diff --git a/source4/cluster/ctdb/common/ctdb_lockwait.c b/source4/cluster/ctdb/common/ctdb_lockwait.c new file mode 100644 index 0000000000..36b08796be --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_lockwait.c @@ -0,0 +1,139 @@ +/* + wait for a tdb chain lock + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. 
+ + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "system/wait.h" +#include "popt.h" +#include "db_wrap.h" +#include "lib/tdb/include/tdb.h" +#include "../include/ctdb_private.h" + + +struct lockwait_handle { + struct ctdb_context *ctdb; + struct fd_event *fde; + int fd[2]; + pid_t child; + void *private_data; + void (*callback)(void *); + struct timeval start_time; +}; + +static void lockwait_handler(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + struct lockwait_handle *h = talloc_get_type(private_data, + struct lockwait_handle); + void (*callback)(void *) = h->callback; + void *p = h->private_data; + pid_t child = h->child; + talloc_set_destructor(h, NULL); + close(h->fd[0]); + ctdb_latency(&h->ctdb->status.max_lockwait_latency, h->start_time); + h->ctdb->status.pending_lockwait_calls--; + talloc_free(h); + callback(p); + waitpid(child, NULL, 0); +} + +static int lockwait_destructor(struct lockwait_handle *h) +{ + h->ctdb->status.pending_lockwait_calls--; + close(h->fd[0]); + kill(h->child, SIGKILL); + waitpid(h->child, NULL, 0); + return 0; +} + +/* + setup a non-blocking chainlock on a tdb record. If this function + returns NULL then it could not get the chainlock. Otherwise it + returns a opaque handle, and will call callback() once it has + managed to get the chainlock. You can cancel it by using talloc_free + on the returned handle. 
+ + It is the callers responsibility to unlock the chainlock once + acquired + */ +struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db, + TDB_DATA key, + void (*callback)(void *private_data), + void *private_data) +{ + struct lockwait_handle *result; + int ret; + + ctdb_db->ctdb->status.lockwait_calls++; + ctdb_db->ctdb->status.pending_lockwait_calls++; + + if (!(result = talloc_zero(ctdb_db, struct lockwait_handle))) { + ctdb_db->ctdb->status.pending_lockwait_calls--; + return NULL; + } + + ret = pipe(result->fd); + + if (ret != 0) { + talloc_free(result); + ctdb_db->ctdb->status.pending_lockwait_calls--; + return NULL; + } + + result->child = fork(); + + if (result->child == (pid_t)-1) { + close(result->fd[0]); + close(result->fd[1]); + talloc_free(result); + ctdb_db->ctdb->status.pending_lockwait_calls--; + return NULL; + } + + result->callback = callback; + result->private_data = private_data; + result->ctdb = ctdb_db->ctdb; + + if (result->child == 0) { + close(result->fd[0]); + /* + * Do we need a tdb_reopen here? 
+ */ + tdb_chainlock(ctdb_db->ltdb->tdb, key); + _exit(0); + } + + close(result->fd[1]); + talloc_set_destructor(result, lockwait_destructor); + + result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0], + EVENT_FD_READ, lockwait_handler, + (void *)result); + if (result->fde == NULL) { + talloc_free(result); + ctdb_db->ctdb->status.pending_lockwait_calls--; + return NULL; + } + + result->start_time = timeval_current(); + + return result; +} diff --git a/source4/cluster/ctdb/common/ctdb_ltdb.c b/source4/cluster/ctdb/common/ctdb_ltdb.c index 785ccad9b3..cb07a72375 100644 --- a/source4/cluster/ctdb/common/ctdb_ltdb.c +++ b/source4/cluster/ctdb/common/ctdb_ltdb.c @@ -45,9 +45,8 @@ struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *na /* this is the dummy null procedure that all databases support */ -static int ctdb_fetch_func(struct ctdb_call_info *call) +static int ctdb_null_func(struct ctdb_call_info *call) { - call->reply_data = &call->record_data; return 0; } @@ -82,10 +81,21 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, } } + if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) { + DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", + ctdb->db_directory)); + talloc_free(ctdb_db); + return NULL; + } + + /* add the node id to the database name, so when we run on loopback + we don't conflict in the local filesystem */ + name = talloc_asprintf(ctdb_db, "%s/%s", ctdb->db_directory, name); + /* when we have a separate daemon this will need to be a real file, not a TDB_INTERNAL, so the parent can access it to for ltdb bypass */ - ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_INTERNAL, open_flags, mode); + ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_CLEAR_IF_FIRST, open_flags, mode); if (ctdb_db->ltdb == NULL) { ctdb_set_error(ctdb, "Failed to open tdb %s\n", name); talloc_free(ctdb_db); @@ -94,9 +104,10 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context 
*ctdb, const char *name, /* - all databases support the "fetch" function. we need this in order to do forced migration of records + all databases support the "null" function. we need this in + order to do forced migration of records */ - ret = ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC); + ret = ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC); if (ret != 0) { talloc_free(ctdb_db); return NULL; @@ -145,13 +156,15 @@ int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, rec = tdb_fetch(ctdb_db->ltdb->tdb, key); if (rec.dsize < sizeof(*header)) { + TDB_DATA d2; /* return an initial header */ - free(rec.dptr); + if (rec.dptr) free(rec.dptr); ltdb_initial_header(ctdb_db, key, header); + ZERO_STRUCT(d2); if (data) { - data->dptr = NULL; - data->dsize = 0; + *data = d2; } + ctdb_ltdb_store(ctdb_db, key, header, d2); return 0; } @@ -215,3 +228,118 @@ int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key) return tdb_chainunlock(ctdb_db->ltdb->tdb, key); } +struct lock_fetch_state { + struct ctdb_context *ctdb; + void (*recv_pkt)(void *, uint8_t *, uint32_t); + void *recv_context; + struct ctdb_req_header *hdr; +}; + +/* + called when we should retry the operation + */ +static void lock_fetch_callback(void *p) +{ + struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state); + state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length); + talloc_free(state); + DEBUG(2,(__location__ " PACKET REQUEUED\n")); +} + + +/* + do a non-blocking ltdb_lock, deferring this ctdb request until we + have the chainlock + + It does the following: + + 1) tries to get the chainlock. 
If it succeeds, then it returns 0 + + 2) if it fails to get a chainlock immediately then it sets up a + non-blocking chainlock via ctdb_lockwait, and when it gets the + chainlock it re-submits this ctdb request to the main packet + receive function + + This effectively queues all ctdb requests that cannot be + immediately satisfied until it can get the lock. This means that + the main ctdb daemon will not block waiting for a chainlock held by + a client + + There are 3 possible return values: + + 0: means that it got the lock immediately. + -1: means that it failed to get the lock, and won't retry + -2: means that it failed to get the lock immediately, but will retry + */ +int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, + TDB_DATA key, struct ctdb_req_header *hdr, + void (*recv_pkt)(void *, uint8_t *, uint32_t ), + void *recv_context) +{ + int ret; + struct tdb_context *tdb = ctdb_db->ltdb->tdb; + struct lockwait_handle *h; + struct lock_fetch_state *state; + + ret = tdb_chainlock_nonblock(tdb, key); + + if (ret != 0 && + !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) { + /* a hard failure - don't try again */ + return -1; + } + + /* when torturing, ensure we test the contended path */ + if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) && + random() % 5 == 0) { + ret = -1; + tdb_chainunlock(tdb, key); + } + + /* first the non-contended path */ + if (ret == 0) { + return 0; + } + + state = talloc(ctdb_db, struct lock_fetch_state); + state->ctdb = ctdb_db->ctdb; + state->hdr = hdr; + state->recv_pkt = recv_pkt; + state->recv_context = recv_context; + + /* now the contended path */ + h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state); + if (h == NULL) { + tdb_chainunlock(tdb, key); + return -1; + } + + /* we need to move the packet off the temporary context in ctdb_recv_pkt(), + so it won't be freed yet */ + talloc_steal(state, hdr); + talloc_steal(state, h); + + /* now tell the caller than we will retry asynchronously */ + return -2; +} 
+ +/* + a varient of ctdb_ltdb_lock_requeue that also fetches the record + */ +int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, + TDB_DATA key, struct ctdb_ltdb_header *header, + struct ctdb_req_header *hdr, TDB_DATA *data, + void (*recv_pkt)(void *, uint8_t *, uint32_t ), + void *recv_context) +{ + int ret; + + ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context); + if (ret == 0) { + ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + } + } + return ret; +} diff --git a/source4/cluster/ctdb/common/ctdb_message.c b/source4/cluster/ctdb/common/ctdb_message.c index ad88ec22d2..70fcf00c4d 100644 --- a/source4/cluster/ctdb/common/ctdb_message.c +++ b/source4/cluster/ctdb/common/ctdb_message.c @@ -42,7 +42,8 @@ static int ctdb_dispatch_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_ if (ml->srvid == srvid || ml->srvid == CTDB_SRVID_ALL) break; } if (ml == NULL) { - printf("daemon vnn:%d no msg handler for srvid=%u\n", ctdb_get_vnn(ctdb), srvid); + DEBUG(1,(__location__ " daemon vnn:%d no msg handler for srvid=%u\n", + ctdb_get_vnn(ctdb), srvid)); /* no registered message handler */ return -1; } @@ -86,7 +87,7 @@ static void ctdb_local_message_trigger(struct event_context *ev, struct timed_ev res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data); if (res != 0) { - printf("Failed to dispatch message for srvid=%u\n", m->srvid); + DEBUG(0, (__location__ " Failed to dispatch message for srvid=%u\n", m->srvid)); } talloc_free(m); } @@ -147,18 +148,6 @@ int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn, return 0; } -/* - send a ctdb message -*/ -int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, - uint32_t srvid, TDB_DATA data) -{ - if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_send_message(ctdb, vnn, srvid, data); - } - return ctdb_daemon_send_message(ctdb, vnn, srvid, data); -} - /* when a client goes away, we need to remove 
its srvid handler from the list diff --git a/source4/cluster/ctdb/common/ctdb_util.c b/source4/cluster/ctdb/common/ctdb_util.c index cf0c72a58b..9a5e51bfa0 100644 --- a/source4/cluster/ctdb/common/ctdb_util.c +++ b/source4/cluster/ctdb/common/ctdb_util.c @@ -25,6 +25,8 @@ #include "system/filesys.h" #include "../include/ctdb_private.h" +int LogLevel; + /* return error string for last error */ @@ -100,3 +102,29 @@ uint32_t ctdb_hash(const TDB_DATA *key) return (1103515243 * value + 12345); } + +/* + a type checking varient of idr_find + */ +void *_idr_find_type(struct idr_context *idp, int id, const char *type, const char *location) +{ + void *p = idr_find(idp, id); + if (p && talloc_check_name(p, type) == NULL) { + DEBUG(0,("%s idr_find_type expected type %s but got %s\n", + location, type, talloc_get_name(p))); + return NULL; + } + return p; +} + + +/* + update a max latency number + */ +void ctdb_latency(double *latency, struct timeval t) +{ + double l = timeval_elapsed(&t); + if (l > *latency) { + *latency = l; + } +} diff --git a/source4/cluster/ctdb/config.mk b/source4/cluster/ctdb/config.mk index 0e0629bfb1..076562297f 100644 --- a/source4/cluster/ctdb/config.mk +++ b/source4/cluster/ctdb/config.mk @@ -24,6 +24,7 @@ OBJ_FILES = \ common/ctdb_util.o \ common/ctdb_io.o \ common/ctdb_client.o \ - common/ctdb_daemon.o + common/ctdb_daemon.o \ + common/ctdb_lockwait.o PUBLIC_DEPENDENCIES = LIBTDB LIBTALLOC PRIVATE_DEPENDENCIES = ctdb_tcp diff --git a/source4/cluster/ctdb/ctdb_cluster.c b/source4/cluster/ctdb/ctdb_cluster.c index c9233696e8..e3e60685b0 100644 --- a/source4/cluster/ctdb/ctdb_cluster.c +++ b/source4/cluster/ctdb/ctdb_cluster.c @@ -104,12 +104,30 @@ static void *ctdb_backend_handle(struct cluster_ops *ops) return (void *)state->ctdb; } +struct ctdb_handler_state { + struct cluster_state *state; + cluster_message_fn_t handler; + struct messaging_context *msg; +}; + /* dispatch incoming ctdb messages */ static void ctdb_message_handler(struct 
ctdb_context *ctdb, uint32_t srvid, TDB_DATA data, void *private) { + struct ctdb_handler_state *s = talloc_get_type(private, + struct ctdb_handler_state); + DATA_BLOB blob; + blob.data = data.dptr; + blob.length = data.dsize; + s->handler(s->msg, blob); +} + +static int ctdb_handler_destructor(struct ctdb_handler_state *s) +{ + /* XXX - tell ctdb to de-register the message handler */ + return 0; } /* @@ -120,7 +138,28 @@ static NTSTATUS ctdb_message_init(struct cluster_ops *ops, struct server_id server, cluster_message_fn_t handler) { - /* nothing to do - we're now using the wildcard message handler */ + struct cluster_state *state = ops->private; + struct ctdb_handler_state *h; + int ret; + + h = talloc(msg, struct ctdb_handler_state); + NT_STATUS_HAVE_NO_MEMORY(h); + + h->state = state; + h->handler = handler; + h->msg = msg; + + talloc_set_destructor(h, ctdb_handler_destructor); + + /* setup a message handler */ + ret = ctdb_set_message_handler(state->ctdb, server.id, + ctdb_message_handler, h); + if (ret == -1) { + DEBUG(0,("ctdb_set_message_handler failed - %s\n", + ctdb_errstr(state->ctdb))); + exit(1); + } + return NT_STATUS_OK; } @@ -198,11 +237,6 @@ void cluster_ctdb_init(struct event_context *ev, const char *model) ctdb_set_flags(state->ctdb, CTDB_FLAG_SELF_CONNECT); } - if (strcmp(model, "single") != 0) { - DEBUG(0,("Enabling ctdb daemon mode\n")); - ctdb_set_flags(state->ctdb, CTDB_FLAG_DAEMON_MODE); - } - lacount = lp_parm_int(-1, "ctdb", "maxlacount", -1); if (lacount != -1) { ctdb_set_max_lacount(state->ctdb, lacount); @@ -215,6 +249,12 @@ void cluster_ctdb_init(struct event_context *ev, const char *model) goto failed; } + ret = ctdb_set_tdb_dir(state->ctdb, lp_lockdir()); + if (ret == -1) { + DEBUG(0,("ctdb_set_tdb_dir failed - %s\n", ctdb_errstr(state->ctdb))); + goto failed; + } + /* tell ctdb what nodes are available */ ret = ctdb_set_nlist(state->ctdb, nlist); if (ret == -1) { @@ -230,14 +270,13 @@ void cluster_ctdb_init(struct event_context 
*ev, const char *model) if (ctdb_db == NULL) goto failed; } - /* setup a global message handler */ - ret = ctdb_set_message_handler(state->ctdb, CTDB_SRVID_ALL, - ctdb_message_handler, state); - if (ret == -1) { - DEBUG(0,("ctdb_set_message_handler failed - %s\n", - ctdb_errstr(state->ctdb))); - exit(1); - } + cluster_set_ops(&cluster_ctdb_ops); + + /* nasty hack for now ... */ + { + void brl_ctdb_init_ops(void); + brl_ctdb_init_ops(); + } /* start the protocol running */ ret = ctdb_start(state->ctdb); @@ -250,8 +289,6 @@ void cluster_ctdb_init(struct event_context *ev, const char *model) outside of test code) */ ctdb_connect_wait(state->ctdb); - cluster_set_ops(&cluster_ctdb_ops); - return; failed: diff --git a/source4/cluster/ctdb/direct/ctdbd.c b/source4/cluster/ctdb/direct/ctdbd.c index 700416e5e9..674b54d47a 100644 --- a/source4/cluster/ctdb/direct/ctdbd.c +++ b/source4/cluster/ctdb/direct/ctdbd.c @@ -23,6 +23,7 @@ #include "system/filesys.h" #include "popt.h" #include "system/wait.h" +#include "cmdline.h" static void block_signal(int signum) { @@ -43,21 +44,12 @@ static void block_signal(int signum) int main(int argc, const char *argv[]) { struct ctdb_context *ctdb; - const char *nlist = NULL; - const char *transport = "tcp"; - const char *myaddress = NULL; - int self_connect=0; - int daemon_mode=0; const char *db_list = "test.tdb"; char *s, *tok; struct poptOption popt_options[] = { POPT_AUTOHELP - { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" }, - { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" }, - { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL }, - { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" }, - { "daemon", 0, POPT_ARG_NONE, &daemon_mode, 0, "spawn a ctdb daemon", "boolean" }, + POPT_CTDB_CMDLINE { "dblist", 0, POPT_ARG_STRING, &db_list, 0, "list of databases", NULL }, POPT_TABLEEND }; @@ -86,48 +78,11 @@ int main(int argc, 
const char *argv[]) while (extra_argv[extra_argc]) extra_argc++; } - if (nlist == NULL || myaddress == NULL) { - printf("You must provide a node list with --nlist and an address with --listen\n"); - exit(1); - } - block_signal(SIGPIPE); ev = event_context_init(NULL); - /* initialise ctdb */ - ctdb = ctdb_init(ev); - if (ctdb == NULL) { - printf("Failed to init ctdb\n"); - exit(1); - } - - if (self_connect) { - ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT); - } - if (daemon_mode) { - ctdb_set_flags(ctdb, CTDB_FLAG_DAEMON_MODE); - } - - ret = ctdb_set_transport(ctdb, transport); - if (ret == -1) { - printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb)); - exit(1); - } - - /* tell ctdb what address to listen on */ - ret = ctdb_set_address(ctdb, myaddress); - if (ret == -1) { - printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb)); - exit(1); - } - - /* tell ctdb what nodes are available */ - ret = ctdb_set_nlist(ctdb, nlist); - if (ret == -1) { - printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb)); - exit(1); - } + ctdb = ctdb_cmdline_init(ev); /* attach to the list of databases */ s = talloc_strdup(ctdb, db_list); diff --git a/source4/cluster/ctdb/direct/ctdbd.sh b/source4/cluster/ctdb/direct/ctdbd.sh index 366226260b..7224bdec6d 100755 --- a/source4/cluster/ctdb/direct/ctdbd.sh +++ b/source4/cluster/ctdb/direct/ctdbd.sh @@ -3,6 +3,6 @@ killall -q ctdbd echo "Starting 2 ctdb daemons" -bin/ctdbd --nlist direct/nodes.txt --listen 127.0.0.2:9001 --daemon & -bin/ctdbd --nlist direct/nodes.txt --listen 127.0.0.1:9001 --daemon & +bin/ctdbd --nlist direct/nodes.txt --listen 127.0.0.2:9001 & +bin/ctdbd --nlist direct/nodes.txt --listen 127.0.0.1:9001 & diff --git a/source4/cluster/ctdb/direct/ctdbd_test.c b/source4/cluster/ctdb/direct/ctdbd_test.c index 019cdad30d..00b9f967df 100644 --- a/source4/cluster/ctdb/direct/ctdbd_test.c +++ b/source4/cluster/ctdb/direct/ctdbd_test.c @@ -184,78 +184,32 @@ uint32_t ctdb_hash(const TDB_DATA *key) return (1103515243 * 
value + 12345); } -void fetch_lock(int fd, uint32_t db_id, TDB_DATA key) -{ - struct ctdb_req_fetch_lock *req; - struct ctdb_reply_fetch_lock *rep; - uint32_t length; - int len, cnt, tot; - - len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize; - req = malloc(len); - - req->hdr.length = len; - req->hdr.ctdb_magic = CTDB_MAGIC; - req->hdr.ctdb_version = CTDB_VERSION; - req->hdr.operation = CTDB_REQ_FETCH_LOCK; - req->hdr.reqid = 1; - req->db_id = db_id; - req->keylen = key.dsize; - memcpy(&req->key[0], key.dptr, key.dsize); +/* ask the daemon to migrate a record over so that the local node is the dmaster the client must not have the record locked when performing this call. - cnt=write(fd, req, len); - - - /* wait fot the reply */ - /* read the 4 bytes of length for the pdu */ - cnt=0; - tot=4; - while(cnt!=tot){ - int numread; - numread=read(fd, ((char *)&length)+cnt, tot-cnt); - if(numread>0){ - cnt+=numread; - } - } - /* read the rest of the pdu */ - rep = malloc(length); - tot=length; - while(cnt!=tot){ - int numread; - numread=read(fd, ((char *)rep)+cnt, tot-cnt); - if(numread>0){ - cnt+=numread; - } - } - printf("fetch lock reply: state:%d datalen:%d\n",rep->state,rep->datalen); - if(!rep->datalen){ - printf("no data\n"); - } else { - printf("data:[%s]\n",rep->data); - } - -} - -void store_unlock(int fd, uint32_t db_id, TDB_DATA key, TDB_DATA data) + when the daemon has responded this node should be the dmaster (unless it has migrated off again) + */ +void fetch_record(int fd, uint32_t db_id, TDB_DATA key) { - struct ctdb_req_store_unlock *req; - struct ctdb_reply_store_unlock *rep; + struct ctdb_req_call *req; + struct ctdb_reply_call *rep; uint32_t length; int len, cnt, tot; - len = offsetof(struct ctdb_req_store_unlock, data) + key.dsize + data.dsize; + len = offsetof(struct ctdb_req_call, data) + key.dsize; req = malloc(len); req->hdr.length = len; req->hdr.ctdb_magic = CTDB_MAGIC; req->hdr.ctdb_version = CTDB_VERSION; - req->hdr.operation = 
CTDB_REQ_STORE_UNLOCK; + req->hdr.operation = CTDB_REQ_CALL; req->hdr.reqid = 1; + + req->flags = CTDB_IMMEDIATE_MIGRATION; req->db_id = db_id; + req->callid = CTDB_NULL_FUNC; req->keylen = key.dsize; - req->datalen = data.dsize; + req->calldatalen = 0; memcpy(&req->data[0], key.dptr, key.dsize); - memcpy(&req->data[key.dsize], data.dptr, data.dsize); cnt=write(fd, req, len); @@ -281,7 +235,7 @@ void store_unlock(int fd, uint32_t db_id, TDB_DATA key, TDB_DATA data) cnt+=numread; } } - printf("store unlock reply: state:%d\n",rep->state); + printf("fetch record reply: operation:%d state:%d\n",rep->hdr.operation,rep->status); } int main(int argc, const char *argv[]) @@ -291,8 +245,7 @@ int main(int argc, const char *argv[]) struct ctdb_req_message *reply; TDB_DATA dbname; uint32_t db_id; - TDB_DATA key, data; - char str[256]; + TDB_DATA key; /* open the socket to talk to the local ctdb daemon */ fd=ux_socket_connect(CTDB_SOCKET); @@ -338,25 +291,12 @@ int main(int argc, const char *argv[]) printf("the has for the database id is 0x%08x\n",db_id); printf("\n"); - /* send a fetch lock */ + /* send a request to migrate a record to the local node */ key.dptr=discard_const("TestKey"); key.dsize=strlen((const char *)(key.dptr)); printf("fetch the test key:[%s]\n",key.dptr); - fetch_lock(fd, db_id, key); - printf("\n"); - - /* send a store unlock */ - sprintf(str,"TestData_%d",getpid()); - data.dptr=discard_const(str); - data.dsize=strlen((const char *)(data.dptr)); - printf("store new data==[%s] for this record\n",data.dptr); - store_unlock(fd, db_id, key, data); - printf("\n"); - - /* send a fetch lock */ - printf("fetch the test key:[%s]\n",key.dptr); - fetch_lock(fd, db_id, key); + fetch_record(fd, db_id, key); printf("\n"); diff --git a/source4/cluster/ctdb/ib/ibw_ctdb_init.c b/source4/cluster/ctdb/ib/ibw_ctdb_init.c index 3b0c6ad28f..b4adfe6f12 100644 --- a/source4/cluster/ctdb/ib/ibw_ctdb_init.c +++ b/source4/cluster/ctdb/ib/ibw_ctdb_init.c @@ -158,10 +158,10 @@ static 
int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t le /* * transport packet allocator - allows transport to control memory for packets */ -static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size) +static void *ctdb_ibw_allocate_pkt(TALLOC_CTX *mem_ctx, size_t size) { /* TODO: use ibw_alloc_send_buf instead... */ - return talloc_size(ctdb, size); + return talloc_size(mem_ctx, size); } #ifdef __NOTDEF__ diff --git a/source4/cluster/ctdb/include/cmdline.h b/source4/cluster/ctdb/include/cmdline.h new file mode 100644 index 0000000000..785595ee6c --- /dev/null +++ b/source4/cluster/ctdb/include/cmdline.h @@ -0,0 +1,7 @@ + +extern struct poptOption popt_ctdb_cmdline[]; + +#define POPT_CTDB_CMDLINE { NULL, 0, POPT_ARG_INCLUDE_TABLE, popt_ctdb_cmdline, 0, "Common ctdb test options:", NULL }, + +struct ctdb_context *ctdb_cmdline_init(struct event_context *ev); + diff --git a/source4/cluster/ctdb/include/ctdb.h b/source4/cluster/ctdb/include/ctdb.h index f24f1000a4..cb765884b6 100644 --- a/source4/cluster/ctdb/include/ctdb.h +++ b/source4/cluster/ctdb/include/ctdb.h @@ -50,10 +50,7 @@ struct ctdb_call_info { ctdb flags */ #define CTDB_FLAG_SELF_CONNECT (1<<0) -/* fork off a separate ctdb daemon */ -#define CTDB_FLAG_DAEMON_MODE (1<<1) -/* for test code only: make ctdb_start() block until all nodes are connected */ -#define CTDB_FLAG_CONNECT_WAIT (1<<2) +#define CTDB_FLAG_TORTURE (1<<1) /* @@ -73,6 +70,11 @@ struct ctdb_context *ctdb_init(struct event_context *ev); */ int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport); +/* + set the directory for the local databases +*/ +int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir); + /* set some flags */ @@ -142,9 +144,10 @@ int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call); void ctdb_connect_wait(struct ctdb_context *ctdb); /* - wait until we're the only node left + initiate an ordered ctdb cluster shutdown + this function will never return 
*/ -void ctdb_wait_loop(struct ctdb_context *ctdb); +void ctdb_shutdown(struct ctdb_context *ctdb); /* return vnn of this node */ uint32_t ctdb_get_vnn(struct ctdb_context *ctdb); @@ -163,8 +166,8 @@ int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call); -struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call); -int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call); +struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call); +int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call); /* send a ctdb message */ int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, @@ -172,19 +175,29 @@ int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, /* - fetch and lock a ctdb record. Underneath this will force the + Fetch a ctdb record from a remote node + . Underneath this will force the dmaster for the record to be moved to the local node. - The lock is released when is talloc_free() is called on the - returned ctdb_record_handle. 
*/ -struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data); +struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data); + /* - change the data in a record held with a ctdb_record_handle - if the new data is zero length, this implies a delete of the record - */ -int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data); + do a fetch lock from a client to the local daemon +*/ +#define FETCH_LOCK_SUCCESS 0 +#define FETCH_LOCK_LOCKFAILED 1 +#define FETCH_LOCK_FETCHFAILED 2 +#define FETCH_LOCK_DMASTERFAILED 3 + +int ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data); + + +int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data); int ctdb_register_message_handler(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, @@ -194,4 +207,10 @@ int ctdb_register_message_handler(struct ctdb_context *ctdb, struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id); + +struct ctdb_context *ctdb_cmdline_client(struct event_context *ev, const char *ctdb_socket); + +struct ctdb_status; +int ctdb_status(struct ctdb_context *ctdb, struct ctdb_status *status); + #endif diff --git a/source4/cluster/ctdb/include/ctdb_private.h b/source4/cluster/ctdb/include/ctdb_private.h index c50b481cf3..a8f7b48f49 100644 --- a/source4/cluster/ctdb/include/ctdb_private.h +++ b/source4/cluster/ctdb/include/ctdb_private.h @@ -30,8 +30,8 @@ #define CTDB_DS_ALIGNMENT 8 +#define CTDB_NULL_FUNC 0xf0000001 -#define CTDB_FETCH_FUNC 0xf0000001 /* an installed ctdb remote call */ @@ -50,6 +50,11 @@ struct ctdb_address { int port; }; +/* + check a vnn is valid + */ +#define ctdb_validate_vnn(ctdb, vnn) (((uint32_t)(vnn)) < (ctdb)->num_nodes) + /* called from the queue code when a packet comes in. 
Called with data==NULL on error */ @@ -68,12 +73,6 @@ struct ctdb_node { uint32_t vnn; }; -struct ctdb_record_handle { - struct ctdb_db_context *ctdb_db; - TDB_DATA key; - TDB_DATA *data; -}; - /* transport specific methods */ @@ -81,7 +80,7 @@ struct ctdb_methods { int (*start)(struct ctdb_context *); /* start protocol processing */ int (*add_node)(struct ctdb_node *); /* setup a new node */ int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length); - void *(*allocate_pkt)(struct ctdb_context *, size_t ); + void *(*allocate_pkt)(TALLOC_CTX *mem_ctx, size_t ); }; /* @@ -115,14 +114,51 @@ struct ctdb_daemon_data { struct ctdb_queue *queue; }; +/* + ctdb status information + */ +struct ctdb_status { + uint32_t client_packets_sent; + uint32_t client_packets_recv; + uint32_t node_packets_sent; + uint32_t node_packets_recv; + struct { + uint32_t req_call; + uint32_t reply_call; + uint32_t reply_redirect; + uint32_t req_dmaster; + uint32_t reply_dmaster; + uint32_t reply_error; + uint32_t req_message; + uint32_t req_finished; + } count; + struct { + uint32_t req_call; + uint32_t req_message; + uint32_t req_finished; + uint32_t req_register; + uint32_t req_connect_wait; + uint32_t req_shutdown; + uint32_t req_status; + } client; + uint32_t total_calls; + uint32_t pending_calls; + uint32_t lockwait_calls; + uint32_t pending_lockwait_calls; + double max_call_latency; + double max_lockwait_latency; +}; + /* main state of the ctdb daemon */ struct ctdb_context { struct event_context *ev; struct ctdb_address address; const char *name; + const char *db_directory; uint32_t vnn; /* our own vnn */ uint32_t num_nodes; uint32_t num_connected; + uint32_t num_finished; unsigned flags; struct idr_context *idr; struct ctdb_node **nodes; /* array of nodes in the cluster - indexed by vnn */ @@ -134,6 +170,7 @@ struct ctdb_context { struct ctdb_db_context *db_list; struct ctdb_message_list *message_list; struct ctdb_daemon_data daemon; + struct ctdb_status status; }; struct 
ctdb_db_context { @@ -192,7 +229,6 @@ struct ctdb_call_state { struct ctdb_call call; int redirect_count; struct ctdb_ltdb_header header; - void *fetch_private; struct { void (*fn)(struct ctdb_call_state *); void *private_data; @@ -200,6 +236,14 @@ struct ctdb_call_state { }; +/* used for fetch_lock */ +struct ctdb_fetch_handle { + struct ctdb_db_context *ctdb_db; + TDB_DATA key; + TDB_DATA *data; + struct ctdb_ltdb_header header; +}; + /* operation IDs */ @@ -211,15 +255,15 @@ enum ctdb_operation { CTDB_REPLY_DMASTER = 4, CTDB_REPLY_ERROR = 5, CTDB_REQ_MESSAGE = 6, + CTDB_REQ_FINISHED = 7, /* only used on the domain socket */ CTDB_REQ_REGISTER = 1000, CTDB_REQ_CONNECT_WAIT = 1001, CTDB_REPLY_CONNECT_WAIT = 1002, - CTDB_REQ_FETCH_LOCK = 1003, - CTDB_REPLY_FETCH_LOCK = 1004, - CTDB_REQ_STORE_UNLOCK = 1005, - CTDB_REPLY_STORE_UNLOCK = 1006 + CTDB_REQ_SHUTDOWN = 1003, + CTDB_REQ_STATUS = 1004, + CTDB_REPLY_STATUS = 1005 }; #define CTDB_MAGIC 0x43544442 /* CTDB */ @@ -294,40 +338,31 @@ struct ctdb_req_message { uint8_t data[1]; }; -struct ctdb_req_connect_wait { +struct ctdb_req_finished { struct ctdb_req_header hdr; }; -struct ctdb_reply_connect_wait { +struct ctdb_req_shutdown { struct ctdb_req_header hdr; - uint32_t vnn; - uint32_t num_connected; }; -struct ctdb_req_fetch_lock { +struct ctdb_req_connect_wait { struct ctdb_req_header hdr; - uint32_t db_id; - uint32_t keylen; - uint8_t key[1]; /* key[] */ }; -struct ctdb_reply_fetch_lock { +struct ctdb_reply_connect_wait { struct ctdb_req_header hdr; - uint32_t state; - uint32_t datalen; - uint8_t data[1]; /* data[] */ + uint32_t vnn; + uint32_t num_connected; }; -struct ctdb_req_store_unlock { + +struct ctdb_req_status { struct ctdb_req_header hdr; - uint32_t db_id; - uint32_t keylen; - uint32_t datalen; - uint8_t data[1]; /* key[] and data[] */ }; -struct ctdb_reply_store_unlock { +struct ctdb_reply_status { struct ctdb_req_header hdr; - uint32_t state; + struct ctdb_status status; }; /* internal prototypes */ @@ 
-353,6 +388,16 @@ int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data); void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); +int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, + TDB_DATA key, struct ctdb_req_header *hdr, + void (*recv_pkt)(void *, uint8_t *, uint32_t ), + void *recv_context); +int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, + TDB_DATA key, struct ctdb_ltdb_header *header, + struct ctdb_req_header *hdr, TDB_DATA *data, + void (*recv_pkt)(void *, uint8_t *, uint32_t ), + void *recv_context); +void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length); struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, @@ -386,7 +431,7 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb, /* allocate a packet for use in client<->daemon communication */ -void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len); +void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len); /* @@ -437,16 +482,32 @@ int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn, void ctdb_daemon_connect_wait(struct ctdb_context *ctdb); -/* - do a fetch lock from a client to the local daemon -*/ -struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key, TDB_DATA *data); +struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db, + TDB_DATA key, + void (*callback)(void *), void *private_data); -/* - do a store unlock from a client to the local daemon -*/ -int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data); +struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call); + +int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call); + +struct ctdb_call_state 
*ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call, + struct ctdb_ltdb_header *header); + +void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); + +int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, + struct ctdb_ltdb_header *header, TDB_DATA *data, + uint32_t caller); + +void *_idr_find_type(struct idr_context *idp, int id, const char *type, const char *location); +#define idr_find_type(idp, id, type) (type *)_idr_find_type(idp, id, #type, __location__) + +void ctdb_recv_raw_pkt(void *p, uint8_t *data, uint32_t length); + +int ctdb_socket_connect(struct ctdb_context *ctdb); + +void ctdb_latency(double *latency, struct timeval t); #endif diff --git a/source4/cluster/ctdb/include/includes.h b/source4/cluster/ctdb/include/includes.h index 994c25452c..bffc66b358 100644 --- a/source4/cluster/ctdb/include/includes.h +++ b/source4/cluster/ctdb/include/includes.h @@ -6,15 +6,16 @@ #include "idtree.h" #include "ctdb.h" #include "lib/util/dlinklist.h" +#include "lib/util/debug.h" typedef bool BOOL; #define True 1 #define False 0 -#define LogLevel 0 +extern int LogLevel; -#define DEBUG(lvl, x) if ((lvl) <= LogLevel) (printf x) +#define DEBUG(lvl, x) if ((lvl) <= LogLevel) (do_debug x) #define _PUBLIC_ @@ -32,5 +33,5 @@ int timeval_compare(const struct timeval *tv1, const struct timeval *tv2); struct timeval timeval_until(const struct timeval *tv1, const struct timeval *tv2); _PUBLIC_ struct timeval timeval_current_ofs(uint32_t secs, uint32_t usecs); +double timeval_elapsed(struct timeval *tv); char **file_lines_load(const char *fname, int *numlines, TALLOC_CTX *mem_ctx); - diff --git a/source4/cluster/ctdb/opendb_ctdb.c b/source4/cluster/ctdb/opendb_ctdb.c index 6e2748291e..df399515eb 100644 --- a/source4/cluster/ctdb/opendb_ctdb.c +++ b/source4/cluster/ctdb/opendb_ctdb.c @@ -223,7 +223,8 @@ static NTSTATUS odb_push_record(struct odb_lock *lck, struct opendb_file *file) if 
(!file->num_entries) { dbuf.dptr = NULL; dbuf.dsize = 0; - ctdb_store_unlock(lck->rec, dbuf); + ctdb_record_store(lck->rec, dbuf); + talloc_free(lck->rec); return NT_STATUS_OK; } @@ -233,7 +234,8 @@ static NTSTATUS odb_push_record(struct odb_lock *lck, struct opendb_file *file) dbuf.dptr = blob.data; dbuf.dsize = blob.length; - ret = ctdb_store_unlock(lck->rec, dbuf); + ret = ctdb_record_store(lck->rec, dbuf); + talloc_free(lck->rec); data_blob_free(&blob); if (ret != 0) { return NT_STATUS_INTERNAL_DB_CORRUPTION; diff --git a/source4/cluster/ctdb/tcp/tcp_connect.c b/source4/cluster/ctdb/tcp/tcp_connect.c index a1f2d331cf..4d9d8e8386 100644 --- a/source4/cluster/ctdb/tcp/tcp_connect.c +++ b/source4/cluster/ctdb/tcp/tcp_connect.c @@ -43,6 +43,10 @@ void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data) struct ctdb_tcp_node *tnode = talloc_get_type( node->private_data, struct ctdb_tcp_node); + if (data == NULL) { + node->ctdb->upcalls->node_dead(node); + } + /* start a new connect cycle to try to re-establish the link */ close(tnode->fd); diff --git a/source4/cluster/ctdb/tcp/tcp_init.c b/source4/cluster/ctdb/tcp/tcp_init.c index 20b9bc9e33..9b54bb75ba 100644 --- a/source4/cluster/ctdb/tcp/tcp_init.c +++ b/source4/cluster/ctdb/tcp/tcp_init.c @@ -46,12 +46,6 @@ static int ctdb_tcp_start(struct ctdb_context *ctdb) ctdb_tcp_node_connect, node); } - if (ctdb->flags&CTDB_FLAG_CONNECT_WAIT) { - /* wait until all nodes are connected (should not be needed - outide of test code) */ - ctdb_connect_wait(ctdb); - } - return 0; } @@ -78,13 +72,13 @@ static int ctdb_tcp_add_node(struct ctdb_node *node) /* transport packet allocator - allows transport to control memory for packets */ -static void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size) +static void *ctdb_tcp_allocate_pkt(TALLOC_CTX *mem_ctx, size_t size) { /* tcp transport needs to round to 8 byte alignment to ensure that we can use a length header and 64 bit elements in structures */ size = 
(size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1); - return talloc_size(ctdb, size); + return talloc_size(mem_ctx, size); } diff --git a/source4/cluster/ctdb/tests/bench.sh b/source4/cluster/ctdb/tests/bench.sh index 50e9e08f99..3d0696f171 100755 --- a/source4/cluster/ctdb/tests/bench.sh +++ b/source4/cluster/ctdb/tests/bench.sh @@ -3,7 +3,7 @@ killall -q ctdb_bench echo "Trying 2 nodes" -bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.2:9001 $* & -bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.1:9001 $* +$VALGRIND bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.2:9001 $* & +$VALGRIND bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.1:9001 $* +wait -killall -q ctdb_bench diff --git a/source4/cluster/ctdb/tests/bench1.sh b/source4/cluster/ctdb/tests/bench1.sh index 3481d82be2..9adcf3198b 100755 --- a/source4/cluster/ctdb/tests/bench1.sh +++ b/source4/cluster/ctdb/tests/bench1.sh @@ -4,5 +4,4 @@ killall -q ctdb_bench echo "Trying 1 nodes" bin/ctdb_bench --nlist tests/1node.txt --listen 127.0.0.2:9001 $* - -killall -q ctdb_bench +wait diff --git a/source4/cluster/ctdb/tests/ctdb_bench.c b/source4/cluster/ctdb/tests/ctdb_bench.c index 37c095c0c0..02fcc1f2d4 100644 --- a/source4/cluster/ctdb/tests/ctdb_bench.c +++ b/source4/cluster/ctdb/tests/ctdb_bench.c @@ -22,7 +22,7 @@ #include "lib/events/events.h" #include "system/filesys.h" #include "popt.h" -#include "tests/cmdline.h" +#include "cmdline.h" #include #include @@ -45,7 +45,6 @@ static double end_timer(void) static int timelimit = 10; static int num_records = 10; static int num_msgs = 1; -static int num_repeats = 100; enum my_functions {FUNC_INCR=1, FUNC_FETCH=2}; @@ -78,51 +77,6 @@ static int fetch_func(struct ctdb_call_info *call) return 0; } -/* - benchmark incrementing an integer -*/ -static void bench_incr(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db) -{ - int loops=0; - int ret, i; - struct ctdb_call call; - - ZERO_STRUCT(call); - - start_timer(); - - while 
(1) { - uint32_t v = loops % num_records; - - call.call_id = FUNC_INCR; - call.key.dptr = (uint8_t *)&v; - call.key.dsize = 4; - - for (i=0;i timelimit) break; - printf("Incr: %.2f ops/sec\r", num_repeats*loops/end_timer()); - fflush(stdout); - } - } - - call.call_id = FUNC_FETCH; - - ret = ctdb_call(ctdb_db, &call); - if (ret == -1) { - printf("ctdb_call FUNC_FETCH failed - %s\n", ctdb_errstr(ctdb)); - return; - } - - printf("Incr: %.2f ops/sec (loops=%d val=%d)\n", - num_repeats*loops/end_timer(), loops, *(uint32_t *)call.reply_data.dptr); -} static int msg_count; static int msg_plus, msg_minus; @@ -259,6 +213,7 @@ int main(int argc, const char *argv[]) bench_ring(ctdb, ev); /* shut it down */ - talloc_free(ctdb); + ctdb_shutdown(ctdb); + return 0; } diff --git a/source4/cluster/ctdb/tests/ctdb_fetch.c b/source4/cluster/ctdb/tests/ctdb_fetch.c index febaf13fe4..39bd861fc1 100644 --- a/source4/cluster/ctdb/tests/ctdb_fetch.c +++ b/source4/cluster/ctdb/tests/ctdb_fetch.c @@ -22,7 +22,7 @@ #include "lib/events/events.h" #include "system/filesys.h" #include "popt.h" -#include "tests/cmdline.h" +#include "cmdline.h" #include #include @@ -58,18 +58,18 @@ static int msg_count; static void bench_fetch_1node(struct ctdb_context *ctdb) { TDB_DATA key, data, nulldata; - struct ctdb_record_handle *rec; struct ctdb_db_context *ctdb_db; TALLOC_CTX *tmp_ctx = talloc_new(ctdb); int dest, ret; + struct ctdb_record_handle *h; - key.dptr = discard_const("testkey"); - key.dsize = strlen((const char *)key.dptr); + key.dptr = discard_const(TESTKEY); + key.dsize = strlen(TESTKEY); ctdb_db = ctdb_db_handle(ctdb, "test.tdb"); - rec = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data); - if (rec == NULL) { + h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data); + if (h == NULL) { printf("Failed to fetch record '%s' on node %d\n", (const char *)key.dptr, ctdb_get_vnn(ctdb)); talloc_free(tmp_ctx); @@ -88,7 +88,8 @@ static void bench_fetch_1node(struct ctdb_context *ctdb) msg_count, 
ctdb_get_vnn(ctdb)); data.dsize = strlen((const char *)data.dptr)+1; - ret = ctdb_store_unlock(rec, data); + ret = ctdb_record_store(h, data); + talloc_free(h); if (ret != 0) { printf("Failed to store record\n"); } @@ -141,6 +142,10 @@ static void bench_fetch(struct ctdb_context *ctdb, struct event_context *ev) printf("Event loop failed!\n"); break; } + + if (LogLevel > 9) { + talloc_report_null_full(); + } } printf("Fetch: %.2f msgs/sec\n", msg_count/end_timer()); @@ -192,6 +197,8 @@ int main(int argc, const char *argv[]) } } + /* talloc_enable_leak_report_full(); */ + /* setup the remaining options for the main program to use */ extra_argv = poptGetArgs(pc); if (extra_argv) { @@ -240,7 +247,8 @@ int main(int argc, const char *argv[]) printf("DATA:\n%s\n", (char *)call.reply_data.dptr); - /* shut it down */ - talloc_free(ctdb); + /* go into a wait loop to allow other nodes to complete */ + ctdb_shutdown(ctdb); + return 0; } diff --git a/source4/cluster/ctdb/tests/ctdb_fetch1.c b/source4/cluster/ctdb/tests/ctdb_fetch1.c index ffe9c7c946..b92e7fcda3 100644 --- a/source4/cluster/ctdb/tests/ctdb_fetch1.c +++ b/source4/cluster/ctdb/tests/ctdb_fetch1.c @@ -24,7 +24,8 @@ #include "popt.h" #include "ctdb.h" #include "ctdb_private.h" -#include "tests/cmdline.h" +#include "cmdline.h" +#include #define PARENT_SRVID 0 #define CHILD1_SRVID 1 @@ -32,6 +33,20 @@ int num_msg=0; +static struct timeval tp1,tp2; + +static void start_timer(void) +{ + gettimeofday(&tp1,NULL); +} + +static double end_timer(void) +{ + gettimeofday(&tp2,NULL); + return (tp2.tv_sec + (tp2.tv_usec*1.0e-6)) - + (tp1.tv_sec + (tp1.tv_usec*1.0e-6)); +} + static void message_handler(struct ctdb_context *ctdb, uint32_t srvid, TDB_DATA data, void *private_data) { @@ -45,9 +60,9 @@ static void child_handler(struct ctdb_context *ctdb, uint32_t srvid, void test1(struct ctdb_db_context *ctdb_db) { - struct ctdb_record_handle *rh; TDB_DATA key, data, data2, store_data; int ret; + struct ctdb_record_handle *h; /* test 
1 : write data and read it back. should all be the same @@ -55,13 +70,27 @@ void test1(struct ctdb_db_context *ctdb_db) printf("Test1: write and verify we can read it back: "); key.dptr = discard_const("Record"); key.dsize = strlen((const char *)key.dptr)+1; - rh = ctdb_fetch_lock(ctdb_db, ctdb_db, key, &data); + h = ctdb_fetch_lock(ctdb_db, ctdb_db, key, &data); + if (h == NULL) { + printf("test1: ctdb_fetch_lock() failed\n"); + exit(1); + } store_data.dptr = discard_const("data to store"); store_data.dsize = strlen((const char *)store_data.dptr)+1; - ret = ctdb_store_unlock(rh, store_data); + ret = ctdb_record_store(h, store_data); + talloc_free(h); + if (ret!=0) { + printf("test1: ctdb_record_store() failed\n"); + exit(1); + } + + h = ctdb_fetch_lock(ctdb_db, ctdb_db, key, &data2); + if (h == NULL) { + printf("test1: ctdb_fetch_lock() failed\n"); + exit(1); + } - rh = ctdb_fetch_lock(ctdb_db, ctdb_db, key, &data2); /* hopefully data2 will now contain the record written above */ if (!strcmp("data to store", (const char *)data2.dptr)) { printf("SUCCESS\n"); @@ -71,14 +100,19 @@ void test1(struct ctdb_db_context *ctdb_db) } /* just write it back to unlock it */ - ret = ctdb_store_unlock(rh, store_data); + ret = ctdb_record_store(h, store_data); + talloc_free(h); + if (ret!=0) { + printf("test1: ctdb_record_store() failed\n"); + exit(1); + } } void child(int srvid, struct event_context *ev, struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db) { TDB_DATA data; - struct ctdb_record_handle *rh; TDB_DATA key, data2; + struct ctdb_record_handle *h; data.dptr=discard_const("dummy message"); data.dsize=strlen((const char *)data.dptr)+1; @@ -94,13 +128,23 @@ void child(int srvid, struct event_context *ev, struct ctdb_context *ctdb, struc /* fetch and lock the record */ key.dptr = discard_const("Record"); key.dsize = strlen((const char *)key.dptr)+1; - rh = ctdb_fetch_lock(ctdb_db, ctdb_db, key, &data2); + printf("client:%d fetching the record\n",srvid); + h = 
ctdb_fetch_lock(ctdb_db, ctdb_db, key, &data2); + printf("client:%d the record is fetched and locked\n",srvid); + if (h == NULL) { + printf("client: ctdb_fetch_lock() failed\n"); + exit(1); + } ctdb_send_message(ctdb, ctdb_get_vnn(ctdb), PARENT_SRVID, data); - - while (1) { + /* wait until parent tells us to release the lock */ + while (num_msg==1) { event_loop_once(ev); } + + printf("child %d terminating\n",srvid); + exit(10); + } /* @@ -204,29 +248,48 @@ int main(int argc, const char *argv[]) */ data.dptr=discard_const("dummy message"); data.dsize=strlen((const char *)data.dptr)+1; - printf("Send message to child 1 to fetch_lock the record\n"); ctdb_send_message(ctdb, ctdb_get_vnn(ctdb), CHILD1_SRVID, data); /* wait for child 1 to complete fetching and locking the record */ while (num_msg!=3) { event_loop_once(ev); } - printf("Child 1 has fetched and locked the record\n"); /* now tell child 2 to fetch and lock the same record */ - printf("Send message to child 2 to fetch_lock the record\n"); ctdb_send_message(ctdb, ctdb_get_vnn(ctdb), CHILD2_SRVID, data); - /* wait for child 2 to complete fetching and locking the record */ - while (num_msg!=4) { + /* wait a while for child 2 to complete fetching and locking the + record, this should fail since the record is already locked + by the first child */ + start_timer(); + while ( (end_timer() < 1.0) && (num_msg!=4) ) { event_loop_once(ev); } - printf("Child 2 has fetched and locked the record\n"); + if (num_msg!=4) { + printf("Child 2 did not get the lock since it is held by client 1:SUCCESS\n"); + } else { + printf("Child 2 did get the lock:FAILURE\n"); + exit(10); + } + /* send message to child 1 to terminate, which should let child 2 + get the lock. 
+ */ + ctdb_send_message(ctdb, ctdb_get_vnn(ctdb), CHILD1_SRVID, data); - while (1) { + + /* wait for a final message from child 2 it has received the lock + which indicates success */ + while (num_msg!=4) { event_loop_once(ev); } + printf("child 2 aquired the lock after child 1 terminated:SUCCESS\n"); + + /* send a message to child 2 to tell it to terminate too */ + ctdb_send_message(ctdb, ctdb_get_vnn(ctdb), CHILD2_SRVID, data); + + + printf("Test was SUCCESSFUL\n"); /* shut it down */ talloc_free(ctdb); diff --git a/source4/cluster/ctdb/tests/ctdb_messaging.c b/source4/cluster/ctdb/tests/ctdb_messaging.c index d02c2116e8..e7057b64fb 100644 --- a/source4/cluster/ctdb/tests/ctdb_messaging.c +++ b/source4/cluster/ctdb/tests/ctdb_messaging.c @@ -22,7 +22,7 @@ #include "lib/events/events.h" #include "system/filesys.h" #include "popt.h" -#include "tests/cmdline.h" +#include "cmdline.h" static int timelimit = 10; static int num_records = 10; @@ -130,7 +130,7 @@ int main(int argc, const char *argv[]) for (j=0;jcallback; + void *p = h->private_data; + talloc_set_destructor(h, NULL); + close(h->fd[0]); + talloc_free(h); + callback(p); + waitpid(h->child, NULL, 0); +} + +static int lockwait_destructor(struct lockwait_handle *h) +{ + close(h->fd[0]); + kill(h->child, SIGKILL); + waitpid(h->child, NULL, 0); + return 0; +} + + +static struct lockwait_handle *lockwait(struct event_context *ev, + TALLOC_CTX *mem_ctx, + int fd, off_t ofs, size_t len, + void (*callback)(void *), void *private_data) +{ + struct lockwait_handle *h; + int ret; + + h = talloc_zero(mem_ctx, struct lockwait_handle); + if (h == NULL) { + return NULL; + } + + ret = pipe(h->fd); + if (ret != 0) { + talloc_free(h); + return NULL; + } + + h->child = fork(); + if (h->child == (pid_t)-1) { + close(h->fd[0]); + close(h->fd[1]); + talloc_free(h); + return NULL; + } + + h->callback = callback; + h->private_data = private_data; + + if (h->child == 0) { + /* in child */ + struct flock lock; + close(h->fd[0]); + 
lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + lock.l_start = ofs; + lock.l_len = len; + lock.l_pid = 0; + fcntl(fd,F_SETLKW,&lock); + _exit(0); + } + + close(h->fd[1]); + talloc_set_destructor(h, lockwait_destructor); + + h->fde = event_add_fd(ev, h, h->fd[0], EVENT_FD_READ, lockwait_handler, h); + if (h->fde == NULL) { + talloc_free(h); + return NULL; + } + + return h; +} + + + + +static void fcntl_lock_callback(void *p) +{ + int *got_lock = (int *)p; + *got_lock = 1; +} + +/* + get an fcntl lock - waiting if necessary + */ +static int fcntl_lock(struct event_context *ev, + int fd, int op, off_t offset, off_t count, int type) +{ + struct flock lock; + int ret; + int use_lockwait = (op == F_SETLKW); + int got_lock = 0; + + lock.l_type = type; + lock.l_whence = SEEK_SET; + lock.l_start = offset; + lock.l_len = count; + lock.l_pid = 0; + + do { + ret = fcntl(fd,use_lockwait?F_SETLK:op,&lock); + if (ret == 0) { + return 0; + } + if (ret == -1 && + (errno == EACCES || errno == EAGAIN || errno == EDEADLK)) { + struct lockwait_handle *h; + h = lockwait(ev, ev, fd, offset, count, + fcntl_lock_callback, &got_lock); + if (h == NULL) { + errno = ENOLCK; + return -1; + } + /* in real code we would return to the event loop */ + while (!got_lock) { + event_loop_once(ev); + } + got_lock = 0; + } + } while (!got_lock); + + return ret; +} + +static void child(struct event_context *ev, int n) +{ + int fd; + int count=0; + struct timeval tv; + fd = open("test.dat", O_CREAT|O_RDWR, 0666); + if (fd == -1) { + perror("test.dat"); + exit(1); + } + + tv = timeval_current(); + + while (timeval_elapsed(&tv) < 10) { + int ret; + ret = fcntl_lock(ev, fd, F_SETLKW, 0, 1, F_WRLCK); + if (ret != 0) { + printf("Failed to get lock in child %d!\n", n); + break; + } + fcntl_lock(ev, fd, F_SETLK, 0, 1, F_UNLCK); + count++; + } + + printf("child %2d %.0f ops/sec\n", n, count/timeval_elapsed(&tv)); + _exit(0); +} + +static int timelimit = 10; + +/* + main program +*/ +int main(int argc, const 
char *argv[]) +{ + pid_t *pids; + int nprogs = 2; + int i; + struct event_context *ev; + struct poptOption popt_options[] = { + POPT_AUTOHELP + { "timelimit", 't', POPT_ARG_INT, &timelimit, 0, "timelimit", "integer" }, + { "num-progs", 'n', POPT_ARG_INT, &nprogs, 0, "num_progs", "integer" }, + POPT_TABLEEND + }; + poptContext pc; + int opt; + + pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST); + + while ((opt = poptGetNextOpt(pc)) != -1) { + switch (opt) { + default: + fprintf(stderr, "Invalid option %s: %s\n", + poptBadOption(pc, 0), poptStrerror(opt)); + exit(1); + } + } + + ev = event_context_init(NULL); + + pids = talloc_array(ev, pid_t, nprogs); + + /* create N processes fighting over the same lock */ + for (i=0;iclient_packets_sent); + printf(" client_packets_recv %u\n", s->client_packets_recv); + printf(" req_call %u\n", s->client.req_call); + printf(" req_message %u\n", s->client.req_message); + printf(" req_finished %u\n", s->client.req_finished); + printf(" req_register %u\n", s->client.req_register); + printf(" req_connect_wait %u\n", s->client.req_connect_wait); + printf(" req_shutdown %u\n", s->client.req_shutdown); + printf(" req_status %u\n", s->client.req_status); + printf(" node_packets_sent %u\n", s->node_packets_sent); + printf(" node_packets_recv %u\n", s->node_packets_recv); + printf(" req_call %u\n", s->client.req_call); + printf(" reply_call %u\n", s->count.reply_call); + printf(" reply_redirect %u\n", s->count.reply_redirect); + printf(" req_dmaster %u\n", s->count.req_dmaster); + printf(" reply_dmaster %u\n", s->count.reply_dmaster); + printf(" reply_error %u\n", s->count.reply_error); + printf(" reply_redirect %u\n", s->count.reply_redirect); + printf(" req_message %u\n", s->count.req_message); + printf(" req_finished %u\n", s->count.req_finished); + printf(" total_calls %u\n", s->total_calls); + printf(" pending_calls %u\n", s->pending_calls); + printf(" lockwait_calls %u\n", s->lockwait_calls); + printf(" 
pending_lockwait_calls %u\n", s->pending_lockwait_calls); + printf(" max_call_latency %.6f seconds\n", s->max_call_latency); + printf(" max_lockwait_latency %.6f seconds\n", s->max_lockwait_latency); +} + +/* + show usage message + */ +static void usage(void) +{ + printf("Usage: ctdb_status \n"); + exit(1); +} + +/* + main program +*/ +int main(int argc, const char *argv[]) +{ + struct ctdb_context *ctdb; + struct poptOption popt_options[] = { + POPT_AUTOHELP + POPT_CTDB_CMDLINE + POPT_TABLEEND + }; + int opt; + const char **extra_argv; + int extra_argc = 0; + int ret; + poptContext pc; + struct event_context *ev; + const char *ctdb_socket; + struct ctdb_status status; + + pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST); + + while ((opt = poptGetNextOpt(pc)) != -1) { + switch (opt) { + default: + fprintf(stderr, "Invalid option %s: %s\n", + poptBadOption(pc, 0), poptStrerror(opt)); + exit(1); + } + } + + /* setup the remaining options for the main program to use */ + extra_argv = poptGetArgs(pc); + if (extra_argv) { + extra_argv++; + while (extra_argv[extra_argc]) extra_argc++; + } + + if (extra_argc < 1) { + usage(); + } + + ctdb_socket = extra_argv[0]; + + ev = event_context_init(NULL); + + /* initialise ctdb */ + ctdb = ctdb_cmdline_client(ev, ctdb_socket); + if (ctdb == NULL) { + printf("Failed to init ctdb\n"); + exit(1); + } + + ret = ctdb_status(ctdb, &status); + if (ret != 0) { + printf("Failed to get ctdb status\n"); + exit(1); + } + + show_status(&status); + + return 0; +} -- cgit