diff options
Diffstat (limited to 'source4/cluster/ctdb/common')
-rw-r--r-- | source4/cluster/ctdb/common/ctdb.c | 26 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_call.c | 151 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_client.c | 662 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_daemon.c | 631 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_io.c | 303 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_ltdb.c | 18 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_message.c | 132 |
7 files changed, 1853 insertions, 70 deletions
diff --git a/source4/cluster/ctdb/common/ctdb.c b/source4/cluster/ctdb/common/ctdb.c index b98c0a3d84..8a8d52f3f1 100644 --- a/source4/cluster/ctdb/common/ctdb.c +++ b/source4/cluster/ctdb/common/ctdb.c @@ -58,6 +58,14 @@ void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags) } /* + clear some ctdb flags +*/ +void ctdb_clear_flags(struct ctdb_context *ctdb, unsigned flags) +{ + ctdb->flags &= ~flags; +} + +/* set max acess count before a dmaster migration */ void ctdb_set_max_lacount(struct ctdb_context *ctdb, unsigned count) @@ -180,14 +188,6 @@ uint32_t ctdb_get_num_nodes(struct ctdb_context *ctdb) /* - start the protocol going -*/ -int ctdb_start(struct ctdb_context *ctdb) -{ - return ctdb->methods->start(ctdb); -} - -/* called by the transport layer when a packet comes in */ static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length) @@ -274,7 +274,7 @@ static void ctdb_node_connected(struct ctdb_node *node) /* wait for all nodes to be connected */ -void ctdb_connect_wait(struct ctdb_context *ctdb) +void ctdb_daemon_connect_wait(struct ctdb_context *ctdb) { int expected = ctdb->num_nodes - 1; if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) { @@ -338,3 +338,11 @@ struct ctdb_context *ctdb_init(struct event_context *ev) return ctdb; } +int ctdb_start(struct ctdb_context *ctdb) +{ + if (ctdb->flags&CTDB_FLAG_DAEMON_MODE) { + return ctdbd_start(ctdb); + } + + return ctdb->methods->start(ctdb); +} diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c index 77ec872852..ab5c2cce3b 100644 --- a/source4/cluster/ctdb/common/ctdb_call.c +++ b/source4/cluster/ctdb/common/ctdb_call.c @@ -29,6 +29,22 @@ #include "../include/ctdb_private.h" /* + find the ctdb_db from a db index + */ + struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id) +{ + struct ctdb_db_context *ctdb_db; + + for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) { + if (ctdb_db->db_id == id) { + break; + } + } + return ctdb_db; +} + + +/* local version of ctdb_call */ static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, @@ -38,7 +54,7 @@ static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *ca struct ctdb_call_info *c; struct ctdb_registered_call *fn; struct ctdb_context *ctdb = ctdb_db->ctdb; - + c = talloc(ctdb, struct ctdb_call_info); CTDB_NO_MEMORY(ctdb, c); @@ -242,13 +258,11 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr data.dptr = c->data + c->keylen; data.dsize = c->datalen; - for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) { - if (ctdb_db->db_id == c->db_id) { - break; - } - } + ctdb_db = find_ctdb_db(ctdb, c->db_id); if (!ctdb_db) { - ctdb_send_error(ctdb, hdr, ret, "Unknown database in request. db_id==0x%08x",c->db_id); + ctdb_send_error(ctdb, hdr, -1, + "Unknown database in request. db_id==0x%08x", + c->db_id); return; } @@ -309,13 +323,11 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_call call; struct ctdb_db_context *ctdb_db; - for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) { - if (ctdb_db->db_id == c->db_id) { - break; - } - } + ctdb_db = find_ctdb_db(ctdb, c->db_id); if (!ctdb_db) { - ctdb_send_error(ctdb, hdr, ret, "Unknown database in request. db_id==0x%08x",c->db_id); + ctdb_send_error(ctdb, hdr, -1, + "Unknown database in request. db_id==0x%08x", + c->db_id); return; } @@ -380,24 +392,6 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) talloc_free(r); } -enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR}; - -/* - state of a in-progress ctdb call -*/ -struct ctdb_call_state { - enum call_state state; - struct ctdb_req_call *c; - struct ctdb_db_context *ctdb_db; - struct ctdb_node *node; - const char *errmsg; - struct ctdb_call call; - int redirect_count; - struct ctdb_ltdb_header header; - void *fetch_private; -}; - - /* called when a CTDB_REPLY_CALL packet comes in @@ -418,7 +412,14 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) talloc_steal(state, c); + /* get an extra reference here - this prevents the free in ctdb_recv_pkt() + from freeing the data */ + (void)talloc_reference(state, c); + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } } /* @@ -458,6 +459,9 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn); state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } } @@ -476,6 +480,9 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) state->state = CTDB_CALL_ERROR; state->errmsg = (char *)c->msg; + if (state->async.fn) { + state->async.fn(state); + } } @@ -521,15 +528,31 @@ static int ctdb_call_destructor(struct ctdb_call_state *state) called when a ctdb_call times out */ void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private) + struct timeval t, void *private_data) { - struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state); + struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state); state->state = CTDB_CALL_ERROR; ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out", state->c->hdr.reqid); + if (state->async.fn) { + state->async.fn(state); + } } /* + this allows the caller to setup a async.fn +*/ +static void call_local_trigger(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state); + if (state->async.fn) { + state->async.fn(state); + } +} + + +/* construct an event driven local ctdb_call this is used so that locally processed ctdb_call requests are processed @@ -556,17 +579,20 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn); + event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state); + return state; } /* - make a remote ctdb call - async send + make a remote ctdb call - async send. Called in daemon context. This constructs a ctdb_call request and queues it for processing. This call never blocks. */ -struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) +static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call) { uint32_t len; struct ctdb_call_state *state; @@ -633,21 +659,27 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct c return state; } +/* + make a remote ctdb call - async send - -struct ctdb_record_handle { - struct ctdb_db_context *ctdb_db; - TDB_DATA key; - TDB_DATA *data; -}; + This constructs a ctdb_call request and queues it for processing. + This call never blocks. +*/ +struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) +{ + if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_call_send(ctdb_db, call); + } + return ctdb_daemon_call_send(ctdb_db, call); +} /* - make a remote ctdb call - async recv. + make a remote ctdb call - async recv - called in daemon context This is called when the program wants to wait for a ctdb_call to complete and get the results. This call will block unless the call has already completed. */ -int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) { struct ctdb_record_handle *rec; @@ -684,21 +716,34 @@ int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) return 0; } + +/* + make a remote ctdb call - async recv. + + This is called when the program wants to wait for a ctdb_call to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +{ + if (state->ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_call_recv(state, call); + } + return ctdb_daemon_call_recv(state, call); +} + /* full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() */ int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) { struct ctdb_call_state *state; + state = ctdb_call_send(ctdb_db, call); return ctdb_call_recv(state, call); } - - - struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data) { @@ -707,6 +752,10 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL struct ctdb_call_state *state; int ret; + if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_fetch_lock(ctdb_db, mem_ctx, key, data); + } + ZERO_STRUCT(call); call.call_id = CTDB_FETCH_FUNC; call.key = key; @@ -733,19 +782,27 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL } -int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data) +int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) { int ret; struct ctdb_ltdb_header header; + struct ctdb_db_context *ctdb_db = talloc_get_type(rec->ctdb_db, struct ctdb_db_context); + + if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_store_unlock(rec, data); + } /* should be avoided if possible hang header off rec ? */ ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL, NULL); if (ret) { ctdb_set_error(rec->ctdb_db->ctdb, "Fetch of locally held record failed"); + talloc_free(rec); return ret; } ret = ctdb_ltdb_store(rec->ctdb_db, rec->key, &header, data); + talloc_free(rec); + return ret; } diff --git a/source4/cluster/ctdb/common/ctdb_client.c b/source4/cluster/ctdb/common/ctdb_client.c new file mode 100644 index 0000000000..3cb27a1165 --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_client.c @@ -0,0 +1,662 @@ +/* + ctdb daemon code + + Copyright (C) Andrew Tridgell 2007 + Copyright (C) Ronnie Sahlberg 2007 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "db_wrap.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb.h" +#include "../include/ctdb_private.h" + +/* + queue a packet for sending from client to daemon +*/ +static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length); +} + + +/* + handle a connect wait reply packet + */ +static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, + struct ctdb_req_header *hdr) +{ + struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr; + ctdb->num_connected = r->num_connected; +} + +/* + called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon + + This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It + contains any reply data from the call +*/ +void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_fetch_lock *c = (struct ctdb_reply_fetch_lock *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + if (state == NULL) return; + + state->call.reply_data.dptr = c->data; + state->call.reply_data.dsize = c->datalen; + state->call.status = c->state; + + talloc_steal(state, c); + + /* get an extra reference here - this prevents the free in ctdb_recv_pkt() + from freeing the data */ + (void)talloc_reference(state, c); + + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } +} + +/* + called in the client when we receive a CTDB_REPLY_STORE_UNLOCK from the daemon + + This packet comes in response to a CTDB_REQ_STORE_UNLOCK request packet. It + contains any reply data from the call +*/ +void ctdb_reply_store_unlock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_store_unlock *c = (struct ctdb_reply_store_unlock *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + if (state == NULL) return; + + state->call.status = c->state; + + talloc_steal(state, c); + + /* get an extra reference here - this prevents the free in ctdb_recv_pkt() + from freeing the data */ + (void)talloc_reference(state, c); + + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } +} +/* + this is called in the client, when data comes in from the daemon + */ +static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) +{ + struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context); + struct ctdb_req_header *hdr; + + if (cnt < sizeof(*hdr)) { + ctdb_set_error(ctdb, "Bad packet length %d\n", cnt); + return; + } + hdr = (struct ctdb_req_header *)data; + if (cnt != hdr->length) { + ctdb_set_error(ctdb, "Bad header length %d expected %d\n", + hdr->length, cnt); + return; + } + + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(ctdb, "Non CTDB packet rejected\n"); + return; + } + + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + return; + } + + switch (hdr->operation) { + case CTDB_REPLY_CALL: + ctdb_reply_call(ctdb, hdr); + break; + + case CTDB_REQ_MESSAGE: + ctdb_request_message(ctdb, hdr); + break; + + case CTDB_REPLY_CONNECT_WAIT: + ctdb_reply_connect_wait(ctdb, hdr); + break; + + case CTDB_REPLY_FETCH_LOCK: + ctdb_reply_fetch_lock(ctdb, hdr); + break; + + case CTDB_REPLY_STORE_UNLOCK: + ctdb_reply_store_unlock(ctdb, hdr); + break; + + default: + printf("bogus operation code:%d\n",hdr->operation); + } +} + +/* + connect to a unix domain socket +*/ +static int ux_socket_connect(struct ctdb_context *ctdb) +{ + struct sockaddr_un addr; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)); + + ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0); + if (ctdb->daemon.sd == -1) { + return -1; + } + + if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + close(ctdb->daemon.sd); + ctdb->daemon.sd = -1; + return -1; + } + + ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, + CTDB_DS_ALIGNMENT, + ctdb_client_read_cb, ctdb); + return 0; +} + + + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_call to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +{ + struct ctdb_record_handle *rec; + + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->node->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + talloc_free(state); + return -1; + } + + rec = state->fetch_private; + + /* ugly hack to manage forced migration */ + if (rec != NULL) { + rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr); + rec->data->dsize = state->call.reply_data.dsize; + talloc_free(state); + return 0; + } + + if (state->call.reply_data.dsize) { + call->reply_data.dptr = talloc_memdup(state->node->ctdb, + state->call.reply_data.dptr, + state->call.reply_data.dsize); + call->reply_data.dsize = state->call.reply_data.dsize; + } else { + call->reply_data.dptr = NULL; + call->reply_data.dsize = 0; + } + call->status = state->call.status; + talloc_free(state); + + return 0; +} + + + + +/* + destroy a ctdb_call in client +*/ +static int ctdb_client_call_destructor(struct ctdb_call_state *state) +{ + idr_remove(state->node->ctdb->idr, state->c->hdr.reqid); + return 0; +} + + + +/* + make a ctdb call to the local daemon - async send. Called from client context. + + This constructs a ctdb_call request and queues it for processing. + This call never blocks. +*/ +struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call) +{ + struct ctdb_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_ltdb_header header; + TDB_DATA data; + int ret; + size_t len; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + ret = ctdb_ltdb_lock(ctdb_db, call->key); + if (ret != 0) { + printf("failed to lock ltdb record\n"); + return NULL; + } + + ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + +#if 0 + if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + state = ctdb_call_local_send(ctdb_db, call, &header, &data); + ctdb_ltdb_unlock(ctdb_db, call->key); + return state; + } +#endif + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + + talloc_steal(state, data.dptr); + + len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + talloc_set_name_const(state->c, "ctdbd req_call packet"); + talloc_steal(state, state->c); + + state->c->hdr.length = len; + state->c->hdr.ctdb_magic = CTDB_MAGIC; + state->c->hdr.ctdb_version = CTDB_VERSION; + state->c->hdr.operation = CTDB_REQ_CALL; + state->c->hdr.destnode = header.dmaster; + state->c->hdr.srcnode = ctdb->vnn; + /* this limits us to 16k outstanding messages - not unreasonable */ + state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + state->c->flags = call->flags; + state->c->db_id = ctdb_db->db_id; + state->c->callid = call->call_id; + state->c->keylen = call->key.dsize; + state->c->calldatalen = call->call_data.dsize; + memcpy(&state->c->data[0], call->key.dptr, call->key.dsize); + memcpy(&state->c->data[call->key.dsize], + call->call_data.dptr, call->call_data.dsize); + state->call = *call; + state->call.call_data.dptr = &state->c->data[call->key.dsize]; + state->call.key.dptr = &state->c->data[0]; + + state->node = ctdb->nodes[header.dmaster]; + state->state = CTDB_CALL_WAIT; + state->header = header; + state->ctdb_db = ctdb_db; + + talloc_set_destructor(state, ctdb_client_call_destructor); + + ctdb_client_queue_pkt(ctdb, &state->c->hdr); + +/*XXX set up timeout to cleanup if server doesnt respond + event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), + ctdb_call_timeout, state); +*/ + + ctdb_ltdb_unlock(ctdb_db, call->key); + return state; +} + + + +/* + tell the daemon what messaging srvid we will use, and register the message + handler function in the client +*/ +int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) + +{ + struct ctdb_req_register c; + int res; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + ZERO_STRUCT(c); + + c.hdr.length = sizeof(c); + c.hdr.ctdb_magic = CTDB_MAGIC; + c.hdr.ctdb_version = CTDB_VERSION; + c.hdr.operation = CTDB_REQ_REGISTER; + c.srvid = srvid; + + res = ctdb_client_queue_pkt(ctdb, &c.hdr); + if (res != 0) { + return res; + } + + /* also need to register the handler with our ctdb structure */ + return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data); +} + + + +/* + setup handler for receipt of ctdb messages from ctdb_send_message() +*/ +int ctdb_set_message_handler(struct ctdb_context *ctdb, + uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) +{ + if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_set_message_handler(ctdb, srvid, handler, private_data); + } + return ctdb_daemon_set_message_handler(ctdb, srvid, handler, private_data); +} + + +/* + send a message - from client context + */ +int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, + uint32_t srvid, TDB_DATA data) +{ + struct ctdb_req_message *r; + int len, res; + + len = offsetof(struct ctdb_req_message, data) + data.dsize; + r = ctdb->methods->allocate_pkt(ctdb, len); + CTDB_NO_MEMORY(ctdb, r); + talloc_set_name_const(r, "req_message packet"); + + r->hdr.length = len; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REQ_MESSAGE; + r->hdr.destnode = vnn; + r->hdr.srcnode = ctdb->vnn; + r->hdr.reqid = 0; + r->srvid = srvid; + r->datalen = data.dsize; + memcpy(&r->data[0], data.dptr, data.dsize); + + res = ctdb_client_queue_pkt(ctdb, &r->hdr); + if (res != 0) { + return res; + } + + talloc_free(r); + return 0; +} + +/* + wait for all nodes to be connected - from client + */ +static void ctdb_client_connect_wait(struct ctdb_context *ctdb) +{ + struct ctdb_req_connect_wait r; + int res; + + ZERO_STRUCT(r); + + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_CONNECT_WAIT; + + res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length); + if (res != 0) { + printf("Failed to queue a connect wait request\n"); + return; + } + + /* now we can go into the normal wait routine, as the reply packet + will update the ctdb->num_connected variable */ + ctdb_daemon_connect_wait(ctdb); +} + +/* + wait for all nodes to be connected +*/ +void ctdb_connect_wait(struct ctdb_context *ctdb) +{ + if (!(ctdb->flags & CTDB_FLAG_DAEMON_MODE)) { + ctdb_daemon_connect_wait(ctdb); + return; + } + + ctdb_client_connect_wait(ctdb); +} + + +struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key) +{ + struct ctdb_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_req_fetch_lock *req; + int len, res; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + return NULL; + } + state->state = CTDB_CALL_WAIT; + state->ctdb_db = ctdb_db; + len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + return NULL; + } + ZERO_STRUCT(*state->c); + talloc_set_name_const(state->c, "ctdbd req_fetch_lock packet"); + talloc_steal(state, state->c); + + req = (struct ctdb_req_fetch_lock *)state->c; + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_FETCH_LOCK; + req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + req->db_id = ctdb_db->db_id; + req->keylen = key.dsize; + memcpy(&req->key[0], key.dptr, key.dsize); + + res = ctdb_client_queue_pkt(ctdb, &req->hdr); + if (res != 0) { + return NULL; + } + + talloc_free(req); + + return state; +} + + +struct ctdb_call_state *ctdb_client_store_unlock_send( + struct ctdb_record_handle *rh, + TALLOC_CTX *mem_ctx, + TDB_DATA data) +{ + struct ctdb_call_state *state; + struct ctdb_db_context *ctdb_db = talloc_get_type(rh->ctdb_db, struct ctdb_db_context); + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_req_store_unlock *req; + int len, res; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + return NULL; + } + state->state = CTDB_CALL_WAIT; + state->ctdb_db = ctdb_db; + len = offsetof(struct ctdb_req_store_unlock, data) + rh->key.dsize + data.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + return NULL; + } + ZERO_STRUCT(*state->c); + talloc_set_name_const(state->c, "ctdbd req_store_unlock packet"); + talloc_steal(state, state->c); + + req = (struct ctdb_req_store_unlock *)state->c; + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_STORE_UNLOCK; + req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + req->db_id = ctdb_db->db_id; + req->keylen = rh->key.dsize; + req->datalen = data.dsize; + memcpy(&req->data[0], rh->key.dptr, rh->key.dsize); + memcpy(&req->data[req->keylen], data.dptr, data.dsize); + + res = ctdb_client_queue_pkt(ctdb, &req->hdr); + if (res != 0) { + return NULL; + } + + talloc_free(req); + + return state; +} + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the + results. This call will block unless the call has already completed. +*/ +struct ctdb_record_handle *ctdb_client_fetch_lock_recv(struct ctdb_call_state *state, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data) +{ + struct ctdb_record_handle *rec; + + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->ctdb_db->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + talloc_free(state); + return NULL; + } + + rec = talloc(mem_ctx, struct ctdb_record_handle); + CTDB_NO_MEMORY_NULL(state->ctdb_db->ctdb, rec); + + rec->ctdb_db = state->ctdb_db; + rec->key = key; + rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); + rec->data = talloc(rec, TDB_DATA); + rec->data->dsize = state->call.reply_data.dsize; + rec->data->dptr = talloc_memdup(rec, state->call.reply_data.dptr, rec->data->dsize); + + if (data) { + *data = *rec->data; + } + return rec; +} + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_store_unlock to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_client_store_unlock_recv(struct ctdb_call_state *state, struct ctdb_record_handle *rec) +{ + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->ctdb_db->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + } + + talloc_free(state); + return state->state; +} + +struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key, + TDB_DATA *data) +{ + struct ctdb_call_state *state; + struct ctdb_record_handle *rec; + + state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key); + rec = ctdb_client_fetch_lock_recv(state, mem_ctx, key, data); + + return rec; +} + +int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) +{ + struct ctdb_call_state *state; + int res; + + state = ctdb_client_store_unlock_send(rec, rec, data); + res = ctdb_client_store_unlock_recv(state, rec); + + talloc_free(rec); + + return res; +} diff --git a/source4/cluster/ctdb/common/ctdb_daemon.c b/source4/cluster/ctdb/common/ctdb_daemon.c new file mode 100644 index 0000000000..945030d77e --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_daemon.c @@ -0,0 +1,631 @@ +/* + ctdb daemon code + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "db_wrap.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb.h" +#include "../include/ctdb_private.h" + +static void ctdb_main_loop(struct ctdb_context *ctdb) +{ + ctdb->methods->start(ctdb); + + /* go into a wait loop to allow other nodes to complete */ + event_loop_wait(ctdb->ev); + + printf("event_loop_wait() returned. this should not happen\n"); + exit(1); +} + + +static void set_non_blocking(int fd) +{ + unsigned v; + v = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, v | O_NONBLOCK); +} + + +/* + structure describing a connected client in the daemon + */ +struct ctdb_client { + struct ctdb_context *ctdb; + int fd; + struct ctdb_queue *queue; +}; + + +/* + message handler for when we are in daemon mode. This redirects the message + to the right client + */ +static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + TDB_DATA data, void *private_data) +{ + struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client); + struct ctdb_req_message *r; + int len; + + /* construct a message to send to the client containing the data */ + len = offsetof(struct ctdb_req_message, data) + data.dsize; + r = ctdbd_allocate_pkt(ctdb, len); + +/*XXX cant use this since it returns an int CTDB_NO_MEMORY(ctdb, r);*/ + talloc_set_name_const(r, "req_message packet"); + + ZERO_STRUCT(*r); + + r->hdr.length = len; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REQ_MESSAGE; + r->srvid = srvid; + r->datalen = data.dsize; + memcpy(&r->data[0], data.dptr, data.dsize); + + ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len); + + talloc_free(r); + return; +} + + +/* + this is called when the ctdb daemon received a ctdb request to + set the srvid from the client + */ +static void daemon_request_register_message_handler(struct ctdb_client *client, + struct ctdb_req_register *c) +{ + int res; + res = ctdb_register_message_handler(client->ctdb, client, + c->srvid, daemon_message_handler, + client); + if (res != 0) { + printf("Failed to register handler %u in daemon\n", c->srvid); + } +} + + +static struct ctdb_call_state *ctdb_fetch_lock_send(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) +{ + struct ctdb_call *call; + struct ctdb_record_handle *rec; + struct ctdb_call_state *state; + + rec = talloc(mem_ctx, struct ctdb_record_handle); + CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec); + + + call = talloc(rec, struct ctdb_call); + ZERO_STRUCT(*call); + call->call_id = CTDB_FETCH_FUNC; + call->key = key; + call->flags = CTDB_IMMEDIATE_MIGRATION; + + + rec->ctdb_db = ctdb_db; + rec->key = key; + rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); + rec->data = data; + + state = ctdb_call_send(ctdb_db, call); + state->fetch_private = rec; + + return state; +} + +struct client_fetch_lock_data { + struct ctdb_client *client; + uint32_t reqid; +}; +static void daemon_fetch_lock_complete(struct ctdb_call_state *state) +{ + struct ctdb_reply_fetch_lock *r; + struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data, struct client_fetch_lock_data); + struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client); + int length, res; + + length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize; + r = ctdbd_allocate_pkt(client->ctdb, length); + if (r == NULL) { + printf("Failed to allocate reply_call in ctdb daemon\n"); + return; + } + ZERO_STRUCT(*r); + r->hdr.length = length; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REPLY_FETCH_LOCK; + r->hdr.reqid = data->reqid; + r->state = state->state; + r->datalen = state->call.reply_data.dsize; + memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen); + + res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length); + if (res != 0) { + printf("Failed to queue packet from daemon to client\n"); + } + talloc_free(r); +} + +/* + called when the daemon gets a fetch lock request from a client + */ +static void daemon_request_fetch_lock(struct ctdb_client *client, + struct ctdb_req_fetch_lock *f) +{ + struct ctdb_call_state *state; + TDB_DATA key, *data; + struct ctdb_db_context *ctdb_db; + struct client_fetch_lock_data *fl_data; + + ctdb_db = find_ctdb_db(client->ctdb, f->db_id); + + key.dsize = f->keylen; + key.dptr = &f->key[0]; + + data = talloc(client, TDB_DATA); + data->dptr = NULL; + data->dsize = 0; + + state = ctdb_fetch_lock_send(ctdb_db, client, key, data); + talloc_steal(state, data); + + fl_data = talloc(state, struct client_fetch_lock_data); + fl_data->client = client; + fl_data->reqid = f->hdr.reqid; + state->async.fn = daemon_fetch_lock_complete; + state->async.private_data = fl_data; +} + +/* + called when the daemon gets a store unlock request from a client + + this would never block? + */ +static void daemon_request_store_unlock(struct ctdb_client *client, + struct ctdb_req_store_unlock *f) +{ + struct ctdb_db_context *ctdb_db; + struct ctdb_reply_store_unlock r; + uint32_t caller = ctdb_get_vnn(client->ctdb); + struct ctdb_ltdb_header header; + TDB_DATA key, data; + int res; + + ctdb_db = find_ctdb_db(client->ctdb, f->db_id); + + /* write the data to ltdb */ + key.dsize = f->keylen; + key.dptr = &f->data[0]; + res = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL); + if (res) { + ctdb_set_error(ctdb_db->ctdb, "Fetch of locally held record failed"); + res = -1; + goto done; + } + if (header.laccessor != caller) { + header.lacount = 0; + } + header.laccessor = caller; + header.lacount++; + data.dsize = f->datalen; + data.dptr = &f->data[f->keylen]; + res = ctdb_ltdb_store(ctdb_db, key, &header, data); + if ( res != 0) { + ctdb_set_error(ctdb_db->ctdb, "ctdb_call tdb_store failed\n"); + } + + +done: + /* now send the reply */ + ZERO_STRUCT(r); + + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REPLY_STORE_UNLOCK; + r.hdr.reqid = f->hdr.reqid; + r.state = res; + + res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length); + if (res != 0) { + printf("Failed to queue a store unlock response\n"); + return; + } +} + +/* + called when the daemon gets a connect wait request from a client + */ +static void daemon_request_connect_wait(struct ctdb_client *client, + struct ctdb_req_connect_wait *c) +{ + struct ctdb_reply_connect_wait r; + int res; + + /* first wait - in the daemon */ + ctdb_daemon_connect_wait(client->ctdb); + + /* now send the reply */ + ZERO_STRUCT(r); + + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REPLY_CONNECT_WAIT; + r.vnn = ctdb_get_vnn(client->ctdb); + r.num_connected = client->ctdb->num_connected; + + res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length); + if (res != 0) { + printf("Failed to queue a connect wait response\n"); + return; + } +} + +/* + destroy a ctdb_client +*/ +static int ctdb_client_destructor(struct ctdb_client *client) +{ + close(client->fd); + client->fd = -1; + return 0; +} + + +/* + this is called when the ctdb daemon received a ctdb request message + from a local client over the unix domain socket + */ +static void daemon_request_message_from_client(struct ctdb_client *client, + struct ctdb_req_message *c) +{ + TDB_DATA data; + int res; + + /* maybe the message is for another client on this node */ + if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) { + ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c); + return; + } + + /* its for a remote node */ + data.dptr = &c->data[0]; + data.dsize = c->datalen; + res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode, + c->srvid, data); + if (res != 0) { + printf("Failed to send message to remote node %u\n", + c->hdr.destnode); + } +} + +/* + this is called when the ctdb daemon received a ctdb request call + from a local client over the unix domain socket + */ +static void daemon_request_call_from_client(struct ctdb_client *client, + struct ctdb_req_call *c) +{ + struct ctdb_call_state *state; + struct ctdb_db_context *ctdb_db; + struct ctdb_call call; + struct ctdb_reply_call *r; + int res; + uint32_t length; + + ctdb_db = find_ctdb_db(client->ctdb, c->db_id); + if (!ctdb_db) { + printf("Unknown database in request. db_id==0x%08x",c->db_id); + return; + } + + ZERO_STRUCT(call); + call.call_id = c->callid; + call.key.dptr = c->data; + call.key.dsize = c->keylen; + call.call_data.dptr = c->data + c->keylen; + call.call_data.dsize = c->calldatalen; + + state = ctdb_call_send(ctdb_db, &call); + +/* XXX this must be converted to fully async */ + res = ctdb_call_recv(state, &call); + if (res != 0) { + printf("ctdbd_call_recv() returned error\n"); + exit(1); + } + + length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize; + r = ctdbd_allocate_pkt(client->ctdb, length); + if (r == NULL) { + printf("Failed to allocate reply_call in ctdb daemon\n"); + return; + } + ZERO_STRUCT(*r); + r->hdr.length = length; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REPLY_CALL; + r->hdr.reqid = c->hdr.reqid; + r->datalen = call.reply_data.dsize; + memcpy(&r->data[0], call.reply_data.dptr, r->datalen); + + res = ctdb_queue_send(client->queue, (uint8_t *)&r, r->hdr.length); + if (res != 0) { + printf("Failed to queue packet from daemon to client\n"); + } + talloc_free(r); +} + + +/* data contains a packet from the client */ +static void client_incoming_packet(struct ctdb_client *client, void *data, size_t nread) +{ + struct ctdb_req_header *hdr = data; + + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n"); + goto done; + } + + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + goto done; + } + + switch (hdr->operation) { + case CTDB_REQ_CALL: + daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr); + break; + + case CTDB_REQ_REGISTER: + daemon_request_register_message_handler(client, + (struct ctdb_req_register *)hdr); + break; + case CTDB_REQ_MESSAGE: + daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr); + break; + + case CTDB_REQ_CONNECT_WAIT: + daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr); + break; + case CTDB_REQ_FETCH_LOCK: + daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr); + break; + case CTDB_REQ_STORE_UNLOCK: + daemon_request_store_unlock(client, (struct ctdb_req_store_unlock *)hdr); + break; + default: + printf("daemon: unrecognized operation:%d\n",hdr->operation); + } + +done: + talloc_free(data); +} + + +static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) +{ + struct ctdb_client *client = talloc_get_type(args, struct ctdb_client); + struct ctdb_req_header *hdr; + + if (cnt == 0) { + talloc_free(client); + return; + } + + if (cnt < sizeof(*hdr)) { + ctdb_set_error(client->ctdb, "Bad packet length %d\n", cnt); + return; + } + hdr = (struct ctdb_req_header *)data; + if (cnt != hdr->length) { + ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n", + hdr->length, cnt); + return; + } + + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n"); + return; + } + + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + return; + } + + /* it is the responsibility of the incoming packet function to free 'data' */ + client_incoming_packet(client, data, cnt); +} + +static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + struct sockaddr_in addr; + socklen_t len; + int fd; + struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context); + struct ctdb_client *client; + + memset(&addr, 0, sizeof(addr)); + len = sizeof(addr); + fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len); + if (fd == -1) { + return; + } + set_non_blocking(fd); + + client = talloc_zero(ctdb, struct ctdb_client); + client->ctdb = ctdb; + client->fd = fd; + + client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, + ctdb_client_read_cb, client); + + talloc_set_destructor(client, ctdb_client_destructor); +} + + + +static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + int *fd = private_data; + int cnt; + char buf; + + /* XXX this is a good place to try doing some cleaning up before exiting */ + cnt = read(*fd, &buf, 1); + if (cnt==0) { + printf("parent process exited. filedescriptor dissappeared\n"); + exit(1); + } else { + printf("ctdb: did not expect data from parent process\n"); + exit(1); + } +} + + + +/* + create a unix domain socket and bind it + return a file descriptor open on the socket +*/ +static int ux_socket_bind(struct ctdb_context *ctdb) +{ + struct sockaddr_un addr; + + ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0); + if (ctdb->daemon.sd == -1) { + ctdb->daemon.sd = -1; + return -1; + } + + set_non_blocking(ctdb->daemon.sd); + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)); + + if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + close(ctdb->daemon.sd); + ctdb->daemon.sd = -1; + return -1; + } + listen(ctdb->daemon.sd, 1); + + return 0; +} + +/* + delete the socket on exit - called on destruction of autofree context + */ +static int unlink_destructor(const char *name) +{ + unlink(name); + return 0; +} + +/* + start the protocol going +*/ +int ctdbd_start(struct ctdb_context *ctdb) +{ + pid_t pid; + static int fd[2]; + int res; + struct fd_event *fde; + const char *domain_socket_name; + + /* generate a name to use for our local socket */ + ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address); + /* get rid of any old sockets */ + unlink(ctdb->daemon.name); + + /* create a unix domain stream socket to listen to */ + res = ux_socket_bind(ctdb); + if (res!=0) { + printf("Failed to open CTDB unix domain socket\n"); + exit(10); + } + + res = pipe(&fd[0]); + if (res) { + printf("Failed to open pipe for CTDB\n"); + exit(1); + } + pid = fork(); + if (pid==-1) { + printf("Failed to fork CTDB daemon\n"); + exit(1); + } + + if (pid) { + close(fd[0]); + close(ctdb->daemon.sd); + ctdb->daemon.sd = -1; + return 0; + } + + /* ensure the socket is deleted on exit of the daemon */ + domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name); + talloc_set_destructor(domain_socket_name, unlink_destructor); + + close(fd[1]); + ctdb_clear_flags(ctdb, CTDB_FLAG_DAEMON_MODE); + ctdb->ev = event_context_init(NULL); + fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]); + fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb); + ctdb_main_loop(ctdb); + + return 0; +} + +/* + allocate a packet for use in client<->daemon communication + */ +void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len) +{ + int size; + + size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); + return talloc_size(ctdb, size); +} + +int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) +{ + return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data); +} + diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c new file mode 100644 index 0000000000..238f1701cf --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_io.c @@ -0,0 +1,303 @@ +/* + ctdb database library + Utility functions to read/write blobs of data from a file descriptor + and handle the case where we might need multiple read/writes to get all the + data. + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb_private.h" +#include "../include/ctdb.h" + +/* structures for packet queueing - see common/ctdb_io.c */ +struct ctdb_partial { + uint8_t *data; + uint32_t length; +}; + +struct ctdb_queue_pkt { + struct ctdb_queue_pkt *next, *prev; + uint8_t *data; + uint32_t length; +}; + +struct ctdb_queue { + struct ctdb_context *ctdb; + struct ctdb_partial partial; /* partial input packet */ + struct ctdb_queue_pkt *out_queue; + struct fd_event *fde; + int fd; + size_t alignment; + void *private_data; + ctdb_queue_cb_fn_t callback; +}; + + + +/* + called when an incoming connection is readable +*/ +static void queue_io_read(struct ctdb_queue *queue) +{ + int num_ready = 0; + ssize_t nread; + uint8_t *data, *data_base; + + if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 || + num_ready == 0) { + /* the descriptor has been closed */ + goto failed; + } + + + queue->partial.data = talloc_realloc_size(queue, queue->partial.data, + num_ready + queue->partial.length); + + if (queue->partial.data == NULL) { + goto failed; + } + + nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready); + if (nread <= 0) { + goto failed; + } + + + data = queue->partial.data; + nread += queue->partial.length; + + queue->partial.data = NULL; + queue->partial.length = 0; + + if (nread >= 4 && *(uint32_t *)data == nread) { + /* it is the responsibility of the incoming packet + function to free 'data' */ + queue->callback(data, nread, queue->private_data); + return; + } + + data_base = data; + + while (nread >= 4 && *(uint32_t *)data <= nread) { + /* we have at least one packet */ + uint8_t *d2; + uint32_t len; + len = *(uint32_t *)data; + d2 = talloc_memdup(queue, data, len); + if (d2 == NULL) { + /* sigh */ + goto failed; + } + queue->callback(d2, len, queue->private_data); + data += len; + nread -= len; + } + + if (nread > 0) { + /* we have only part of a packet */ + if (data_base == data) { + queue->partial.data = data; + queue->partial.length = nread; + } else { + queue->partial.data = talloc_memdup(queue, data, nread); + if (queue->partial.data == NULL) { + goto failed; + } + queue->partial.length = nread; + talloc_free(data_base); + } + return; + } + + talloc_free(data_base); + return; + +failed: + queue->callback(NULL, 0, queue->private_data); +} + + +/* used when an event triggers a dead queue */ +static void queue_dead(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue); + queue->callback(NULL, 0, queue->private_data); +} + + +/* + called when an incoming connection is writeable +*/ +static void queue_io_write(struct ctdb_queue *queue) +{ + while (queue->out_queue) { + struct ctdb_queue_pkt *pkt = queue->out_queue; + ssize_t n; + + n = write(queue->fd, pkt->data, pkt->length); + + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(queue->ctdb->ev, queue, timeval_zero(), + queue_dead, queue); + EVENT_FD_NOT_WRITEABLE(queue->fde); + return; + } + if (n <= 0) return; + + if (n != pkt->length) { + pkt->length -= n; + pkt->data += n; + return; + } + + DLIST_REMOVE(queue->out_queue, pkt); + talloc_free(pkt); + } + + EVENT_FD_NOT_WRITEABLE(queue->fde); +} + +/* + called when an incoming connection is readable or writeable +*/ +static void queue_io_handler(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue); + + if (flags & EVENT_FD_READ) { + queue_io_read(queue); + } else { + queue_io_write(queue); + } +} + + +/* + queue a packet for sending +*/ +int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) +{ + struct ctdb_queue_pkt *pkt; + uint32_t length2; + + /* enforce the length and alignment rules from the tcp packet allocator */ + length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1); + *(uint32_t *)data = length2; + + if (length2 != length) { + memset(data+length, 0, length2-length); + } + + /* if the queue is empty then try an immediate write, avoiding + queue overhead. This relies on non-blocking sockets */ + if (queue->out_queue == NULL && queue->fd != -1) { + ssize_t n = write(queue->fd, data, length2); + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(queue->ctdb->ev, queue, timeval_zero(), + queue_dead, queue); + /* yes, we report success, as the dead node is + handled via a separate event */ + return 0; + } + if (n > 0) { + data += n; + length2 -= n; + } + if (length2 == 0) return 0; + } + + pkt = talloc(queue, struct ctdb_queue_pkt); + CTDB_NO_MEMORY(queue->ctdb, pkt); + + pkt->data = talloc_memdup(pkt, data, length2); + CTDB_NO_MEMORY(queue->ctdb, pkt->data); + + pkt->length = length2; + + if (queue->out_queue == NULL && queue->fd != -1) { + EVENT_FD_WRITEABLE(queue->fde); + } + + DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *); + + return 0; +} + + +/* + setup the fd used by the queue + */ +int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd) +{ + queue->fd = fd; + talloc_free(queue->fde); + queue->fde = NULL; + + if (fd != -1) { + queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, + queue_io_handler, queue); + if (queue->fde == NULL) { + return -1; + } + + if (queue->out_queue) { + EVENT_FD_WRITEABLE(queue->fde); + } + } + + return 0; +} + + + +/* + setup a packet queue on a socket + */ +struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, int fd, int alignment, + + ctdb_queue_cb_fn_t callback, + void *private_data) +{ + struct ctdb_queue *queue; + + queue = talloc_zero(mem_ctx, struct ctdb_queue); + CTDB_NO_MEMORY_NULL(ctdb, queue); + + queue->ctdb = ctdb; + queue->fd = fd; + queue->alignment = alignment; + queue->private_data = private_data; + queue->callback = callback; + if (fd != -1) { + if (ctdb_queue_set_fd(queue, fd) != 0) { + talloc_free(queue); + return NULL; + } + } + + return queue; +} diff --git a/source4/cluster/ctdb/common/ctdb_ltdb.c b/source4/cluster/ctdb/common/ctdb_ltdb.c index 84c3bd49da..785ccad9b3 100644 --- a/source4/cluster/ctdb/common/ctdb_ltdb.c +++ b/source4/cluster/ctdb/common/ctdb_ltdb.c @@ -197,3 +197,21 @@ int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, return ret; } + + +/* + lock a record in the ltdb, given a key + */ +int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key) +{ + return tdb_chainlock(ctdb_db->ltdb->tdb, key); +} + +/* + unlock a record in the ltdb, given a key + */ +int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key) +{ + return tdb_chainunlock(ctdb_db->ltdb->tdb, key); +} + diff --git a/source4/cluster/ctdb/common/ctdb_message.c b/source4/cluster/ctdb/common/ctdb_message.c index 300bee8339..dba15aecb9 100644 --- a/source4/cluster/ctdb/common/ctdb_message.c +++ b/source4/cluster/ctdb/common/ctdb_message.c @@ -27,36 +27,104 @@ #include "system/network.h" #include "system/filesys.h" #include "../include/ctdb_private.h" +#include "lib/util/dlinklist.h" + +/* + this dispatches the messages to the registered ctdb message handler +*/ +static int ctdb_dispatch_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_DATA data) +{ + struct ctdb_message_list *ml; + + /* XXX we need a must faster way of finding the matching srvid + - maybe a tree? */ + for (ml=ctdb->message_list;ml;ml=ml->next) { + if (ml->srvid == srvid) break; + } + if (ml == NULL) { + printf("daemon vnn:%d no msg handler for srvid=%u\n", ctdb_get_vnn(ctdb), srvid); + /* no registered message handler */ + return -1; + } + + ml->message_handler(ctdb, srvid, data, ml->message_private); + return 0; +} /* called when a CTDB_REQ_MESSAGE packet comes in - - this dispatches the messages to the registered ctdb message handler */ void ctdb_request_message(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { struct ctdb_req_message *c = (struct ctdb_req_message *)hdr; TDB_DATA data; - if (ctdb->message_handler == NULL) { - /* no registered message handler */ - return; - } + data.dptr = &c->data[0]; data.dsize = c->datalen; - ctdb->message_handler(ctdb, c->srvid, data, ctdb->message_private); + + ctdb_dispatch_message(ctdb, c->srvid, data); +} + +/* + this local messaging handler is ugly, but is needed to prevent + recursion in ctdb_send_message() when the destination node is the + same as the source node + */ +struct ctdb_local_message { + struct ctdb_context *ctdb; + uint32_t srvid; + TDB_DATA data; +}; + +static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_local_message *m = talloc_get_type(private_data, + struct ctdb_local_message); + int res; + + res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data); + if (res != 0) { + printf("Failed to dispatch message for srvid=%u\n", m->srvid); + } + talloc_free(m); } +static int ctdb_local_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_DATA data) +{ + struct ctdb_local_message *m; + m = talloc(ctdb, struct ctdb_local_message); + CTDB_NO_MEMORY(ctdb, m); + + m->ctdb = ctdb; + m->srvid = srvid; + m->data = data; + m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize); + if (m->data.dptr == NULL) { + talloc_free(m); + return -1; + } + + /* this needs to be done as an event to prevent recursion */ + event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m); + return 0; +} /* send a ctdb message */ -int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, - uint32_t srvid, TDB_DATA data) +int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn, + uint32_t srvid, TDB_DATA data) { struct ctdb_req_message *r; int len; + /* see if this is a message to ourselves */ + if (vnn == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + return ctdb_local_message(ctdb, srvid, data); + } + len = offsetof(struct ctdb_req_message, data) + data.dsize; r = ctdb->methods->allocate_pkt(ctdb, len); CTDB_NO_MEMORY(ctdb, r); @@ -80,13 +148,49 @@ int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, } /* - setup handler for receipt of ctdb messages from ctdb_send_message() + send a ctdb message */ -int ctdb_set_message_handler(struct ctdb_context *ctdb, ctdb_message_fn_t handler, - void *private) +int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, + uint32_t srvid, TDB_DATA data) +{ + if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_send_message(ctdb, vnn, srvid, data); + } + return ctdb_daemon_send_message(ctdb, vnn, srvid, data); +} + + +/* + when a client goes away, we need to remove its srvid handler from the list + */ +static int message_handler_destructor(struct ctdb_message_list *m) { - ctdb->message_handler = handler; - ctdb->message_private = private; + DLIST_REMOVE(m->ctdb->message_list, m); return 0; } +/* + setup handler for receipt of ctdb messages from ctdb_send_message() +*/ +int ctdb_register_message_handler(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, + uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) +{ + struct ctdb_message_list *m; + + m = talloc(mem_ctx, struct ctdb_message_list); + CTDB_NO_MEMORY(ctdb, m); + + m->ctdb = ctdb; + m->srvid = srvid; + m->message_handler = handler; + m->message_private = private_data; + + DLIST_ADD(ctdb->message_list, m); + + talloc_set_destructor(m, message_handler_destructor); + + return 0; +} |