diff options
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb_daemon.c')
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_daemon.c | 500 |
1 files changed, 290 insertions, 210 deletions
diff --git a/source4/cluster/ctdb/common/ctdb_daemon.c b/source4/cluster/ctdb/common/ctdb_daemon.c index 945030d77e..ff3431a392 100644 --- a/source4/cluster/ctdb/common/ctdb_daemon.c +++ b/source4/cluster/ctdb/common/ctdb_daemon.c @@ -25,9 +25,23 @@ #include "lib/util/dlinklist.h" #include "system/network.h" #include "system/filesys.h" +#include "system/wait.h" #include "../include/ctdb.h" #include "../include/ctdb_private.h" +/* + structure describing a connected client in the daemon + */ +struct ctdb_client { + struct ctdb_context *ctdb; + int fd; + struct ctdb_queue *queue; +}; + + + +static void daemon_incoming_packet(void *, uint8_t *, uint32_t ); + static void ctdb_main_loop(struct ctdb_context *ctdb) { ctdb->methods->start(ctdb); @@ -35,7 +49,7 @@ static void ctdb_main_loop(struct ctdb_context *ctdb) /* go into a wait loop to allow other nodes to complete */ event_loop_wait(ctdb->ev); - printf("event_loop_wait() returned. this should not happen\n"); + DEBUG(0,("event_loop_wait() returned. this should not happen\n")); exit(1); } @@ -47,16 +61,27 @@ static void set_non_blocking(int fd) fcntl(fd, F_SETFL, v | O_NONBLOCK); } +static void block_signal(int signum) +{ + struct sigaction act; + + memset(&act, 0, sizeof(act)); + + act.sa_handler = SIG_IGN; + sigemptyset(&act.sa_mask); + sigaddset(&act.sa_mask, signum); + sigaction(signum, &act, NULL); +} + /* - structure describing a connected client in the daemon + send a packet to a client */ -struct ctdb_client { - struct ctdb_context *ctdb; - int fd; - struct ctdb_queue *queue; -}; - +static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr) +{ + client->ctdb->status.client_packets_sent++; + return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length); +} /* message handler for when we are in daemon mode. This redirects the message @@ -73,10 +98,9 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, len = offsetof(struct ctdb_req_message, data) + data.dsize; r = ctdbd_allocate_pkt(ctdb, len); -/*XXX cant use this since it returns an int CTDB_NO_MEMORY(ctdb, r);*/ talloc_set_name_const(r, "req_message packet"); - ZERO_STRUCT(*r); + memset(r, 0, offsetof(struct ctdb_req_message, data)); r->hdr.length = len; r->hdr.ctdb_magic = CTDB_MAGIC; @@ -85,11 +109,10 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, r->srvid = srvid; r->datalen = data.dsize; memcpy(&r->data[0], data.dptr, data.dsize); - - ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len); + + daemon_queue_send(client, &r->hdr); talloc_free(r); - return; } @@ -105,187 +128,122 @@ static void daemon_request_register_message_handler(struct ctdb_client *client, c->srvid, daemon_message_handler, client); if (res != 0) { - printf("Failed to register handler %u in daemon\n", c->srvid); + DEBUG(0,(__location__ " Failed to register handler %u in daemon\n", + c->srvid)); + } else { + DEBUG(2,(__location__ " Registered message handler for srvid=%u\n", + c->srvid)); } } -static struct ctdb_call_state *ctdb_fetch_lock_send(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key, TDB_DATA *data) +/* + called when the daemon gets a shutdown request from a client + */ +static void daemon_request_shutdown(struct ctdb_client *client, + struct ctdb_req_shutdown *f) { - struct ctdb_call *call; - struct ctdb_record_handle *rec; - struct ctdb_call_state *state; + struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context); + int len; + uint32_t node; - rec = talloc(mem_ctx, struct ctdb_record_handle); - CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec); + /* we dont send to ourself so we can already count one daemon as + exiting */ + ctdb->num_finished++; - - call = talloc(rec, struct ctdb_call); - ZERO_STRUCT(*call); - call->call_id = CTDB_FETCH_FUNC; - call->key = key; - call->flags = CTDB_IMMEDIATE_MIGRATION; + /* loop over all nodes of the cluster */ + for (node=0; node<ctdb->num_nodes;node++) { + struct ctdb_req_finished *rf; - rec->ctdb_db = ctdb_db; - rec->key = key; - rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); - rec->data = data; + /* dont send a message to ourself */ + if (ctdb->vnn == node) { + continue; + } - state = ctdb_call_send(ctdb_db, call); - state->fetch_private = rec; + len = sizeof(struct ctdb_req_finished); + rf = ctdb->methods->allocate_pkt(ctdb, len); + CTDB_NO_MEMORY_FATAL(ctdb, rf); + talloc_set_name_const(rf, "ctdb_req_finished packet"); - return state; -} + ZERO_STRUCT(*rf); + rf->hdr.length = len; + rf->hdr.ctdb_magic = CTDB_MAGIC; + rf->hdr.ctdb_version = CTDB_VERSION; + rf->hdr.operation = CTDB_REQ_FINISHED; + rf->hdr.destnode = node; + rf->hdr.srcnode = ctdb->vnn; + rf->hdr.reqid = 0; -struct client_fetch_lock_data { - struct ctdb_client *client; - uint32_t reqid; -}; -static void daemon_fetch_lock_complete(struct ctdb_call_state *state) -{ - struct ctdb_reply_fetch_lock *r; - struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data, struct client_fetch_lock_data); - struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client); - int length, res; + ctdb_queue_packet(ctdb, &(rf->hdr)); - length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize; - r = ctdbd_allocate_pkt(client->ctdb, length); - if (r == NULL) { - printf("Failed to allocate reply_call in ctdb daemon\n"); - return; + talloc_free(rf); } - ZERO_STRUCT(*r); - r->hdr.length = length; - r->hdr.ctdb_magic = CTDB_MAGIC; - r->hdr.ctdb_version = CTDB_VERSION; - r->hdr.operation = CTDB_REPLY_FETCH_LOCK; - r->hdr.reqid = data->reqid; - r->state = state->state; - r->datalen = state->call.reply_data.dsize; - memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen); - res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length); - if (res != 0) { - printf("Failed to queue packet from daemon to client\n"); + /* wait until all nodes have are prepared to shutdown */ + while (ctdb->num_finished != ctdb->num_nodes) { + event_loop_once(ctdb->ev); } - talloc_free(r); -} -/* - called when the daemon gets a fetch lock request from a client - */ -static void daemon_request_fetch_lock(struct ctdb_client *client, - struct ctdb_req_fetch_lock *f) -{ - struct ctdb_call_state *state; - TDB_DATA key, *data; - struct ctdb_db_context *ctdb_db; - struct client_fetch_lock_data *fl_data; - - ctdb_db = find_ctdb_db(client->ctdb, f->db_id); - - key.dsize = f->keylen; - key.dptr = &f->key[0]; - - data = talloc(client, TDB_DATA); - data->dptr = NULL; - data->dsize = 0; + /* all daemons have requested to finish - we now exit */ + DEBUG(1,("All daemons finished - exiting\n")); + _exit(0); +} - state = ctdb_fetch_lock_send(ctdb_db, client, key, data); - talloc_steal(state, data); - fl_data = talloc(state, struct client_fetch_lock_data); - fl_data->client = client; - fl_data->reqid = f->hdr.reqid; - state->async.fn = daemon_fetch_lock_complete; - state->async.private_data = fl_data; -} /* - called when the daemon gets a store unlock request from a client - - this would never block? + called when the daemon gets a connect wait request from a client */ -static void daemon_request_store_unlock(struct ctdb_client *client, - struct ctdb_req_store_unlock *f) +static void daemon_request_connect_wait(struct ctdb_client *client, + struct ctdb_req_connect_wait *c) { - struct ctdb_db_context *ctdb_db; - struct ctdb_reply_store_unlock r; - uint32_t caller = ctdb_get_vnn(client->ctdb); - struct ctdb_ltdb_header header; - TDB_DATA key, data; + struct ctdb_reply_connect_wait r; int res; - ctdb_db = find_ctdb_db(client->ctdb, f->db_id); - - /* write the data to ltdb */ - key.dsize = f->keylen; - key.dptr = &f->data[0]; - res = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL); - if (res) { - ctdb_set_error(ctdb_db->ctdb, "Fetch of locally held record failed"); - res = -1; - goto done; - } - if (header.laccessor != caller) { - header.lacount = 0; - } - header.laccessor = caller; - header.lacount++; - data.dsize = f->datalen; - data.dptr = &f->data[f->keylen]; - res = ctdb_ltdb_store(ctdb_db, key, &header, data); - if ( res != 0) { - ctdb_set_error(ctdb_db->ctdb, "ctdb_call tdb_store failed\n"); - } - + /* first wait - in the daemon */ + ctdb_daemon_connect_wait(client->ctdb); -done: /* now send the reply */ ZERO_STRUCT(r); r.hdr.length = sizeof(r); r.hdr.ctdb_magic = CTDB_MAGIC; r.hdr.ctdb_version = CTDB_VERSION; - r.hdr.operation = CTDB_REPLY_STORE_UNLOCK; - r.hdr.reqid = f->hdr.reqid; - r.state = res; + r.hdr.operation = CTDB_REPLY_CONNECT_WAIT; + r.vnn = ctdb_get_vnn(client->ctdb); + r.num_connected = client->ctdb->num_connected; - res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length); + res = daemon_queue_send(client, &r.hdr); if (res != 0) { - printf("Failed to queue a store unlock response\n"); + DEBUG(0,(__location__ " Failed to queue a connect wait response\n")); return; } } + /* - called when the daemon gets a connect wait request from a client + called when the daemon gets a status request from a client */ -static void daemon_request_connect_wait(struct ctdb_client *client, - struct ctdb_req_connect_wait *c) +static void daemon_request_status(struct ctdb_client *client, + struct ctdb_req_status *c) { - struct ctdb_reply_connect_wait r; + struct ctdb_reply_status r; int res; - /* first wait - in the daemon */ - ctdb_daemon_connect_wait(client->ctdb); - /* now send the reply */ ZERO_STRUCT(r); r.hdr.length = sizeof(r); r.hdr.ctdb_magic = CTDB_MAGIC; r.hdr.ctdb_version = CTDB_VERSION; - r.hdr.operation = CTDB_REPLY_CONNECT_WAIT; - r.vnn = ctdb_get_vnn(client->ctdb); - r.num_connected = client->ctdb->num_connected; + r.hdr.operation = CTDB_REPLY_STATUS; + r.hdr.reqid = c->hdr.reqid; + r.status = client->ctdb->status; - res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length); + res = daemon_queue_send(client, &r.hdr); if (res != 0) { - printf("Failed to queue a connect wait response\n"); + DEBUG(0,(__location__ " Failed to queue a connect wait response\n")); return; } } @@ -323,11 +281,69 @@ static void daemon_request_message_from_client(struct ctdb_client *client, res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode, c->srvid, data); if (res != 0) { - printf("Failed to send message to remote node %u\n", - c->hdr.destnode); + DEBUG(0,(__location__ " Failed to send message to remote node %u\n", + c->hdr.destnode)); + } +} + + +struct daemon_call_state { + struct ctdb_client *client; + uint32_t reqid; + struct ctdb_call *call; + struct timeval start_time; +}; + +/* + complete a call from a client +*/ +static void daemon_call_from_client_callback(struct ctdb_call_state *state) +{ + struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, + struct daemon_call_state); + struct ctdb_reply_call *r; + int res; + uint32_t length; + struct ctdb_client *client = dstate->client; + + talloc_steal(client, dstate); + talloc_steal(dstate, dstate->call); + + res = ctdb_daemon_call_recv(state, dstate->call); + if (res != 0) { + DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n")); + client->ctdb->status.pending_calls--; + ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); + return; } + + length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize; + r = ctdbd_allocate_pkt(dstate, length); + if (r == NULL) { + DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n")); + client->ctdb->status.pending_calls--; + ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); + return; + } + memset(r, 0, offsetof(struct ctdb_reply_call, data)); + r->hdr.length = length; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REPLY_CALL; + r->hdr.reqid = dstate->reqid; + r->datalen = dstate->call->reply_data.dsize; + memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen); + + res = daemon_queue_send(client, &r->hdr); + if (res != 0) { + DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n")); + } + ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time); + talloc_free(dstate); + client->ctdb->status.pending_calls--; } + /* this is called when the ctdb daemon received a ctdb request call from a local client over the unix domain socket @@ -337,103 +353,159 @@ static void daemon_request_call_from_client(struct ctdb_client *client, { struct ctdb_call_state *state; struct ctdb_db_context *ctdb_db; - struct ctdb_call call; - struct ctdb_reply_call *r; - int res; - uint32_t length; + struct daemon_call_state *dstate; + struct ctdb_call *call; + struct ctdb_ltdb_header header; + TDB_DATA key, data; + int ret; + struct ctdb_context *ctdb = client->ctdb; + + ctdb->status.total_calls++; + ctdb->status.pending_calls++; ctdb_db = find_ctdb_db(client->ctdb, c->db_id); if (!ctdb_db) { - printf("Unknown database in request. db_id==0x%08x",c->db_id); + DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x", + c->db_id)); + ctdb->status.pending_calls--; return; } - ZERO_STRUCT(call); - call.call_id = c->callid; - call.key.dptr = c->data; - call.key.dsize = c->keylen; - call.call_data.dptr = c->data + c->keylen; - call.call_data.dsize = c->calldatalen; + key.dptr = c->data; + key.dsize = c->keylen; - state = ctdb_call_send(ctdb_db, &call); + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, + (struct ctdb_req_header *)c, &data, + daemon_incoming_packet, client); + if (ret == -2) { + /* will retry later */ + ctdb->status.pending_calls--; + return; + } -/* XXX this must be converted to fully async */ - res = ctdb_call_recv(state, &call); - if (res != 0) { - printf("ctdbd_call_recv() returned error\n"); - exit(1); + if (ret != 0) { + DEBUG(0,(__location__ " Unable to fetch record\n")); + ctdb->status.pending_calls--; + return; } - length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize; - r = ctdbd_allocate_pkt(client->ctdb, length); - if (r == NULL) { - printf("Failed to allocate reply_call in ctdb daemon\n"); + dstate = talloc(client, struct daemon_call_state); + if (dstate == NULL) { + ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(0,(__location__ " Unable to allocate dstate\n")); + ctdb->status.pending_calls--; + return; + } + dstate->start_time = timeval_current(); + dstate->client = client; + dstate->reqid = c->hdr.reqid; + talloc_steal(dstate, data.dptr); + + call = dstate->call = talloc_zero(dstate, struct ctdb_call); + if (call == NULL) { + ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(0,(__location__ " Unable to allocate call\n")); + ctdb->status.pending_calls--; + ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time); return; } - ZERO_STRUCT(*r); - r->hdr.length = length; - r->hdr.ctdb_magic = CTDB_MAGIC; - r->hdr.ctdb_version = CTDB_VERSION; - r->hdr.operation = CTDB_REPLY_CALL; - r->hdr.reqid = c->hdr.reqid; - r->datalen = call.reply_data.dsize; - memcpy(&r->data[0], call.reply_data.dptr, r->datalen); - res = ctdb_queue_send(client->queue, (uint8_t *)&r, r->hdr.length); - if (res != 0) { - printf("Failed to queue packet from daemon to client\n"); + call->call_id = c->callid; + call->key = key; + call->call_data.dptr = c->data + c->keylen; + call->call_data.dsize = c->calldatalen; + call->flags = c->flags; + + if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + state = ctdb_call_local_send(ctdb_db, call, &header, &data); + } else { + state = ctdb_daemon_call_send_remote(ctdb_db, call, &header); } - talloc_free(r); -} + ctdb_ltdb_unlock(ctdb_db, key); + + if (state == NULL) { + DEBUG(0,(__location__ " Unable to setup call send\n")); + ctdb->status.pending_calls--; + ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time); + return; + } + talloc_steal(state, dstate); + talloc_steal(client, state); + + state->async.fn = daemon_call_from_client_callback; + state->async.private_data = dstate; +} /* data contains a packet from the client */ -static void client_incoming_packet(struct ctdb_client *client, void *data, size_t nread) +static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread) { - struct ctdb_req_header *hdr = data; + struct ctdb_req_header *hdr = (struct ctdb_req_header *)data; + struct ctdb_client *client = talloc_get_type(p, struct ctdb_client); + TALLOC_CTX *tmp_ctx; + struct ctdb_context *ctdb = client->ctdb; + + /* place the packet as a child of a tmp_ctx. We then use + talloc_free() below to free it. If any of the calls want + to keep it, then they will steal it somewhere else, and the + talloc_free() will be a no-op */ + tmp_ctx = talloc_new(client); + talloc_steal(tmp_ctx, hdr); if (hdr->ctdb_magic != CTDB_MAGIC) { - ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n"); + ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n"); goto done; } if (hdr->ctdb_version != CTDB_VERSION) { - ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); goto done; } switch (hdr->operation) { case CTDB_REQ_CALL: + ctdb->status.client.req_call++; daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr); break; case CTDB_REQ_REGISTER: + ctdb->status.client.req_register++; daemon_request_register_message_handler(client, (struct ctdb_req_register *)hdr); break; case CTDB_REQ_MESSAGE: + ctdb->status.client.req_message++; daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr); break; case CTDB_REQ_CONNECT_WAIT: + ctdb->status.client.req_connect_wait++; daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr); break; - case CTDB_REQ_FETCH_LOCK: - daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr); + + case CTDB_REQ_SHUTDOWN: + ctdb->status.client.req_shutdown++; + daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr); break; - case CTDB_REQ_STORE_UNLOCK: - daemon_request_store_unlock(client, (struct ctdb_req_store_unlock *)hdr); + + case CTDB_REQ_STATUS: + ctdb->status.client.req_status++; + daemon_request_status(client, (struct ctdb_req_status *)hdr); break; + default: - printf("daemon: unrecognized operation:%d\n",hdr->operation); + DEBUG(0,(__location__ " daemon: unrecognized operation %d\n", + hdr->operation)); } done: - talloc_free(data); + talloc_free(tmp_ctx); } - -static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) +/* + called when the daemon gets a incoming packet + */ +static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args) { struct ctdb_client *client = talloc_get_type(args, struct ctdb_client); struct ctdb_req_header *hdr; @@ -443,13 +515,15 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) return; } + client->ctdb->status.client_packets_recv++; + if (cnt < sizeof(*hdr)) { - ctdb_set_error(client->ctdb, "Bad packet length %d\n", cnt); + ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt); return; } hdr = (struct ctdb_req_header *)data; if (cnt != hdr->length) { - ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n", + ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n in daemon", hdr->length, cnt); return; } @@ -460,12 +534,16 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) } if (hdr->ctdb_version != CTDB_VERSION) { - ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); return; } + DEBUG(3,(__location__ " client request %d of type %d length %d from " + "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length, + hdr->srcnode, hdr->destnode)); + /* it is the responsibility of the incoming packet function to free 'data' */ - client_incoming_packet(client, data, cnt); + daemon_incoming_packet(client, data, cnt); } static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, @@ -490,7 +568,7 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, client->fd = fd; client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, - ctdb_client_read_cb, client); + ctdb_daemon_read_cb, client); talloc_set_destructor(client, ctdb_client_destructor); } @@ -507,10 +585,10 @@ static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde /* XXX this is a good place to try doing some cleaning up before exiting */ cnt = read(*fd, &buf, 1); if (cnt==0) { - printf("parent process exited. filedescriptor dissappeared\n"); + DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n")); exit(1); } else { - printf("ctdb: did not expect data from parent process\n"); + DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n")); exit(1); } } @@ -559,7 +637,7 @@ static int unlink_destructor(const char *name) /* start the protocol going */ -int ctdbd_start(struct ctdb_context *ctdb) +int ctdb_start(struct ctdb_context *ctdb) { pid_t pid; static int fd[2]; @@ -575,18 +653,18 @@ int ctdbd_start(struct ctdb_context *ctdb) /* create a unix domain stream socket to listen to */ res = ux_socket_bind(ctdb); if (res!=0) { - printf("Failed to open CTDB unix domain socket\n"); + DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n")); exit(10); } res = pipe(&fd[0]); if (res) { - printf("Failed to open pipe for CTDB\n"); + DEBUG(0,(__location__ " Failed to open pipe for CTDB\n")); exit(1); } pid = fork(); if (pid==-1) { - printf("Failed to fork CTDB daemon\n"); + DEBUG(0,(__location__ " Failed to fork CTDB daemon\n")); exit(1); } @@ -597,12 +675,14 @@ int ctdbd_start(struct ctdb_context *ctdb) return 0; } + block_signal(SIGPIPE); + /* ensure the socket is deleted on exit of the daemon */ domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name); talloc_set_destructor(domain_socket_name, unlink_destructor); close(fd[1]); - ctdb_clear_flags(ctdb, CTDB_FLAG_DAEMON_MODE); + ctdb->ev = event_context_init(NULL); fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]); fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb); @@ -614,18 +694,18 @@ int ctdbd_start(struct ctdb_context *ctdb) /* allocate a packet for use in client<->daemon communication */ -void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len) +void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len) { int size; size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); - return talloc_size(ctdb, size); + return talloc_size(mem_ctx, size); } -int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, - ctdb_message_fn_t handler, - void *private_data) +/* + called when a CTDB_REQ_FINISHED packet comes in +*/ +void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data); + ctdb->num_finished++; } - |