diff options
author | Andrew Tridgell <tridge@samba.org> | 2007-01-19 03:54:48 +0000 |
---|---|---|
committer | Gerald (Jerry) Carter <jerry@samba.org> | 2007-10-10 14:43:46 -0500 |
commit | 5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263 (patch) | |
tree | 050e2f47faf234685cd7f20ab7e4f37e6521f7a2 /source4/cluster/ctdb/common | |
parent | 8c3d15f6caa3f1ffda86755fa9b7ff9602cbb022 (diff) | |
download | samba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.tar.gz samba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.tar.bz2 samba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.zip |
r20889: import ctdb cluster backend from bzr
it will be interesting to see how the build farm handles this
(This used to be commit 53be449630bd67d649a9e70cc7e25a9799c0616b)
Diffstat (limited to 'source4/cluster/ctdb/common')
-rw-r--r-- | source4/cluster/ctdb/common/ctdb.c | 287 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_call.c | 653 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_ltdb.c | 139 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_util.c | 103 |
4 files changed, 1182 insertions, 0 deletions
diff --git a/source4/cluster/ctdb/common/ctdb.c b/source4/cluster/ctdb/common/ctdb.c new file mode 100644 index 0000000000..ad0345b3c7 --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb.c @@ -0,0 +1,287 @@ +/* + ctdb main protocol code + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" + +/* + choose the transport we will use +*/ +int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport) +{ + int ctdb_tcp_init(struct ctdb_context *ctdb); + + if (strcmp(transport, "tcp") == 0) { + return ctdb_tcp_init(ctdb); + } + ctdb_set_error(ctdb, "Unknown transport '%s'\n", transport); + return -1; +} + +/* + set some ctdb flags +*/ +void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags) +{ + ctdb->flags |= flags; +} + + +/* + add a node to the list of active nodes +*/ +static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr) +{ + struct ctdb_node *node, **nodep; + + nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1); + CTDB_NO_MEMORY(ctdb, nodep); + + ctdb->nodes = nodep; + nodep = &ctdb->nodes[ctdb->num_nodes]; + (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node); + CTDB_NO_MEMORY(ctdb, *nodep); + node = *nodep; + + if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) { + return -1; + } + node->ctdb = ctdb; + node->name = talloc_asprintf(node, "%s:%u", + node->address.address, + node->address.port); + /* for now we just set the vnn to the line in the file - this + will change! */ + node->vnn = ctdb->num_nodes; + + if (ctdb->methods->add_node(node) != 0) { + talloc_free(node); + return -1; + } + + if (ctdb_same_address(&ctdb->address, &node->address)) { + ctdb->vnn = node->vnn; + } + + ctdb->num_nodes++; + + return 0; +} + +/* + setup the node list from a file +*/ +int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist) +{ + char **lines; + int nlines; + int i; + + lines = file_lines_load(nlist, &nlines, ctdb); + if (lines == NULL) { + ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist); + return -1; + } + + for (i=0;i<nlines;i++) { + if (ctdb_add_node(ctdb, lines[i]) != 0) { + talloc_free(lines); + return -1; + } + } + + talloc_free(lines); + return 0; +} + +/* + setup the local node address +*/ +int ctdb_set_address(struct ctdb_context *ctdb, const char *address) +{ + if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) { + return -1; + } + + ctdb->name = talloc_asprintf(ctdb, "%s:%u", + ctdb->address.address, + ctdb->address.port); + return 0; +} + +/* + add a node to the list of active nodes +*/ +int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id) +{ + struct ctdb_registered_call *call; + + call = talloc(ctdb, struct ctdb_registered_call); + call->fn = fn; + call->id = id; + + DLIST_ADD(ctdb->calls, call); + return 0; +} + +/* + return the vnn of this node +*/ +uint32_t ctdb_get_vnn(struct ctdb_context *ctdb) +{ + return ctdb->vnn; +} + +/* + start the protocol going +*/ +int ctdb_start(struct ctdb_context *ctdb) +{ + return ctdb->methods->start(ctdb); +} + +/* + called by the transport layer when a packet comes in +*/ +static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length) +{ + struct ctdb_req_header *hdr; + if (length < sizeof(*hdr)) { + ctdb_set_error(ctdb, "Bad packet length %d\n", length); + return; + } + hdr = (struct ctdb_req_header *)data; + if (length != hdr->length) { + ctdb_set_error(ctdb, "Bad header length %d expected %d\n", + hdr->length, length); + return; + } + + DEBUG(0,("got ctdb op %d reqid %d\n", hdr->operation, hdr->reqid)); + + switch (hdr->operation) { + case CTDB_REQ_CALL: + ctdb_request_call(ctdb, hdr); + break; + + case CTDB_REPLY_CALL: + ctdb_reply_call(ctdb, hdr); + break; + + case CTDB_REPLY_ERROR: + ctdb_reply_error(ctdb, hdr); + break; + + case CTDB_REPLY_REDIRECT: + ctdb_reply_redirect(ctdb, hdr); + break; + + case CTDB_REQ_DMASTER: + ctdb_request_dmaster(ctdb, hdr); + break; + + case CTDB_REPLY_DMASTER: + ctdb_reply_dmaster(ctdb, hdr); + break; + + default: + printf("Packet with unknown operation %d\n", hdr->operation); + talloc_free(hdr); + break; + } +} + +/* + called by the transport layer when a node is dead +*/ +static void ctdb_node_dead(struct ctdb_node *node) +{ + node->ctdb->num_connected--; + printf("%s: node %s is dead: %d connected\n", + node->ctdb->name, node->name, node->ctdb->num_connected); +} + +/* + called by the transport layer when a node is dead +*/ +static void ctdb_node_connected(struct ctdb_node *node) +{ + node->ctdb->num_connected++; + printf("%s: connected to %s - %d connected\n", + node->ctdb->name, node->name, node->ctdb->num_connected); +} + +/* + wait for all nodes to be connected +*/ +void ctdb_connect_wait(struct ctdb_context *ctdb) +{ + int expected = ctdb->num_nodes - 1; + if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) { + expected++; + } + while (ctdb->num_connected != expected) { + event_loop_once(ctdb->ev); + } +} + +/* + wait until we're the only node left +*/ +void ctdb_wait_loop(struct ctdb_context *ctdb) +{ + int expected = 0; + if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) { + expected++; + } + while (ctdb->num_connected > expected) { + event_loop_once(ctdb->ev); + } +} + +static const struct ctdb_upcalls ctdb_upcalls = { + .recv_pkt = ctdb_recv_pkt, + .node_dead = ctdb_node_dead, + .node_connected = ctdb_node_connected +}; + +/* + initialise the ctdb daemon. + + NOTE: In current code the daemon does not fork. This is for testing purposes only + and to simplify the code. +*/ +struct ctdb_context *ctdb_init(struct event_context *ev) +{ + struct ctdb_context *ctdb; + + ctdb = talloc_zero(ev, struct ctdb_context); + ctdb->ev = ev; + ctdb->upcalls = &ctdb_upcalls; + ctdb->idr = idr_init(ctdb); + + return ctdb; +} + diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c new file mode 100644 index 0000000000..2bedccc86a --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_call.c @@ -0,0 +1,653 @@ +/* + ctdb_call protocol code + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + see http://wiki.samba.org/index.php/Samba_%26_Clustering for + protocol design and packet details +*/ +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" + + +/* + queue a packet or die +*/ +static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_node *node; + DEBUG(0,("queueing destnode=%u srcnode=%u\n", hdr->destnode, hdr->srcnode)); + node = ctdb->nodes[hdr->destnode]; + if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) { + ctdb_fatal(ctdb, "Unable to queue packet\n"); + } +} + + +/* + local version of ctdb_call +*/ +static int ctdb_call_local(struct ctdb_context *ctdb, TDB_DATA key, + struct ctdb_ltdb_header *header, TDB_DATA *data, + int call_id, TDB_DATA *call_data, TDB_DATA *reply_data, + uint32_t caller) +{ + struct ctdb_call *c; + struct ctdb_registered_call *fn; + + c = talloc(ctdb, struct ctdb_call); + CTDB_NO_MEMORY(ctdb, c); + + c->key = key; + c->call_data = call_data; + c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize); + c->record_data.dsize = data->dsize; + CTDB_NO_MEMORY(ctdb, c->record_data.dptr); + c->new_data = NULL; + c->reply_data = NULL; + + for (fn=ctdb->calls;fn;fn=fn->next) { + if (fn->id == call_id) break; + } + if (fn == NULL) { + ctdb_set_error(ctdb, "Unknown call id %u\n", call_id); + return -1; + } + + if (fn->fn(c) != 0) { + ctdb_set_error(ctdb, "ctdb_call %u failed\n", call_id); + return -1; + } + + if (header->laccessor != caller) { + header->lacount = 0; + } + header->laccessor = caller; + header->lacount++; + + /* we need to force the record to be written out if this was a remote access, + so that the lacount is updated */ + if (c->new_data == NULL && header->laccessor != ctdb->vnn) { + c->new_data = &c->record_data; + } + + if (c->new_data) { + if (ctdb_ltdb_store(ctdb, key, header, *c->new_data) != 0) { + ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n"); + return -1; + } + } + + if (reply_data) { + if (c->reply_data) { + *reply_data = *c->reply_data; + talloc_steal(ctdb, reply_data->dptr); + } else { + reply_data->dptr = NULL; + reply_data->dsize = 0; + } + } + + talloc_free(c); + + return 0; +} + +/* + send an error reply +*/ +static void ctdb_send_error(struct ctdb_context *ctdb, + struct ctdb_req_header *hdr, uint32_t status, + const char *fmt, ...) +{ + va_list ap; + struct ctdb_reply_error *r; + char *msg; + int len; + + va_start(ap, fmt); + msg = talloc_vasprintf(ctdb, fmt, ap); + if (msg == NULL) { + ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n"); + } + va_end(ap); + + len = strlen(msg)+1; + r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.length = sizeof(*r) + len; + r->hdr.operation = CTDB_REPLY_ERROR; + r->hdr.destnode = hdr->srcnode; + r->hdr.srcnode = ctdb->vnn; + r->hdr.reqid = hdr->reqid; + r->status = status; + r->msglen = len; + memcpy(&r->msg[0], msg, len); + + talloc_free(msg); + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(r); +} + + +/* + send a redirect reply +*/ +static void ctdb_call_send_redirect(struct ctdb_context *ctdb, + struct ctdb_req_call *c, + struct ctdb_ltdb_header *header) +{ + struct ctdb_reply_redirect *r; + + r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r)); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.length = sizeof(*r); + r->hdr.operation = CTDB_REPLY_REDIRECT; + r->hdr.destnode = c->hdr.srcnode; + r->hdr.srcnode = ctdb->vnn; + r->hdr.reqid = c->hdr.reqid; + r->dmaster = header->dmaster; + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(r); +} + +/* + send a dmaster request (give another node the dmaster for a record) + + This is always sent to the lmaster, which ensures that the lmaster + always knows who the dmaster is. The lmaster will then send a + CTDB_REPLY_DMASTER to the new dmaster +*/ +static void ctdb_call_send_dmaster(struct ctdb_context *ctdb, + struct ctdb_req_call *c, + struct ctdb_ltdb_header *header, + TDB_DATA *key, TDB_DATA *data) +{ + struct ctdb_req_dmaster *r; + int len; + + len = sizeof(*r) + key->dsize + data->dsize; + r = ctdb->methods->allocate_pkt(ctdb, len); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.length = len; + r->hdr.operation = CTDB_REQ_DMASTER; + r->hdr.destnode = ctdb_lmaster(ctdb, key); + r->hdr.srcnode = ctdb->vnn; + r->hdr.reqid = c->hdr.reqid; + r->dmaster = header->laccessor; + r->keylen = key->dsize; + r->datalen = data->dsize; + memcpy(&r->data[0], key->dptr, key->dsize); + memcpy(&r->data[key->dsize], data->dptr, data->dsize); + + if (r->hdr.destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + /* we are the lmaster - don't send to ourselves */ + DEBUG(0,("XXXX local ctdb_req_dmaster\n")); + ctdb_request_dmaster(ctdb, &r->hdr); + } else { + ctdb_queue_packet(ctdb, &r->hdr); + + /* update the ltdb to record the new dmaster */ + header->dmaster = r->hdr.destnode; + ctdb_ltdb_store(ctdb, *key, header, *data); + } + + talloc_free(r); +} + + +/* + called when a CTDB_REQ_DMASTER packet comes in + + this comes into the lmaster for a record when the current dmaster + wants to give up the dmaster role and give it to someone else +*/ +void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr; + struct ctdb_reply_dmaster *r; + TDB_DATA key, data; + struct ctdb_ltdb_header header; + int ret; + + key.dptr = c->data; + key.dsize = c->keylen; + data.dptr = c->data + c->keylen; + data.dsize = c->datalen; + + DEBUG(0,("request dmaster reqid=%d\n", hdr->reqid)); + + /* fetch the current record */ + ret = ctdb_ltdb_fetch(ctdb, key, &header, &data); + if (ret != 0) { + ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record"); + return; + } + + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<data.dsize;i++) { + write(fd, &data.dptr[i], 1); + } + close(fd); + } + + /* its a protocol error if the sending node is not the current dmaster */ + if (header.dmaster != hdr->srcnode) { + ctdb_fatal(ctdb, "dmaster request from non-master"); + return; + } + + DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__)); + + header.dmaster = c->dmaster; + if (ctdb_ltdb_store(ctdb, key, &header, data) != 0) { + ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster"); + return; + } + + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<data.dsize;i++) { + write(fd, &data.dptr[i], 1); + } + close(fd); + } + + /* send the CTDB_REPLY_DMASTER */ + r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.length = sizeof(*r) + data.dsize; + r->hdr.operation = CTDB_REPLY_DMASTER; + r->hdr.destnode = c->dmaster; + r->hdr.srcnode = ctdb->vnn; + r->hdr.reqid = hdr->reqid; + r->datalen = data.dsize; + memcpy(&r->data[0], data.dptr, data.dsize); + + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<data.dsize;i++) { + write(fd, &data.dptr[i], 1); + } + close(fd); + } + + DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__)); + + if (0 && r->hdr.destnode == r->hdr.srcnode) { + ctdb_reply_dmaster(ctdb, &r->hdr); + } else { + ctdb_queue_packet(ctdb, &r->hdr); + DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__)); + + talloc_free(r); + } +} + + +/* + called when a CTDB_REQ_CALL packet comes in +*/ +void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_req_call *c = (struct ctdb_req_call *)hdr; + TDB_DATA key, data, call_data, reply_data; + struct ctdb_reply_call *r; + int ret; + struct ctdb_ltdb_header header; + + key.dptr = c->data; + key.dsize = c->keylen; + call_data.dptr = c->data + c->keylen; + call_data.dsize = c->calldatalen; + + /* determine if we are the dmaster for this key. This also + fetches the record data (if any), thus avoiding a 2nd fetch of the data + if the call will be answered locally */ + ret = ctdb_ltdb_fetch(ctdb, key, &header, &data); + if (ret != 0) { + ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call"); + return; + } + + /* if we are not the dmaster, then send a redirect to the + requesting node */ + if (header.dmaster != ctdb->vnn) { + ctdb_call_send_redirect(ctdb, c, &header); + talloc_free(data.dptr); + return; + } + + /* if this nodes has done enough consecutive calls on the same record + then give them the record */ + if (header.laccessor == c->hdr.srcnode && + header.lacount >= CTDB_MAX_LACOUNT) { + ctdb_call_send_dmaster(ctdb, c, &header, &key, &data); + talloc_free(data.dptr); + return; + } + + ctdb_call_local(ctdb, key, &header, &data, c->callid, + call_data.dsize?&call_data:NULL, + &reply_data, c->hdr.srcnode); + + r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.length = sizeof(*r) + reply_data.dsize; + r->hdr.operation = CTDB_REPLY_CALL; + r->hdr.destnode = hdr->srcnode; + r->hdr.srcnode = hdr->destnode; + r->hdr.reqid = hdr->reqid; + r->datalen = reply_data.dsize; + memcpy(&r->data[0], reply_data.dptr, reply_data.dsize); + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(reply_data.dptr); + talloc_free(r); +} + +enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR}; + +/* + state of a in-progress ctdb call +*/ +struct ctdb_call_state { + enum call_state state; + struct ctdb_req_call *c; + struct ctdb_node *node; + const char *errmsg; + TDB_DATA call_data; + TDB_DATA reply_data; + TDB_DATA key; + int redirect_count; + struct ctdb_ltdb_header header; +}; + + +/* + called when a CTDB_REPLY_CALL packet comes in + + This packet comes in response to a CTDB_REQ_CALL request packet. It + contains any reply data freom the call +*/ +void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr; + struct ctdb_call_state *state; + TDB_DATA reply_data; + + state = idr_find(ctdb->idr, hdr->reqid); + + reply_data.dptr = c->data; + reply_data.dsize = c->datalen; + + state->reply_data = reply_data; + + talloc_steal(state, c); + + state->state = CTDB_CALL_DONE; +} + +/* + called when a CTDB_REPLY_DMASTER packet comes in + + This packet comes in from the lmaster response to a CTDB_REQ_CALL + request packet. It means that the current dmaster wants to give us + the dmaster role +*/ +void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr; + struct ctdb_call_state *state; + TDB_DATA data; + + state = idr_find(ctdb->idr, hdr->reqid); + + data.dptr = c->data; + data.dsize = c->datalen; + + talloc_steal(state, c); + + /* we're now the dmaster - update our local ltdb with new header + and data */ + state->header.dmaster = ctdb->vnn; + + if (ctdb_ltdb_store(ctdb, state->key, &state->header, data) != 0) { + ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n"); + return; + } + + ctdb_call_local(ctdb, state->key, &state->header, &data, state->c->callid, + state->call_data.dsize?&state->call_data:NULL, + &state->reply_data, ctdb->vnn); + + state->state = CTDB_CALL_DONE; +} + + +/* + called when a CTDB_REPLY_ERROR packet comes in +*/ +void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + + talloc_steal(state, c); + + state->state = CTDB_CALL_ERROR; + state->errmsg = (char *)c->msg; +} + + +/* + called when a CTDB_REPLY_REDIRECT packet comes in + + This packet arrives when we have sent a CTDB_REQ_CALL request and + the node that received it is not the dmaster for the given key. We + are given a hint as to what node to try next. +*/ +void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + + talloc_steal(state, c); + + /* don't allow for too many redirects */ + if (state->redirect_count++ == CTDB_MAX_REDIRECT) { + c->dmaster = ctdb_lmaster(ctdb, &state->key); + } + + /* send it off again */ + state->node = ctdb->nodes[c->dmaster]; + + ctdb_queue_packet(ctdb, &state->c->hdr); +} + +/* + destroy a ctdb_call +*/ +static int ctdb_call_destructor(struct ctdb_call_state *state) +{ +// idr_remove(state->node->ctdb->idr, state->c->hdr.reqid); + return 0; +} + + +/* + called when a ctdb_call times out +*/ +void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private) +{ + struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state); + state->state = CTDB_CALL_ERROR; + ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out", + state->c->hdr.reqid); +} + +/* + construct an event driven local ctdb_call + + this is used so that locally processed ctdb_call requests are processed + in an event driven manner +*/ +struct ctdb_call_state *ctdb_call_local_send(struct ctdb_context *ctdb, + TDB_DATA key, int call_id, + TDB_DATA *call_data, TDB_DATA *reply_data, + struct ctdb_ltdb_header *header, + TDB_DATA *data) +{ + struct ctdb_call_state *state; + int ret; + + state = talloc_zero(ctdb, struct ctdb_call_state); + CTDB_NO_MEMORY_NULL(ctdb, state); + + state->state = CTDB_CALL_DONE; + state->node = ctdb->nodes[ctdb->vnn]; + + ret = ctdb_call_local(ctdb, key, header, data, + call_id, call_data, &state->reply_data, + ctdb->vnn); + return state; +} + + +/* + make a remote ctdb call - async send + + This constructs a ctdb_call request and queues it for processing. + This call never blocks. +*/ +struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb, + TDB_DATA key, int call_id, + TDB_DATA *call_data, TDB_DATA *reply_data) +{ + uint32_t len; + struct ctdb_call_state *state; + int ret; + struct ctdb_ltdb_header header; + TDB_DATA data; + + /* + if we are the dmaster for this key then we don't need to + send it off at all, we can bypass the network and handle it + locally. To find out if we are the dmaster we need to look + in our ltdb + */ + ret = ctdb_ltdb_fetch(ctdb, key, &header, &data); + if (ret != 0) return NULL; + + if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + return ctdb_call_local_send(ctdb, key, call_id, call_data, reply_data, + &header, &data); + } + + state = talloc_zero(ctdb, struct ctdb_call_state); + CTDB_NO_MEMORY_NULL(ctdb, state); + + len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0); + state->c = ctdb->methods->allocate_pkt(ctdb, len); + CTDB_NO_MEMORY_NULL(ctdb, state->c); + + state->c->hdr.length = len; + state->c->hdr.operation = CTDB_REQ_CALL; + state->c->hdr.destnode = header.dmaster; + state->c->hdr.srcnode = ctdb->vnn; + /* this limits us to 16k outstanding messages - not unreasonable */ + state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + DEBUG(0,("Allocate reqid %u\n", state->c->hdr.reqid)); + state->c->callid = call_id; + state->c->keylen = key.dsize; + state->c->calldatalen = call_data?call_data->dsize:0; + memcpy(&state->c->data[0], key.dptr, key.dsize); + if (call_data) { + memcpy(&state->c->data[key.dsize], call_data->dptr, call_data->dsize); + state->call_data.dptr = &state->c->data[key.dsize]; + state->call_data.dsize = call_data->dsize; + } + state->key.dptr = &state->c->data[0]; + state->key.dsize = key.dsize; + + state->node = ctdb->nodes[header.dmaster]; + state->state = CTDB_CALL_WAIT; + state->header = header; + + talloc_set_destructor(state, ctdb_call_destructor); + + ctdb_queue_packet(ctdb, &state->c->hdr); + + event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), + ctdb_call_timeout, state); + return state; +} + + +/* + make a remote ctdb call - async recv. + + This is called when the program wants to wait for a ctdb_call to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_call_recv(struct ctdb_call_state *state, TDB_DATA *reply_data) +{ + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->node->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + talloc_free(state); + return -1; + } + if (reply_data) { + reply_data->dptr = talloc_memdup(state->node->ctdb, + state->reply_data.dptr, + state->reply_data.dsize); + reply_data->dsize = state->reply_data.dsize; + } + talloc_free(state); + return 0; +} + +/* + full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() +*/ +int ctdb_call(struct ctdb_context *ctdb, + TDB_DATA key, int call_id, + TDB_DATA *call_data, TDB_DATA *reply_data) +{ + struct ctdb_call_state *state; + state = ctdb_call_send(ctdb, key, call_id, call_data, reply_data); + return ctdb_call_recv(state, reply_data); +} diff --git a/source4/cluster/ctdb/common/ctdb_ltdb.c b/source4/cluster/ctdb/common/ctdb_ltdb.c new file mode 100644 index 0000000000..bc15a3e898 --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_ltdb.c @@ -0,0 +1,139 @@ +/* + ctdb ltdb code + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" + +/* + attach to a specific database +*/ +int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags, + int open_flags, mode_t mode) +{ + /* when we have a separate daemon this will need to be a real + file, not a TDB_INTERNAL, so the parent can access it to + for ltdb bypass */ + ctdb->ltdb = tdb_open(name, 0, /* tdb_flags */ TDB_INTERNAL, open_flags, mode); + if (ctdb->ltdb == NULL) { + ctdb_set_error(ctdb, "Failed to open tdb %s\n", name); + return -1; + } + return 0; +} + +/* + return the lmaster given a key +*/ +uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key) +{ + return ctdb_hash(key) % ctdb->num_nodes; +} + + +/* + construct an initial header for a record with no ltdb header yet +*/ +static void ltdb_initial_header(struct ctdb_context *ctdb, + TDB_DATA key, + struct ctdb_ltdb_header *header) +{ + header->rsn = 0; + /* initial dmaster is the lmaster */ + header->dmaster = ctdb_lmaster(ctdb, &key); + header->laccessor = header->dmaster; + header->lacount = 0; +} + + +/* + fetch a record from the ltdb, separating out the header information + and returning the body of the record. A valid (initial) header is + returned if the record is not present +*/ +int ctdb_ltdb_fetch(struct ctdb_context *ctdb, + TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data) +{ + TDB_DATA rec; + + rec = tdb_fetch(ctdb->ltdb, key); + if (rec.dsize < sizeof(*header)) { + /* return an initial header */ + free(rec.dptr); + ltdb_initial_header(ctdb, key, header); + data->dptr = NULL; + data->dsize = 0; + return 0; + } + + *header = *(struct ctdb_ltdb_header *)rec.dptr; + + data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header); + data->dptr = talloc_memdup(ctdb, sizeof(struct ctdb_ltdb_header)+rec.dptr, + data->dsize); + free(rec.dptr); + CTDB_NO_MEMORY(ctdb, data->dptr); + + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<data->dsize;i++) { + write(fd, &data->dptr[i], 1); + } + close(fd); + } + + return 0; +} + + +/* + fetch a record from the ltdb, separating out the header information + and returning the body of the record. A valid (initial) header is + returned if the record is not present +*/ +int ctdb_ltdb_store(struct ctdb_context *ctdb, TDB_DATA key, + struct ctdb_ltdb_header *header, TDB_DATA data) +{ + TDB_DATA rec; + int ret; + + rec.dsize = sizeof(*header) + data.dsize; + rec.dptr = talloc_size(ctdb, rec.dsize); + CTDB_NO_MEMORY(ctdb, rec.dptr); + + memcpy(rec.dptr, header, sizeof(*header)); + memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize); + + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<rec.dsize;i++) { + write(fd, &rec.dptr[i], 1); + } + close(fd); + } + + ret = tdb_store(ctdb->ltdb, key, rec, TDB_REPLACE); + talloc_free(rec.dptr); + + return ret; +} diff --git a/source4/cluster/ctdb/common/ctdb_util.c b/source4/cluster/ctdb/common/ctdb_util.c new file mode 100644 index 0000000000..8e25759609 --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_util.c @@ -0,0 +1,103 @@ +/* + ctdb utility code + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" + +/* + return error string for last error +*/ +const char *ctdb_errstr(struct ctdb_context *ctdb) +{ + return ctdb->err_msg; +} + + +/* + remember an error message +*/ +void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...) +{ + va_list ap; + talloc_free(ctdb->err_msg); + va_start(ap, fmt); + ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap); + DEBUG(0,("ctdb error: %s\n", ctdb->err_msg)); + va_end(ap); +} + + +/* + a fatal internal error occurred - no hope for recovery +*/ +void ctdb_fatal(struct ctdb_context *ctdb, const char *msg) +{ + DEBUG(0,("ctdb fatal error: %s\n", msg)); + fprintf(stderr, "ctdb fatal error: '%s'\n", msg); + abort(); +} + +/* + parse a IP:port pair +*/ +int ctdb_parse_address(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, const char *str, + struct ctdb_address *address) +{ + char *p; + p = strchr(str, ':'); + if (p == NULL) { + ctdb_set_error(ctdb, "Badly formed node '%s'\n", str); + return -1; + } + + address->address = talloc_strndup(mem_ctx, str, p-str); + address->port = strtoul(p+1, NULL, 0); + return 0; +} + + +/* + check if two addresses are the same +*/ +bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2) +{ + return strcmp(a1->address, a2->address) == 0 && a1->port == a2->port; +} + + +/* + hash function for mapping data to a VNN - taken from tdb +*/ +uint32_t ctdb_hash(const TDB_DATA *key) +{ + uint32_t value; /* Used to compute the hash value. */ + uint32_t i; /* Used to cycle through random values. */ + + /* Set the initial value from the key size. */ + for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++) + value = (value + (key->dptr[i] << (i*5 % 24))); + + return (1103515243 * value + 12345); +} |