diff options
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb_daemon.c')
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_daemon.c | 710 |
1 files changed, 0 insertions, 710 deletions
diff --git a/source4/cluster/ctdb/common/ctdb_daemon.c b/source4/cluster/ctdb/common/ctdb_daemon.c deleted file mode 100644 index f1de3933e7..0000000000 --- a/source4/cluster/ctdb/common/ctdb_daemon.c +++ /dev/null @@ -1,710 +0,0 @@ -/* - ctdb daemon code - - Copyright (C) Andrew Tridgell 2006 - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 3 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, see <http://www.gnu.org/licenses/>. -*/ - -#include "includes.h" -#include "db_wrap.h" -#include "lib/tdb/include/tdb.h" -#include "lib/events/events.h" -#include "lib/util/dlinklist.h" -#include "system/network.h" -#include "system/filesys.h" -#include "system/wait.h" -#include "../include/ctdb.h" -#include "../include/ctdb_private.h" - -/* - structure describing a connected client in the daemon - */ -struct ctdb_client { - struct ctdb_context *ctdb; - int fd; - struct ctdb_queue *queue; -}; - - - -static void daemon_incoming_packet(void *, uint8_t *, uint32_t ); - -static void ctdb_main_loop(struct ctdb_context *ctdb) -{ - ctdb->methods->start(ctdb); - - /* go into a wait loop to allow other nodes to complete */ - event_loop_wait(ctdb->ev); - - DEBUG(0,("event_loop_wait() returned. this should not happen\n")); - exit(1); -} - - -static void set_non_blocking(int fd) -{ - unsigned v; - v = fcntl(fd, F_GETFL, 0); - fcntl(fd, F_SETFL, v | O_NONBLOCK); -} - -static void block_signal(int signum) -{ - struct sigaction act; - - memset(&act, 0, sizeof(act)); - - act.sa_handler = SIG_IGN; - sigemptyset(&act.sa_mask); - sigaddset(&act.sa_mask, signum); - sigaction(signum, &act, NULL); -} - - -/* - send a packet to a client - */ -static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr) -{ - client->ctdb->status.client_packets_sent++; - return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length); -} - -/* - message handler for when we are in daemon mode. This redirects the message - to the right client - */ -static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, - TDB_DATA data, void *private_data) -{ - struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client); - struct ctdb_req_message *r; - int len; - - /* construct a message to send to the client containing the data */ - len = offsetof(struct ctdb_req_message, data) + data.dsize; - r = ctdbd_allocate_pkt(ctdb, len); - - talloc_set_name_const(r, "req_message packet"); - - memset(r, 0, offsetof(struct ctdb_req_message, data)); - - r->hdr.length = len; - r->hdr.ctdb_magic = CTDB_MAGIC; - r->hdr.ctdb_version = CTDB_VERSION; - r->hdr.operation = CTDB_REQ_MESSAGE; - r->srvid = srvid; - r->datalen = data.dsize; - memcpy(&r->data[0], data.dptr, data.dsize); - - daemon_queue_send(client, &r->hdr); - - talloc_free(r); -} - - -/* - this is called when the ctdb daemon received a ctdb request to - set the srvid from the client - */ -static void daemon_request_register_message_handler(struct ctdb_client *client, - struct ctdb_req_register *c) -{ - int res; - res = ctdb_register_message_handler(client->ctdb, client, - c->srvid, daemon_message_handler, - client); - if (res != 0) { - DEBUG(0,(__location__ " Failed to register handler %u in daemon\n", - c->srvid)); - } else { - DEBUG(2,(__location__ " Registered message handler for srvid=%u\n", - c->srvid)); - } -} - - -/* - called when the daemon gets a shutdown request from a client - */ -static void daemon_request_shutdown(struct ctdb_client *client, - struct ctdb_req_shutdown *f) -{ - struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context); - int len; - uint32_t node; - - /* we dont send to ourself so we can already count one daemon as - exiting */ - ctdb->num_finished++; - - - /* loop over all nodes of the cluster */ - for (node=0; node<ctdb->num_nodes;node++) { - struct ctdb_req_finished *rf; - - /* dont send a message to ourself */ - if (ctdb->vnn == node) { - continue; - } - - len = sizeof(struct ctdb_req_finished); - rf = ctdb->methods->allocate_pkt(ctdb, len); - CTDB_NO_MEMORY_FATAL(ctdb, rf); - talloc_set_name_const(rf, "ctdb_req_finished packet"); - - ZERO_STRUCT(*rf); - rf->hdr.length = len; - rf->hdr.ctdb_magic = CTDB_MAGIC; - rf->hdr.ctdb_version = CTDB_VERSION; - rf->hdr.operation = CTDB_REQ_FINISHED; - rf->hdr.destnode = node; - rf->hdr.srcnode = ctdb->vnn; - rf->hdr.reqid = 0; - - ctdb_queue_packet(ctdb, &(rf->hdr)); - - talloc_free(rf); - } - - /* wait until all nodes have are prepared to shutdown */ - while (ctdb->num_finished != ctdb->num_nodes) { - event_loop_once(ctdb->ev); - } - - /* all daemons have requested to finish - we now exit */ - DEBUG(1,("All daemons finished - exiting\n")); - _exit(0); -} - - - -/* - called when the daemon gets a connect wait request from a client - */ -static void daemon_request_connect_wait(struct ctdb_client *client, - struct ctdb_req_connect_wait *c) -{ - struct ctdb_reply_connect_wait r; - int res; - - /* first wait - in the daemon */ - ctdb_daemon_connect_wait(client->ctdb); - - /* now send the reply */ - ZERO_STRUCT(r); - - r.hdr.length = sizeof(r); - r.hdr.ctdb_magic = CTDB_MAGIC; - r.hdr.ctdb_version = CTDB_VERSION; - r.hdr.operation = CTDB_REPLY_CONNECT_WAIT; - r.vnn = ctdb_get_vnn(client->ctdb); - r.num_connected = client->ctdb->num_connected; - - res = daemon_queue_send(client, &r.hdr); - if (res != 0) { - DEBUG(0,(__location__ " Failed to queue a connect wait response\n")); - return; - } -} - - -/* - called when the daemon gets a status request from a client - */ -static void daemon_request_status(struct ctdb_client *client, - struct ctdb_req_status *c) -{ - struct ctdb_reply_status r; - int res; - - /* now send the reply */ - ZERO_STRUCT(r); - - r.hdr.length = sizeof(r); - r.hdr.ctdb_magic = CTDB_MAGIC; - r.hdr.ctdb_version = CTDB_VERSION; - r.hdr.operation = CTDB_REPLY_STATUS; - r.hdr.reqid = c->hdr.reqid; - r.status = client->ctdb->status; - - res = daemon_queue_send(client, &r.hdr); - if (res != 0) { - DEBUG(0,(__location__ " Failed to queue a connect wait response\n")); - return; - } -} - -/* - destroy a ctdb_client -*/ -static int ctdb_client_destructor(struct ctdb_client *client) -{ - close(client->fd); - client->fd = -1; - return 0; -} - - -/* - this is called when the ctdb daemon received a ctdb request message - from a local client over the unix domain socket - */ -static void daemon_request_message_from_client(struct ctdb_client *client, - struct ctdb_req_message *c) -{ - TDB_DATA data; - int res; - - /* maybe the message is for another client on this node */ - if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) { - ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c); - return; - } - - /* its for a remote node */ - data.dptr = &c->data[0]; - data.dsize = c->datalen; - res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode, - c->srvid, data); - if (res != 0) { - DEBUG(0,(__location__ " Failed to send message to remote node %u\n", - c->hdr.destnode)); - } -} - - -struct daemon_call_state { - struct ctdb_client *client; - uint32_t reqid; - struct ctdb_call *call; - struct timeval start_time; -}; - -/* - complete a call from a client -*/ -static void daemon_call_from_client_callback(struct ctdb_call_state *state) -{ - struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, - struct daemon_call_state); - struct ctdb_reply_call *r; - int res; - uint32_t length; - struct ctdb_client *client = dstate->client; - - talloc_steal(client, dstate); - talloc_steal(dstate, dstate->call); - - res = ctdb_daemon_call_recv(state, dstate->call); - if (res != 0) { - DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n")); - client->ctdb->status.pending_calls--; - ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); - return; - } - - length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize; - r = ctdbd_allocate_pkt(dstate, length); - if (r == NULL) { - DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n")); - client->ctdb->status.pending_calls--; - ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); - return; - } - memset(r, 0, offsetof(struct ctdb_reply_call, data)); - r->hdr.length = length; - r->hdr.ctdb_magic = CTDB_MAGIC; - r->hdr.ctdb_version = CTDB_VERSION; - r->hdr.operation = CTDB_REPLY_CALL; - r->hdr.reqid = dstate->reqid; - r->datalen = dstate->call->reply_data.dsize; - memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen); - - res = daemon_queue_send(client, &r->hdr); - if (res != 0) { - DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n")); - } - ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); - talloc_free(dstate); - client->ctdb->status.pending_calls--; -} - - -/* - this is called when the ctdb daemon received a ctdb request call - from a local client over the unix domain socket - */ -static void daemon_request_call_from_client(struct ctdb_client *client, - struct ctdb_req_call *c) -{ - struct ctdb_call_state *state; - struct ctdb_db_context *ctdb_db; - struct daemon_call_state *dstate; - struct ctdb_call *call; - struct ctdb_ltdb_header header; - TDB_DATA key, data; - int ret; - struct ctdb_context *ctdb = client->ctdb; - - ctdb->status.total_calls++; - ctdb->status.pending_calls++; - - ctdb_db = find_ctdb_db(client->ctdb, c->db_id); - if (!ctdb_db) { - DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x", - c->db_id)); - ctdb->status.pending_calls--; - return; - } - - key.dptr = c->data; - key.dsize = c->keylen; - - ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, - (struct ctdb_req_header *)c, &data, - daemon_incoming_packet, client); - if (ret == -2) { - /* will retry later */ - ctdb->status.pending_calls--; - return; - } - - if (ret != 0) { - DEBUG(0,(__location__ " Unable to fetch record\n")); - ctdb->status.pending_calls--; - return; - } - - dstate = talloc(client, struct daemon_call_state); - if (dstate == NULL) { - ctdb_ltdb_unlock(ctdb_db, key); - DEBUG(0,(__location__ " Unable to allocate dstate\n")); - ctdb->status.pending_calls--; - return; - } - dstate->start_time = timeval_current(); - dstate->client = client; - dstate->reqid = c->hdr.reqid; - talloc_steal(dstate, data.dptr); - - call = dstate->call = talloc_zero(dstate, struct ctdb_call); - if (call == NULL) { - ctdb_ltdb_unlock(ctdb_db, key); - DEBUG(0,(__location__ " Unable to allocate call\n")); - ctdb->status.pending_calls--; - ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time); - return; - } - - call->call_id = c->callid; - call->key = key; - call->call_data.dptr = c->data + c->keylen; - call->call_data.dsize = c->calldatalen; - call->flags = c->flags; - - if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { - state = ctdb_call_local_send(ctdb_db, call, &header, &data); - } else { - state = ctdb_daemon_call_send_remote(ctdb_db, call, &header); - } - - ctdb_ltdb_unlock(ctdb_db, key); - - if (state == NULL) { - DEBUG(0,(__location__ " Unable to setup call send\n")); - ctdb->status.pending_calls--; - ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time); - return; - } - talloc_steal(state, dstate); - talloc_steal(client, state); - - state->async.fn = daemon_call_from_client_callback; - state->async.private_data = dstate; -} - -/* data contains a packet from the client */ -static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread) -{ - struct ctdb_req_header *hdr = (struct ctdb_req_header *)data; - struct ctdb_client *client = talloc_get_type(p, struct ctdb_client); - TALLOC_CTX *tmp_ctx; - struct ctdb_context *ctdb = client->ctdb; - - /* place the packet as a child of a tmp_ctx. We then use - talloc_free() below to free it. If any of the calls want - to keep it, then they will steal it somewhere else, and the - talloc_free() will be a no-op */ - tmp_ctx = talloc_new(client); - talloc_steal(tmp_ctx, hdr); - - if (hdr->ctdb_magic != CTDB_MAGIC) { - ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n"); - goto done; - } - - if (hdr->ctdb_version != CTDB_VERSION) { - ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); - goto done; - } - - switch (hdr->operation) { - case CTDB_REQ_CALL: - ctdb->status.client.req_call++; - daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr); - break; - - case CTDB_REQ_REGISTER: - ctdb->status.client.req_register++; - daemon_request_register_message_handler(client, - (struct ctdb_req_register *)hdr); - break; - case CTDB_REQ_MESSAGE: - ctdb->status.client.req_message++; - daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr); - break; - - case CTDB_REQ_CONNECT_WAIT: - ctdb->status.client.req_connect_wait++; - daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr); - break; - - case CTDB_REQ_SHUTDOWN: - ctdb->status.client.req_shutdown++; - daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr); - break; - - case CTDB_REQ_STATUS: - ctdb->status.client.req_status++; - daemon_request_status(client, (struct ctdb_req_status *)hdr); - break; - - default: - DEBUG(0,(__location__ " daemon: unrecognized operation %d\n", - hdr->operation)); - } - -done: - talloc_free(tmp_ctx); -} - -/* - called when the daemon gets a incoming packet - */ -static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args) -{ - struct ctdb_client *client = talloc_get_type(args, struct ctdb_client); - struct ctdb_req_header *hdr; - - if (cnt == 0) { - talloc_free(client); - return; - } - - client->ctdb->status.client_packets_recv++; - - if (cnt < sizeof(*hdr)) { - ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt); - return; - } - hdr = (struct ctdb_req_header *)data; - if (cnt != hdr->length) { - ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n in daemon", - hdr->length, cnt); - return; - } - - if (hdr->ctdb_magic != CTDB_MAGIC) { - ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n"); - return; - } - - if (hdr->ctdb_version != CTDB_VERSION) { - ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); - return; - } - - DEBUG(3,(__location__ " client request %d of type %d length %d from " - "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length, - hdr->srcnode, hdr->destnode)); - - /* it is the responsibility of the incoming packet function to free 'data' */ - daemon_incoming_packet(client, data, cnt); -} - -static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private_data) -{ - struct sockaddr_in addr; - socklen_t len; - int fd; - struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context); - struct ctdb_client *client; - - memset(&addr, 0, sizeof(addr)); - len = sizeof(addr); - fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len); - if (fd == -1) { - return; - } - set_non_blocking(fd); - - client = talloc_zero(ctdb, struct ctdb_client); - client->ctdb = ctdb; - client->fd = fd; - - client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, - ctdb_daemon_read_cb, client); - - talloc_set_destructor(client, ctdb_client_destructor); -} - - - -static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private_data) -{ - int *fd = private_data; - int cnt; - char buf; - - /* XXX this is a good place to try doing some cleaning up before exiting */ - cnt = read(*fd, &buf, 1); - if (cnt==0) { - DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n")); - exit(1); - } else { - DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n")); - exit(1); - } -} - - - -/* - create a unix domain socket and bind it - return a file descriptor open on the socket -*/ -static int ux_socket_bind(struct ctdb_context *ctdb) -{ - struct sockaddr_un addr; - - ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0); - if (ctdb->daemon.sd == -1) { - ctdb->daemon.sd = -1; - return -1; - } - - set_non_blocking(ctdb->daemon.sd); - - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; - strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)); - - if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { - close(ctdb->daemon.sd); - ctdb->daemon.sd = -1; - return -1; - } - listen(ctdb->daemon.sd, 1); - - return 0; -} - -/* - delete the socket on exit - called on destruction of autofree context - */ -static int unlink_destructor(const char *name) -{ - unlink(name); - return 0; -} - -/* - start the protocol going -*/ -int ctdb_start(struct ctdb_context *ctdb) -{ - pid_t pid; - static int fd[2]; - int res; - struct fd_event *fde; - const char *domain_socket_name; - - /* generate a name to use for our local socket */ - ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address); - /* get rid of any old sockets */ - unlink(ctdb->daemon.name); - - /* create a unix domain stream socket to listen to */ - res = ux_socket_bind(ctdb); - if (res!=0) { - DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n")); - exit(10); - } - - res = pipe(&fd[0]); - if (res) { - DEBUG(0,(__location__ " Failed to open pipe for CTDB\n")); - exit(1); - } - pid = fork(); - if (pid==-1) { - DEBUG(0,(__location__ " Failed to fork CTDB daemon\n")); - exit(1); - } - - if (pid) { - close(fd[0]); - close(ctdb->daemon.sd); - ctdb->daemon.sd = -1; - return 0; - } - - block_signal(SIGPIPE); - - /* ensure the socket is deleted on exit of the daemon */ - domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name); - talloc_set_destructor(domain_socket_name, unlink_destructor); - - close(fd[1]); - - ctdb->ev = event_context_init(NULL); - fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]); - fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb); - ctdb_main_loop(ctdb); - - return 0; -} - -/* - allocate a packet for use in client<->daemon communication - */ -void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len) -{ - int size; - - size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); - return talloc_size(mem_ctx, size); -} - -/* - called when a CTDB_REQ_FINISHED packet comes in -*/ -void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) -{ - ctdb->num_finished++; -} |