diff options
Diffstat (limited to 'source4/cluster/ctdb/server')
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_call.c | 737 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_control.c | 499 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_daemon.c | 927 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_freeze.c | 256 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_lockwait.c | 165 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_ltdb_server.c | 366 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_monitor.c | 227 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_recover.c | 680 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_recoverd.c | 1511 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_server.c | 469 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_takeover.c | 822 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_traverse.c | 463 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdb_tunables.c | 163 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/ctdbd.c | 229 | ||||
-rw-r--r-- | source4/cluster/ctdb/server/eventscript.c | 191 |
15 files changed, 7705 insertions, 0 deletions
diff --git a/source4/cluster/ctdb/server/ctdb_call.c b/source4/cluster/ctdb/server/ctdb_call.c new file mode 100644 index 0000000000..bbe07717ed --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_call.c @@ -0,0 +1,737 @@ +/* + ctdb_call protocol code + + Copyright (C) Andrew Tridgell 2006 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ +/* + see http://wiki.samba.org/index.php/Samba_%26_Clustering for + protocol design and packet details +*/ +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb_private.h" + +/* + find the ctdb_db from a db index + */ + struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id) +{ + struct ctdb_db_context *ctdb_db; + + for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) { + if (ctdb_db->db_id == id) { + break; + } + } + return ctdb_db; +} + + +/* + a varient of input packet that can be used in lock requeue +*/ +static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr) +{ + struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context); + ctdb_input_pkt(ctdb, hdr); +} + + +/* + send an error reply +*/ +static void ctdb_send_error(struct ctdb_context *ctdb, + struct ctdb_req_header *hdr, uint32_t status, + const char *fmt, ...) PRINTF_ATTRIBUTE(4,5); +static void ctdb_send_error(struct ctdb_context *ctdb, + struct ctdb_req_header *hdr, uint32_t status, + const char *fmt, ...) +{ + va_list ap; + struct ctdb_reply_error *r; + char *msg; + int msglen, len; + + va_start(ap, fmt); + msg = talloc_vasprintf(ctdb, fmt, ap); + if (msg == NULL) { + ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n"); + } + va_end(ap); + + msglen = strlen(msg)+1; + len = offsetof(struct ctdb_reply_error, msg); + r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, + struct ctdb_reply_error); + CTDB_NO_MEMORY_FATAL(ctdb, r); + + r->hdr.destnode = hdr->srcnode; + r->hdr.reqid = hdr->reqid; + r->status = status; + r->msglen = msglen; + memcpy(&r->msg[0], msg, msglen); + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(msg); +} + + +/* + send a redirect reply +*/ +static void ctdb_call_send_redirect(struct ctdb_context *ctdb, + TDB_DATA key, + struct ctdb_req_call *c, + struct ctdb_ltdb_header *header) +{ + + uint32_t lmaster = ctdb_lmaster(ctdb, &key); + if (ctdb->vnn == lmaster) { + c->hdr.destnode = header->dmaster; + } else if ((c->hopcount % ctdb->tunable.max_redirect_count) == 0) { + c->hdr.destnode = lmaster; + } else { + c->hdr.destnode = header->dmaster; + } + c->hopcount++; + ctdb_queue_packet(ctdb, &c->hdr); +} + + +/* + send a dmaster reply + + caller must have the chainlock before calling this routine. Caller must be + the lmaster +*/ +static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db, + struct ctdb_ltdb_header *header, + TDB_DATA key, TDB_DATA data, + uint32_t new_dmaster, + uint32_t reqid) +{ + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_reply_dmaster *r; + int ret, len; + TALLOC_CTX *tmp_ctx; + + if (ctdb->vnn != ctdb_lmaster(ctdb, &key)) { + DEBUG(0,(__location__ " Caller is not lmaster!\n")); + return; + } + + header->dmaster = new_dmaster; + ret = ctdb_ltdb_store(ctdb_db, key, header, data); + if (ret != 0) { + ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster"); + return; + } + + /* put the packet on a temporary context, allowing us to safely free + it below even if ctdb_reply_dmaster() has freed it already */ + tmp_ctx = talloc_new(ctdb); + + /* send the CTDB_REPLY_DMASTER */ + len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize; + r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len, + struct ctdb_reply_dmaster); + CTDB_NO_MEMORY_FATAL(ctdb, r); + + r->hdr.destnode = new_dmaster; + r->hdr.reqid = reqid; + r->rsn = header->rsn; + r->keylen = key.dsize; + r->datalen = data.dsize; + r->db_id = ctdb_db->db_id; + memcpy(&r->data[0], key.dptr, key.dsize); + memcpy(&r->data[key.dsize], data.dptr, data.dsize); + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(tmp_ctx); +} + +/* + send a dmaster request (give another node the dmaster for a record) + + This is always sent to the lmaster, which ensures that the lmaster + always knows who the dmaster is. The lmaster will then send a + CTDB_REPLY_DMASTER to the new dmaster +*/ +static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, + struct ctdb_req_call *c, + struct ctdb_ltdb_header *header, + TDB_DATA *key, TDB_DATA *data) +{ + struct ctdb_req_dmaster *r; + struct ctdb_context *ctdb = ctdb_db->ctdb; + int len; + uint32_t lmaster = ctdb_lmaster(ctdb, key); + + if (lmaster == ctdb->vnn) { + ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, + c->hdr.srcnode, c->hdr.reqid); + return; + } + + len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize; + r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, + struct ctdb_req_dmaster); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.destnode = lmaster; + r->hdr.reqid = c->hdr.reqid; + r->db_id = c->db_id; + r->rsn = header->rsn; + r->dmaster = c->hdr.srcnode; + r->keylen = key->dsize; + r->datalen = data->dsize; + memcpy(&r->data[0], key->dptr, key->dsize); + memcpy(&r->data[key->dsize], data->dptr, data->dsize); + + header->dmaster = c->hdr.srcnode; + if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) { + ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster"); + } + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(r); +} + +/* + called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster + gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster. + + must be called with the chainlock held. This function releases the chainlock +*/ +static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db, + uint32_t reqid, TDB_DATA key, TDB_DATA data, + uint64_t rsn) +{ + struct ctdb_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_ltdb_header header; + + DEBUG(2,("vnn %u dmaster response %08x\n", ctdb->vnn, ctdb_hash(&key))); + + ZERO_STRUCT(header); + header.rsn = rsn + 1; + header.dmaster = ctdb->vnn; + + if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) { + ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n"); + ctdb_ltdb_unlock(ctdb_db, key); + return; + } + + state = ctdb_reqid_find(ctdb, reqid, struct ctdb_call_state); + + if (state == NULL) { + DEBUG(0,("vnn %u Invalid reqid %u in ctdb_become_dmaster\n", + ctdb->vnn, reqid)); + ctdb_ltdb_unlock(ctdb_db, key); + return; + } + + if (reqid != state->reqid) { + /* we found a record but it was the wrong one */ + DEBUG(0, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n",reqid)); + ctdb_ltdb_unlock(ctdb_db, key); + return; + } + + ctdb_call_local(ctdb_db, &state->call, &header, state, &data, ctdb->vnn); + + ctdb_ltdb_unlock(ctdb_db, state->call.key); + + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } +} + + + +/* + called when a CTDB_REQ_DMASTER packet comes in + + this comes into the lmaster for a record when the current dmaster + wants to give up the dmaster role and give it to someone else +*/ +void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr; + TDB_DATA key, data, data2; + struct ctdb_ltdb_header header; + struct ctdb_db_context *ctdb_db; + int ret; + + key.dptr = c->data; + key.dsize = c->keylen; + data.dptr = c->data + c->keylen; + data.dsize = c->datalen; + + ctdb_db = find_ctdb_db(ctdb, c->db_id); + if (!ctdb_db) { + ctdb_send_error(ctdb, hdr, -1, + "Unknown database in request. db_id==0x%08x", + c->db_id); + return; + } + + /* fetch the current record */ + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2, + ctdb_call_input_pkt, ctdb, False); + if (ret == -1) { + ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record"); + return; + } + if (ret == -2) { + DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n")); + return; + } + + if (ctdb_lmaster(ctdb, &key) != ctdb->vnn) { + DEBUG(0,("vnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n", + ctdb->vnn, ctdb_lmaster(ctdb, &key), + hdr->generation, ctdb->vnn_map->generation)); + ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster"); + } + + DEBUG(2,("vnn %u dmaster request on %08x for %u from %u\n", + ctdb->vnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode)); + + /* its a protocol error if the sending node is not the current dmaster */ + if (header.dmaster != hdr->srcnode) { + DEBUG(0,("vnn %u dmaster request non-master %u dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u\n", + ctdb->vnn, hdr->srcnode, header.dmaster, ctdb_hash(&key), + ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation)); + ctdb_fatal(ctdb, "ctdb_req_dmaster from non-master"); + return; + } + + /* use the rsn from the sending node */ + header.rsn = c->rsn; + + /* check if the new dmaster is the lmaster, in which case we + skip the dmaster reply */ + if (c->dmaster == ctdb->vnn) { + ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn); + } else { + ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid); + ctdb_ltdb_unlock(ctdb_db, key); + } +} + + +/* + called when a CTDB_REQ_CALL packet comes in +*/ +void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_req_call *c = (struct ctdb_req_call *)hdr; + TDB_DATA data; + struct ctdb_reply_call *r; + int ret, len; + struct ctdb_ltdb_header header; + struct ctdb_call call; + struct ctdb_db_context *ctdb_db; + + ctdb_db = find_ctdb_db(ctdb, c->db_id); + if (!ctdb_db) { + ctdb_send_error(ctdb, hdr, -1, + "Unknown database in request. db_id==0x%08x", + c->db_id); + return; + } + + call.call_id = c->callid; + call.key.dptr = c->data; + call.key.dsize = c->keylen; + call.call_data.dptr = c->data + c->keylen; + call.call_data.dsize = c->calldatalen; + + /* determine if we are the dmaster for this key. This also + fetches the record data (if any), thus avoiding a 2nd fetch of the data + if the call will be answered locally */ + + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data, + ctdb_call_input_pkt, ctdb, False); + if (ret == -1) { + ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call"); + return; + } + if (ret == -2) { + DEBUG(2,(__location__ " deferred ctdb_request_call\n")); + return; + } + + /* if we are not the dmaster, then send a redirect to the + requesting node */ + if (header.dmaster != ctdb->vnn) { + talloc_free(data.dptr); + ctdb_call_send_redirect(ctdb, call.key, c, &header); + ctdb_ltdb_unlock(ctdb_db, call.key); + return; + } + + if (c->hopcount > ctdb->statistics.max_hop_count) { + ctdb->statistics.max_hop_count = c->hopcount; + } + + /* if this nodes has done enough consecutive calls on the same record + then give them the record + or if the node requested an immediate migration + */ + if ( c->hdr.srcnode != ctdb->vnn && + ((header.laccessor == c->hdr.srcnode + && header.lacount >= ctdb->tunable.max_lacount) + || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) { + DEBUG(2,("vnn %u starting migration of %08x to %u\n", + ctdb->vnn, ctdb_hash(&call.key), c->hdr.srcnode)); + ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data); + talloc_free(data.dptr); + ctdb_ltdb_unlock(ctdb_db, call.key); + return; + } + + ctdb_call_local(ctdb_db, &call, &header, hdr, &data, c->hdr.srcnode); + + ctdb_ltdb_unlock(ctdb_db, call.key); + + len = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize; + r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, + struct ctdb_reply_call); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.destnode = hdr->srcnode; + r->hdr.reqid = hdr->reqid; + r->status = call.status; + r->datalen = call.reply_data.dsize; + if (call.reply_data.dsize) { + memcpy(&r->data[0], call.reply_data.dptr, call.reply_data.dsize); + } + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(r); +} + +/* + called when a CTDB_REPLY_CALL packet comes in + + This packet comes in response to a CTDB_REQ_CALL request packet. It + contains any reply data from the call +*/ +void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr; + struct ctdb_call_state *state; + + state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state); + if (state == NULL) { + DEBUG(0, (__location__ " reqid %u not found\n", hdr->reqid)); + return; + } + + if (hdr->reqid != state->reqid) { + /* we found a record but it was the wrong one */ + DEBUG(0, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid)); + return; + } + + state->call.reply_data.dptr = c->data; + state->call.reply_data.dsize = c->datalen; + state->call.status = c->status; + + talloc_steal(state, c); + + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } +} + + +/* + called when a CTDB_REPLY_DMASTER packet comes in + + This packet comes in from the lmaster response to a CTDB_REQ_CALL + request packet. It means that the current dmaster wants to give us + the dmaster role +*/ +void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr; + struct ctdb_db_context *ctdb_db; + TDB_DATA key, data; + int ret; + + ctdb_db = find_ctdb_db(ctdb, c->db_id); + if (ctdb_db == NULL) { + DEBUG(0,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id)); + return; + } + + key.dptr = c->data; + key.dsize = c->keylen; + data.dptr = &c->data[key.dsize]; + data.dsize = c->datalen; + + ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, + ctdb_call_input_pkt, ctdb, False); + if (ret == -2) { + return; + } + if (ret != 0) { + DEBUG(0,(__location__ " Failed to get lock in ctdb_reply_dmaster\n")); + return; + } + + ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn); +} + + +/* + called when a CTDB_REPLY_ERROR packet comes in +*/ +void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr; + struct ctdb_call_state *state; + + state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state); + if (state == NULL) { + DEBUG(0,("vnn %u Invalid reqid %u in ctdb_reply_error\n", + ctdb->vnn, hdr->reqid)); + return; + } + + if (hdr->reqid != state->reqid) { + /* we found a record but it was the wrong one */ + DEBUG(0, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid)); + return; + } + + talloc_steal(state, c); + + state->state = CTDB_CALL_ERROR; + state->errmsg = (char *)c->msg; + if (state->async.fn) { + state->async.fn(state); + } +} + + +/* + destroy a ctdb_call +*/ +static int ctdb_call_destructor(struct ctdb_call_state *state) +{ + DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state); + ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid); + return 0; +} + + +/* + called when a ctdb_call needs to be resent after a reconfigure event +*/ +static void ctdb_call_resend(struct ctdb_call_state *state) +{ + struct ctdb_context *ctdb = state->ctdb_db->ctdb; + + state->generation = ctdb->vnn_map->generation; + + /* use a new reqid, in case the old reply does eventually come in */ + ctdb_reqid_remove(ctdb, state->reqid); + state->reqid = ctdb_reqid_new(ctdb, state); + state->c->hdr.reqid = state->reqid; + + /* update the generation count for this request, so its valid with the new vnn_map */ + state->c->hdr.generation = state->generation; + + /* send the packet to ourselves, it will be redirected appropriately */ + state->c->hdr.destnode = ctdb->vnn; + + ctdb_queue_packet(ctdb, &state->c->hdr); + DEBUG(0,("resent ctdb_call\n")); +} + +/* + resend all pending calls on recovery + */ +void ctdb_call_resend_all(struct ctdb_context *ctdb) +{ + struct ctdb_call_state *state, *next; + for (state=ctdb->pending_calls;state;state=next) { + next = state->next; + ctdb_call_resend(state); + } +} + +/* + this allows the caller to setup a async.fn +*/ +static void call_local_trigger(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state); + if (state->async.fn) { + state->async.fn(state); + } +} + + +/* + construct an event driven local ctdb_call + + this is used so that locally processed ctdb_call requests are processed + in an event driven manner +*/ +struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call, + struct ctdb_ltdb_header *header, + TDB_DATA *data) +{ + struct ctdb_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + int ret; + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + CTDB_NO_MEMORY_NULL(ctdb, state); + + talloc_steal(state, data->dptr); + + state->state = CTDB_CALL_DONE; + state->call = *call; + state->ctdb_db = ctdb_db; + + ret = ctdb_call_local(ctdb_db, &state->call, header, state, data, ctdb->vnn); + + event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state); + + return state; +} + + +/* + make a remote ctdb call - async send. Called in daemon context. + + This constructs a ctdb_call request and queues it for processing. + This call never blocks. +*/ +struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call, + struct ctdb_ltdb_header *header) +{ + uint32_t len; + struct ctdb_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + CTDB_NO_MEMORY_NULL(ctdb, state); + state->reqid = ctdb_reqid_new(ctdb, state); + state->ctdb_db = ctdb_db; + talloc_set_destructor(state, ctdb_call_destructor); + + len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; + state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, + struct ctdb_req_call); + CTDB_NO_MEMORY_NULL(ctdb, state->c); + state->c->hdr.destnode = header->dmaster; + + /* this limits us to 16k outstanding messages - not unreasonable */ + state->c->hdr.reqid = state->reqid; + state->c->flags = call->flags; + state->c->db_id = ctdb_db->db_id; + state->c->callid = call->call_id; + state->c->hopcount = 0; + state->c->keylen = call->key.dsize; + state->c->calldatalen = call->call_data.dsize; + memcpy(&state->c->data[0], call->key.dptr, call->key.dsize); + memcpy(&state->c->data[call->key.dsize], + call->call_data.dptr, call->call_data.dsize); + state->call = *call; + state->call.call_data.dptr = &state->c->data[call->key.dsize]; + state->call.key.dptr = &state->c->data[0]; + + state->state = CTDB_CALL_WAIT; + state->generation = ctdb->vnn_map->generation; + + DLIST_ADD(ctdb->pending_calls, state); + + ctdb_queue_packet(ctdb, &state->c->hdr); + + return state; +} + +/* + make a remote ctdb call - async recv - called in daemon context + + This is called when the program wants to wait for a ctdb_call to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +{ + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->ctdb_db->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg); + talloc_free(state); + return -1; + } + + if (state->call.reply_data.dsize) { + call->reply_data.dptr = talloc_memdup(state->ctdb_db->ctdb, + state->call.reply_data.dptr, + state->call.reply_data.dsize); + call->reply_data.dsize = state->call.reply_data.dsize; + } else { + call->reply_data.dptr = NULL; + call->reply_data.dsize = 0; + } + call->status = state->call.status; + talloc_free(state); + return 0; +} + + +/* + send a keepalive packet to the other node +*/ +void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode) +{ + struct ctdb_req_keepalive *r; + + r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE, + sizeof(struct ctdb_req_keepalive), + struct ctdb_req_keepalive); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.destnode = destnode; + r->hdr.reqid = 0; + + ctdb->statistics.keepalive_packets_sent++; + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(r); +} diff --git a/source4/cluster/ctdb/server/ctdb_control.c b/source4/cluster/ctdb/server/ctdb_control.c new file mode 100644 index 0000000000..69848bb15c --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_control.c @@ -0,0 +1,499 @@ +/* + ctdb_control protocol code + + Copyright (C) Andrew Tridgell 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "system/wait.h" +#include "../include/ctdb_private.h" +#include "lib/util/dlinklist.h" +#include "db_wrap.h" + +struct ctdb_control_state { + struct ctdb_context *ctdb; + uint32_t reqid; + ctdb_control_callback_fn_t callback; + void *private_data; + unsigned flags; +}; + +/* + process a control request + */ +static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb, + struct ctdb_req_control *c, + TDB_DATA indata, + TDB_DATA *outdata, uint32_t srcnode, + const char **errormsg, + bool *async_reply) +{ + uint32_t opcode = c->opcode; + uint64_t srvid = c->srvid; + uint32_t client_id = c->client_id; + + switch (opcode) { + case CTDB_CONTROL_PROCESS_EXISTS: { + CHECK_CONTROL_DATA_SIZE(sizeof(pid_t)); + return kill(*(pid_t *)indata.dptr, 0); + } + + case CTDB_CONTROL_SET_DEBUG: { + CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t)); + LogLevel = *(uint32_t *)indata.dptr; + return 0; + } + + case CTDB_CONTROL_GET_DEBUG: { + CHECK_CONTROL_DATA_SIZE(0); + outdata->dptr = (uint8_t *)&LogLevel; + outdata->dsize = sizeof(LogLevel); + return 0; + } + + case CTDB_CONTROL_STATISTICS: { + CHECK_CONTROL_DATA_SIZE(0); + ctdb->statistics.memory_used = talloc_total_size(ctdb); + ctdb->statistics.frozen = (ctdb->freeze_mode == CTDB_FREEZE_FROZEN); + ctdb->statistics.recovering = (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE); + outdata->dptr = (uint8_t *)&ctdb->statistics; + outdata->dsize = sizeof(ctdb->statistics); + return 0; + } + + case CTDB_CONTROL_GET_ALL_TUNABLES: { + CHECK_CONTROL_DATA_SIZE(0); + outdata->dptr = (uint8_t *)&ctdb->tunable; + outdata->dsize = sizeof(ctdb->tunable); + return 0; + } + + case CTDB_CONTROL_DUMP_MEMORY: { + CHECK_CONTROL_DATA_SIZE(0); + talloc_report_full(ctdb, stdout); + return 0; + } + + case CTDB_CONTROL_STATISTICS_RESET: { + CHECK_CONTROL_DATA_SIZE(0); + ZERO_STRUCT(ctdb->statistics); + return 0; + } + + case CTDB_CONTROL_GETVNNMAP: + return ctdb_control_getvnnmap(ctdb, opcode, indata, outdata); + + case CTDB_CONTROL_GET_DBMAP: + return ctdb_control_getdbmap(ctdb, opcode, indata, outdata); + + case CTDB_CONTROL_GET_NODEMAP: + return ctdb_control_getnodemap(ctdb, opcode, indata, outdata); + + case CTDB_CONTROL_SETVNNMAP: + return ctdb_control_setvnnmap(ctdb, opcode, indata, outdata); + + case CTDB_CONTROL_PULL_DB: + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_pulldb)); + return ctdb_control_pull_db(ctdb, indata, outdata); + + case CTDB_CONTROL_SET_DMASTER: + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_set_dmaster)); + return ctdb_control_set_dmaster(ctdb, indata); + + case CTDB_CONTROL_PUSH_DB: + return ctdb_control_push_db(ctdb, indata); + + case CTDB_CONTROL_GET_RECMODE: { + return ctdb->recovery_mode; + } + + case CTDB_CONTROL_SET_RECMASTER: { + CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t)); + if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) { + DEBUG(0,("Attempt to set recmaster when not frozen\n")); + return -1; + } + ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0]; + return 0; + } + + case CTDB_CONTROL_GET_RECMASTER: + return ctdb->recovery_master; + + case CTDB_CONTROL_GET_PID: + return getpid(); + + case CTDB_CONTROL_GET_VNN: + return ctdb->vnn; + + case CTDB_CONTROL_PING: + CHECK_CONTROL_DATA_SIZE(0); + return ctdb->statistics.num_clients; + + case CTDB_CONTROL_GET_DBNAME: { + uint32_t db_id; + struct ctdb_db_context *ctdb_db; + + CHECK_CONTROL_DATA_SIZE(sizeof(db_id)); + db_id = *(uint32_t *)indata.dptr; + ctdb_db = find_ctdb_db(ctdb, db_id); + if (ctdb_db == NULL) return -1; + outdata->dptr = discard_const(ctdb_db->db_name); + outdata->dsize = strlen(ctdb_db->db_name)+1; + return 0; + } + + case CTDB_CONTROL_GETDBPATH: { + uint32_t db_id; + struct ctdb_db_context *ctdb_db; + + CHECK_CONTROL_DATA_SIZE(sizeof(db_id)); + db_id = *(uint32_t *)indata.dptr; + ctdb_db = find_ctdb_db(ctdb, db_id); + if (ctdb_db == NULL) return -1; + outdata->dptr = discard_const(ctdb_db->db_path); + outdata->dsize = strlen(ctdb_db->db_path)+1; + return 0; + } + + case CTDB_CONTROL_DB_ATTACH: + return ctdb_control_db_attach(ctdb, indata, outdata); + + case CTDB_CONTROL_SET_CALL: { + struct ctdb_control_set_call *sc = + (struct ctdb_control_set_call *)indata.dptr; + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_set_call)); + return ctdb_daemon_set_call(ctdb, sc->db_id, sc->fn, sc->id); + } + + case CTDB_CONTROL_TRAVERSE_START: + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_traverse_start)); + return ctdb_control_traverse_start(ctdb, indata, outdata, srcnode); + + case CTDB_CONTROL_TRAVERSE_ALL: + return ctdb_control_traverse_all(ctdb, indata, outdata); + + case CTDB_CONTROL_TRAVERSE_DATA: + return ctdb_control_traverse_data(ctdb, indata, outdata); + + case CTDB_CONTROL_REGISTER_SRVID: + return daemon_register_message_handler(ctdb, client_id, srvid); + + case CTDB_CONTROL_DEREGISTER_SRVID: + return daemon_deregister_message_handler(ctdb, client_id, srvid); + + case CTDB_CONTROL_ENABLE_SEQNUM: + CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t)); + return ctdb_ltdb_enable_seqnum(ctdb, *(uint32_t *)indata.dptr); + + case CTDB_CONTROL_UPDATE_SEQNUM: + CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t)); + return ctdb_ltdb_update_seqnum(ctdb, *(uint32_t *)indata.dptr, srcnode); + + case CTDB_CONTROL_FREEZE: + CHECK_CONTROL_DATA_SIZE(0); + return ctdb_control_freeze(ctdb, c, async_reply); + + case CTDB_CONTROL_THAW: + CHECK_CONTROL_DATA_SIZE(0); + return ctdb_control_thaw(ctdb); + + case CTDB_CONTROL_SET_RECMODE: + CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t)); + return ctdb_control_set_recmode(ctdb, c, indata, async_reply, errormsg); + + case CTDB_CONTROL_SET_MONMODE: + CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t)); + ctdb->monitoring_mode = *(uint32_t *)indata.dptr; + return 0; + + case CTDB_CONTROL_GET_MONMODE: + return ctdb->monitoring_mode; + + case CTDB_CONTROL_SHUTDOWN: + ctdb_release_all_ips(ctdb); + ctdb->methods->shutdown(ctdb); + ctdb_event_script(ctdb, "shutdown"); + DEBUG(0,("shutting down\n")); + exit(0); + + case CTDB_CONTROL_MAX_RSN: + CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t)); + return ctdb_control_max_rsn(ctdb, indata, outdata); + + case CTDB_CONTROL_SET_RSN_NONEMPTY: + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_set_rsn_nonempty)); + return ctdb_control_set_rsn_nonempty(ctdb, indata, outdata); + + case CTDB_CONTROL_TAKEOVER_IP: + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_public_ip)); + return ctdb_control_takeover_ip(ctdb, c, indata, async_reply); + + case CTDB_CONTROL_RELEASE_IP: + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_public_ip)); + return ctdb_control_release_ip(ctdb, c, indata, async_reply); + + case CTDB_CONTROL_GET_PUBLIC_IPS: + CHECK_CONTROL_DATA_SIZE(0); + return ctdb_control_get_public_ips(ctdb, c, outdata); + + case CTDB_CONTROL_DELETE_LOW_RSN: + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_delete_low_rsn)); + return ctdb_control_delete_low_rsn(ctdb, indata, outdata); + + case CTDB_CONTROL_TCP_CLIENT: + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_tcp)); + return ctdb_control_tcp_client(ctdb, client_id, srcnode, indata); + + case CTDB_CONTROL_STARTUP: + CHECK_CONTROL_DATA_SIZE(0); + return ctdb_control_startup(ctdb, srcnode); + + case CTDB_CONTROL_TCP_ADD: + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_tcp_vnn)); + return ctdb_control_tcp_add(ctdb, indata); + + case CTDB_CONTROL_TCP_REMOVE: + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_tcp_vnn)); + return ctdb_control_tcp_remove(ctdb, indata); + + case CTDB_CONTROL_SET_TUNABLE: + return ctdb_control_set_tunable(ctdb, indata); + + case CTDB_CONTROL_GET_TUNABLE: + return ctdb_control_get_tunable(ctdb, indata, outdata); + + case CTDB_CONTROL_LIST_TUNABLES: + return ctdb_control_list_tunables(ctdb, outdata); + + case CTDB_CONTROL_MODIFY_FLAGS: + CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_node_modflags)); + return ctdb_control_modflags(ctdb, indata); + + default: + DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode)); + return -1; + } +} + + +/* + send a reply for a ctdb control + */ +void ctdb_request_control_reply(struct ctdb_context *ctdb, struct ctdb_req_control *c, + TDB_DATA *outdata, int32_t status, const char *errormsg) +{ + struct ctdb_reply_control *r; + size_t len; + + /* some controls send no reply */ + if (c->flags & CTDB_CTRL_FLAG_NOREPLY) { + return; + } + + len = offsetof(struct ctdb_reply_control, data) + (outdata?outdata->dsize:0); + if (errormsg) { + len += strlen(errormsg); + } + r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CONTROL, len, struct ctdb_reply_control); + CTDB_NO_MEMORY_VOID(ctdb, r); + + r->hdr.destnode = c->hdr.srcnode; + r->hdr.reqid = c->hdr.reqid; + r->status = status; + r->datalen = outdata?outdata->dsize:0; + if (outdata && outdata->dsize) { + memcpy(&r->data[0], outdata->dptr, outdata->dsize); + } + if (errormsg) { + r->errorlen = strlen(errormsg); + memcpy(&r->data[r->datalen], errormsg, r->errorlen); + } + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(r); +} + +/* + called when a CTDB_REQ_CONTROL packet comes in +*/ +void ctdb_request_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_req_control *c = (struct ctdb_req_control *)hdr; + TDB_DATA data, *outdata; + int32_t status; + bool async_reply = False; + const char *errormsg = NULL; + + data.dptr = &c->data[0]; + data.dsize = c->datalen; + + outdata = talloc_zero(c, TDB_DATA); + + status = ctdb_control_dispatch(ctdb, c, data, outdata, hdr->srcnode, + &errormsg, &async_reply); + + if (!async_reply) { + ctdb_request_control_reply(ctdb, c, outdata, status, errormsg); + } +} + +/* + called when a CTDB_REPLY_CONTROL packet comes in +*/ +void ctdb_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr; + TDB_DATA data; + struct ctdb_control_state *state; + const char *errormsg = NULL; + + state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_control_state); + if (state == NULL) { + DEBUG(0,("vnn %u Invalid reqid %u in ctdb_reply_control\n", + ctdb->vnn, hdr->reqid)); + return; + } + + if (hdr->reqid != state->reqid) { + /* we found a record but it was the wrong one */ + DEBUG(0, ("Dropped orphaned control reply with reqid:%u\n", hdr->reqid)); + return; + } + + data.dptr = &c->data[0]; + data.dsize = c->datalen; + if (c->errorlen) { + errormsg = talloc_strndup(state, + (char *)&c->data[c->datalen], c->errorlen); + } + + /* make state a child of the packet, so it goes away when the packet + is freed. */ + talloc_steal(hdr, state); + + state->callback(ctdb, c->status, data, errormsg, state->private_data); +} + +static int ctdb_control_destructor(struct ctdb_control_state *state) +{ + ctdb_reqid_remove(state->ctdb, state->reqid); + return 0; +} + +/* + handle a timeout of a control + */ +static void ctdb_control_timeout(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_control_state *state = talloc_get_type(private_data, struct ctdb_control_state); + TALLOC_CTX *tmp_ctx = talloc_new(ev); + + state->ctdb->statistics.timeouts.control++; + + talloc_steal(tmp_ctx, state); + + state->callback(state->ctdb, -1, tdb_null, + "ctdb_control timed out", + state->private_data); + talloc_free(tmp_ctx); +} + + +/* + send a control message to a node + */ +int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode, + uint64_t srvid, uint32_t opcode, uint32_t client_id, + uint32_t flags, + TDB_DATA data, + ctdb_control_callback_fn_t callback, + void *private_data) +{ + struct ctdb_req_control *c; + struct ctdb_control_state *state; + size_t len; + + if (((destnode == CTDB_BROADCAST_VNNMAP) || + (destnode == CTDB_BROADCAST_ALL) || + (destnode == CTDB_BROADCAST_CONNECTED)) && + !(flags & CTDB_CTRL_FLAG_NOREPLY)) { + DEBUG(0,("Attempt to broadcast control without NOREPLY\n")); + return -1; + } + + if (destnode != CTDB_BROADCAST_VNNMAP && + destnode != CTDB_BROADCAST_ALL && + destnode != CTDB_BROADCAST_CONNECTED && + (!ctdb_validate_vnn(ctdb, destnode) || + (ctdb->nodes[destnode]->flags & NODE_FLAGS_DISCONNECTED))) { + if (!(flags & CTDB_CTRL_FLAG_NOREPLY)) { + callback(ctdb, -1, tdb_null, "ctdb_control to disconnected node", private_data); + } + return 0; + } + + /* the state is made a child of private_data if possible. This means any reply + will be discarded if the private_data goes away */ + state = talloc(private_data?private_data:ctdb, struct ctdb_control_state); + CTDB_NO_MEMORY(ctdb, state); + + state->reqid = ctdb_reqid_new(ctdb, state); + state->callback = callback; + state->private_data = private_data; + state->ctdb = ctdb; + state->flags = flags; + + talloc_set_destructor(state, ctdb_control_destructor); + + len = offsetof(struct ctdb_req_control, data) + data.dsize; + c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CONTROL, len, + struct ctdb_req_control); + CTDB_NO_MEMORY(ctdb, c); + talloc_set_name_const(c, "ctdb_req_control packet"); + + c->hdr.destnode = destnode; + c->hdr.reqid = state->reqid; + c->opcode = opcode; + c->client_id = client_id; + c->flags = flags; + c->srvid = srvid; + c->datalen = data.dsize; + if (data.dsize) { + memcpy(&c->data[0], data.dptr, data.dsize); + } + + ctdb_queue_packet(ctdb, &c->hdr); + + if (flags & CTDB_CTRL_FLAG_NOREPLY) { + talloc_free(state); + return 0; + } + + if (ctdb->tunable.control_timeout) { + event_add_timed(ctdb->ev, state, + timeval_current_ofs(ctdb->tunable.control_timeout, 0), + ctdb_control_timeout, state); + } + + talloc_free(c); + return 0; +} diff --git a/source4/cluster/ctdb/server/ctdb_daemon.c b/source4/cluster/ctdb/server/ctdb_daemon.c new file mode 100644 index 0000000000..2577970075 --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_daemon.c @@ -0,0 +1,927 @@ +/* + ctdb daemon code + + Copyright (C) Andrew Tridgell 2006 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "db_wrap.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "system/wait.h" +#include "../include/ctdb.h" +#include "../include/ctdb_private.h" + +static void daemon_incoming_packet(void *, struct ctdb_req_header *); + +/* + handler for when a node changes its flags +*/ +static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, + TDB_DATA data, void *private_data) +{ + struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr; + + if (data.dsize != sizeof(*c) || !ctdb_validate_vnn(ctdb, c->vnn)) { + DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n")); + return; + } + + if (!ctdb_validate_vnn(ctdb, c->vnn)) { + DEBUG(0,("Bad vnn %u in flag_change_handler\n", c->vnn)); + return; + } + + /* don't get the disconnected flag from the other node */ + ctdb->nodes[c->vnn]->flags = + (ctdb->nodes[c->vnn]->flags&NODE_FLAGS_DISCONNECTED) + | (c->flags & ~NODE_FLAGS_DISCONNECTED); + DEBUG(2,("Node flags for node %u are now 0x%x\n", c->vnn, ctdb->nodes[c->vnn]->flags)); + + /* make sure we don't hold any IPs when we shouldn't */ + if (c->vnn == ctdb->vnn && + (ctdb->nodes[c->vnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) { + ctdb_release_all_ips(ctdb); + } +} + +/* called when the "startup" event script has finished */ +static void ctdb_start_transport(struct ctdb_context *ctdb, int status, void *p) +{ + if (status != 0) { + DEBUG(0,("startup event failed!\n")); + ctdb_fatal(ctdb, "startup event script failed"); + } + + /* start the transport running */ + if (ctdb->methods->start(ctdb) != 0) { + DEBUG(0,("transport failed to start!\n")); + ctdb_fatal(ctdb, "transport failed to start"); + } + + /* start the recovery daemon process */ + if (ctdb_start_recoverd(ctdb) != 0) { + DEBUG(0,("Failed to start recovery daemon\n")); + exit(11); + } + + /* a handler for when nodes are disabled/enabled */ + ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, + flag_change_handler, NULL); + + /* start monitoring for dead nodes */ + ctdb_start_monitoring(ctdb); +} + +/* go into main ctdb loop */ +static void ctdb_main_loop(struct ctdb_context *ctdb) +{ + int ret = -1; + + if (strcmp(ctdb->transport, "tcp") == 0) { + int ctdb_tcp_init(struct ctdb_context *); + ret = ctdb_tcp_init(ctdb); + } +#ifdef USE_INFINIBAND + if (strcmp(ctdb->transport, "ib") == 0) { + int ctdb_ibw_init(struct ctdb_context *); + ret = ctdb_ibw_init(ctdb); + } +#endif + if (ret != 0) { + DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport)); + return; + } + + /* initialise the transport */ + if (ctdb->methods->initialise(ctdb) != 0) { + DEBUG(0,("transport failed to initialise!\n")); + ctdb_fatal(ctdb, "transport failed to initialise"); + } + + /* tell all other nodes we've just started up */ + ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, + 0, CTDB_CONTROL_STARTUP, 0, + CTDB_CTRL_FLAG_NOREPLY, + tdb_null, NULL, NULL); + + /* release any IPs we hold from previous runs of the daemon */ + ctdb_release_all_ips(ctdb); + + ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb, + ctdb_start_transport, NULL, "startup"); + if (ret != 0) { + DEBUG(0,("Failed startup event script\n")); + return; + } + + /* go into a wait loop to allow other nodes to complete */ + event_loop_wait(ctdb->ev); + + DEBUG(0,("event_loop_wait() returned. this should not happen\n")); + exit(1); +} + + +static void block_signal(int signum) +{ + struct sigaction act; + + memset(&act, 0, sizeof(act)); + + act.sa_handler = SIG_IGN; + sigemptyset(&act.sa_mask); + sigaddset(&act.sa_mask, signum); + sigaction(signum, &act, NULL); +} + + +/* + send a packet to a client + */ +static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr) +{ + client->ctdb->statistics.client_packets_sent++; + return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length); +} + +/* + message handler for when we are in daemon mode. This redirects the message + to the right client + */ +static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, + TDB_DATA data, void *private_data) +{ + struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client); + struct ctdb_req_message *r; + int len; + + /* construct a message to send to the client containing the data */ + len = offsetof(struct ctdb_req_message, data) + data.dsize; + r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, + len, struct ctdb_req_message); + CTDB_NO_MEMORY_VOID(ctdb, r); + + talloc_set_name_const(r, "req_message packet"); + + r->srvid = srvid; + r->datalen = data.dsize; + memcpy(&r->data[0], data.dptr, data.dsize); + + daemon_queue_send(client, &r->hdr); + + talloc_free(r); +} + + +/* + this is called when the ctdb daemon received a ctdb request to + set the srvid from the client + */ +int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid) +{ + struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); + int res; + if (client == NULL) { + DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n")); + return -1; + } + res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client); + if (res != 0) { + DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n", + (unsigned long long)srvid)); + } else { + DEBUG(2,(__location__ " Registered message handler for srvid=%llu\n", + (unsigned long long)srvid)); + } + + /* this is a hack for Samba - we now know the pid of the Samba client */ + if ((srvid & 0xFFFFFFFF) == srvid && + kill(srvid, 0) == 0) { + client->pid = srvid; + DEBUG(0,(__location__ " Registered PID %u for client %u\n", + (unsigned)client->pid, client_id)); + } + return res; +} + +/* + this is called when the ctdb daemon received a ctdb request to + remove a srvid from the client + */ +int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid) +{ + struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); + if (client == NULL) { + DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n")); + return -1; + } + return ctdb_deregister_message_handler(ctdb, srvid, client); +} + + +/* + destroy a ctdb_client +*/ +static int ctdb_client_destructor(struct ctdb_client *client) +{ + ctdb_takeover_client_destructor_hook(client); + ctdb_reqid_remove(client->ctdb, client->client_id); + client->ctdb->statistics.num_clients--; + return 0; +} + + +/* + this is called when the ctdb daemon received a ctdb request message + from a local client over the unix domain socket + */ +static void daemon_request_message_from_client(struct ctdb_client *client, + struct ctdb_req_message *c) +{ + TDB_DATA data; + int res; + + /* maybe the message is for another client on this node */ + if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) { + ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c); + return; + } + + /* its for a remote node */ + data.dptr = &c->data[0]; + data.dsize = c->datalen; + res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode, + c->srvid, data); + if (res != 0) { + DEBUG(0,(__location__ " Failed to send message to remote node %u\n", + c->hdr.destnode)); + } +} + + +struct daemon_call_state { + struct ctdb_client *client; + uint32_t reqid; + struct ctdb_call *call; + struct timeval start_time; +}; + +/* + complete a call from a client +*/ +static void daemon_call_from_client_callback(struct ctdb_call_state *state) +{ + struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, + struct daemon_call_state); + struct ctdb_reply_call *r; + int res; + uint32_t length; + struct ctdb_client *client = dstate->client; + + talloc_steal(client, dstate); + talloc_steal(dstate, dstate->call); + + res = ctdb_daemon_call_recv(state, dstate->call); + if (res != 0) { + DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n")); + client->ctdb->statistics.pending_calls--; + ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time); + return; + } + + length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize; + r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, + length, struct ctdb_reply_call); + if (r == NULL) { + DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n")); + client->ctdb->statistics.pending_calls--; + ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time); + return; + } + r->hdr.reqid = dstate->reqid; + r->datalen = dstate->call->reply_data.dsize; + memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen); + + res = daemon_queue_send(client, &r->hdr); + if (res != 0) { + DEBUG(0, (__location__ " Failed to queue packet from daemon to client\n")); + } + ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time); + talloc_free(dstate); + client->ctdb->statistics.pending_calls--; +} + + +static void daemon_request_call_from_client(struct ctdb_client *client, + struct ctdb_req_call *c); + +/* + this is called when the ctdb daemon received a ctdb request call + from a local client over the unix domain socket + */ +static void daemon_request_call_from_client(struct ctdb_client *client, + struct ctdb_req_call *c) +{ + struct ctdb_call_state *state; + struct ctdb_db_context *ctdb_db; + struct daemon_call_state *dstate; + struct ctdb_call *call; + struct ctdb_ltdb_header header; + TDB_DATA key, data; + int ret; + struct ctdb_context *ctdb = client->ctdb; + + ctdb->statistics.total_calls++; + ctdb->statistics.pending_calls++; + + ctdb_db = find_ctdb_db(client->ctdb, c->db_id); + if (!ctdb_db) { + DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x", + c->db_id)); + ctdb->statistics.pending_calls--; + return; + } + + key.dptr = c->data; + key.dsize = c->keylen; + + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, + (struct ctdb_req_header *)c, &data, + daemon_incoming_packet, client, True); + if (ret == -2) { + /* will retry later */ + ctdb->statistics.pending_calls--; + return; + } + + if (ret != 0) { + DEBUG(0,(__location__ " Unable to fetch record\n")); + ctdb->statistics.pending_calls--; + return; + } + + dstate = talloc(client, struct daemon_call_state); + if (dstate == NULL) { + ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(0,(__location__ " Unable to allocate dstate\n")); + ctdb->statistics.pending_calls--; + return; + } + dstate->start_time = timeval_current(); + dstate->client = client; + dstate->reqid = c->hdr.reqid; + talloc_steal(dstate, data.dptr); + + call = dstate->call = talloc_zero(dstate, struct ctdb_call); + if (call == NULL) { + ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(0,(__location__ " Unable to allocate call\n")); + ctdb->statistics.pending_calls--; + ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time); + return; + } + + call->call_id = c->callid; + call->key = key; + call->call_data.dptr = c->data + c->keylen; + call->call_data.dsize = c->calldatalen; + call->flags = c->flags; + + if (header.dmaster == ctdb->vnn) { + state = ctdb_call_local_send(ctdb_db, call, &header, &data); + } else { + state = ctdb_daemon_call_send_remote(ctdb_db, call, &header); + } + + ctdb_ltdb_unlock(ctdb_db, key); + + if (state == NULL) { + DEBUG(0,(__location__ " Unable to setup call send\n")); + ctdb->statistics.pending_calls--; + ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time); + return; + } + talloc_steal(state, dstate); + talloc_steal(client, state); + + state->async.fn = daemon_call_from_client_callback; + state->async.private_data = dstate; +} + + +static void daemon_request_control_from_client(struct ctdb_client *client, + struct ctdb_req_control *c); + +/* data contains a packet from the client */ +static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr) +{ + struct ctdb_client *client = talloc_get_type(p, struct ctdb_client); + TALLOC_CTX *tmp_ctx; + struct ctdb_context *ctdb = client->ctdb; + + /* place the packet as a child of a tmp_ctx. We then use + talloc_free() below to free it. If any of the calls want + to keep it, then they will steal it somewhere else, and the + talloc_free() will be a no-op */ + tmp_ctx = talloc_new(client); + talloc_steal(tmp_ctx, hdr); + + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n"); + goto done; + } + + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); + goto done; + } + + switch (hdr->operation) { + case CTDB_REQ_CALL: + ctdb->statistics.client.req_call++; + daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr); + break; + + case CTDB_REQ_MESSAGE: + ctdb->statistics.client.req_message++; + daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr); + break; + + case CTDB_REQ_CONTROL: + ctdb->statistics.client.req_control++; + daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr); + break; + + default: + DEBUG(0,(__location__ " daemon: unrecognized operation %u\n", + hdr->operation)); + } + +done: + talloc_free(tmp_ctx); +} + +/* + called when the daemon gets a incoming packet + */ +static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args) +{ + struct ctdb_client *client = talloc_get_type(args, struct ctdb_client); + struct ctdb_req_header *hdr; + + if (cnt == 0) { + talloc_free(client); + return; + } + + client->ctdb->statistics.client_packets_recv++; + + if (cnt < sizeof(*hdr)) { + ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", + (unsigned)cnt); + return; + } + hdr = (struct ctdb_req_header *)data; + if (cnt != hdr->length) { + ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", + (unsigned)hdr->length, (unsigned)cnt); + return; + } + + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n"); + return; + } + + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); + return; + } + + DEBUG(3,(__location__ " client request %u of type %u length %u from " + "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length, + hdr->srcnode, hdr->destnode)); + + /* it is the responsibility of the incoming packet function to free 'data' */ + daemon_incoming_packet(client, hdr); +} + +static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + struct sockaddr_in addr; + socklen_t len; + int fd; + struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context); + struct ctdb_client *client; + + memset(&addr, 0, sizeof(addr)); + len = sizeof(addr); + fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len); + if (fd == -1) { + return; + } + + set_nonblocking(fd); + set_close_on_exec(fd); + + client = talloc_zero(ctdb, struct ctdb_client); + client->ctdb = ctdb; + client->fd = fd; + client->client_id = ctdb_reqid_new(ctdb, client); + ctdb->statistics.num_clients++; + + client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, + ctdb_daemon_read_cb, client); + + talloc_set_destructor(client, ctdb_client_destructor); +} + + + +/* + create a unix domain socket and bind it + return a file descriptor open on the socket +*/ +static int ux_socket_bind(struct ctdb_context *ctdb) +{ + struct sockaddr_un addr; + + ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0); + if (ctdb->daemon.sd == -1) { + return -1; + } + + set_nonblocking(ctdb->daemon.sd); + set_close_on_exec(ctdb->daemon.sd); + +#if 0 + /* AIX doesn't like this :( */ + if (fchown(ctdb->daemon.sd, geteuid(), getegid()) != 0 || + fchmod(ctdb->daemon.sd, 0700) != 0) { + DEBUG(0,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n")); + goto failed; + } +#endif + + set_nonblocking(ctdb->daemon.sd); + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)); + + if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + DEBUG(0,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name)); + goto failed; + } + if (listen(ctdb->daemon.sd, 10) != 0) { + DEBUG(0,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name)); + goto failed; + } + + return 0; + +failed: + close(ctdb->daemon.sd); + ctdb->daemon.sd = -1; + return -1; +} + +/* + delete the socket on exit - called on destruction of autofree context + */ +static int unlink_destructor(const char *name) +{ + unlink(name); + return 0; +} + + +/* + start the protocol going as a daemon +*/ +int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork) +{ + int res; + struct fd_event *fde; + const char *domain_socket_name; + + /* get rid of any old sockets */ + unlink(ctdb->daemon.name); + + /* create a unix domain stream socket to listen to */ + res = ux_socket_bind(ctdb); + if (res!=0) { + DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n")); + exit(10); + } + + if (do_fork && fork()) { + return 0; + } + + tdb_reopen_all(False); + + if (do_fork) { + setsid(); + } + block_signal(SIGPIPE); + + /* try to set us up as realtime */ + ctdb_set_realtime(true); + + /* ensure the socket is deleted on exit of the daemon */ + domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name); + talloc_set_destructor(domain_socket_name, unlink_destructor); + + ctdb->ev = event_context_init(NULL); + + /* start frozen, then let the first election sort things out */ + if (!ctdb_blocking_freeze(ctdb)) { + DEBUG(0,("Failed to get initial freeze\n")); + exit(12); + } + + /* force initial recovery for election */ + ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE; + + /* now start accepting clients, only can do this once frozen */ + fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, + EVENT_FD_READ|EVENT_FD_AUTOCLOSE, + ctdb_accept_client, ctdb); + + ctdb_main_loop(ctdb); + + return 0; +} + +/* + allocate a packet for use in daemon<->daemon communication + */ +struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, + enum ctdb_operation operation, + size_t length, size_t slength, + const char *type) +{ + int size; + struct ctdb_req_header *hdr; + + length = MAX(length, slength); + size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); + + hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size); + if (hdr == NULL) { + DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n", + operation, (unsigned)length)); + return NULL; + } + talloc_set_name_const(hdr, type); + memset(hdr, 0, slength); + hdr->length = length; + hdr->operation = operation; + hdr->ctdb_magic = CTDB_MAGIC; + hdr->ctdb_version = CTDB_VERSION; + hdr->generation = ctdb->vnn_map->generation; + hdr->srcnode = ctdb->vnn; + + return hdr; +} + +struct daemon_control_state { + struct daemon_control_state *next, *prev; + struct ctdb_client *client; + struct ctdb_req_control *c; + uint32_t reqid; + struct ctdb_node *node; +}; + +/* + callback when a control reply comes in + */ +static void daemon_control_callback(struct ctdb_context *ctdb, + int32_t status, TDB_DATA data, + const char *errormsg, + void *private_data) +{ + struct daemon_control_state *state = talloc_get_type(private_data, + struct daemon_control_state); + struct ctdb_client *client = state->client; + struct ctdb_reply_control *r; + size_t len; + + /* construct a message to send to the client containing the data */ + len = offsetof(struct ctdb_reply_control, data) + data.dsize; + if (errormsg) { + len += strlen(errormsg); + } + r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, + struct ctdb_reply_control); + CTDB_NO_MEMORY_VOID(ctdb, r); + + r->hdr.reqid = state->reqid; + r->status = status; + r->datalen = data.dsize; + r->errorlen = 0; + memcpy(&r->data[0], data.dptr, data.dsize); + if (errormsg) { + r->errorlen = strlen(errormsg); + memcpy(&r->data[r->datalen], errormsg, r->errorlen); + } + + daemon_queue_send(client, &r->hdr); + + talloc_free(state); +} + +/* + fail all pending controls to a disconnected node + */ +void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node) +{ + struct daemon_control_state *state; + while ((state = node->pending_controls)) { + DLIST_REMOVE(node->pending_controls, state); + daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, + "node is disconnected", state); + } +} + +/* + destroy a daemon_control_state + */ +static int daemon_control_destructor(struct daemon_control_state *state) +{ + if (state->node) { + DLIST_REMOVE(state->node->pending_controls, state); + } + return 0; +} + +/* + this is called when the ctdb daemon received a ctdb request control + from a local client over the unix domain socket + */ +static void daemon_request_control_from_client(struct ctdb_client *client, + struct ctdb_req_control *c) +{ + TDB_DATA data; + int res; + struct daemon_control_state *state; + TALLOC_CTX *tmp_ctx = talloc_new(client); + + if (c->hdr.destnode == CTDB_CURRENT_NODE) { + c->hdr.destnode = client->ctdb->vnn; + } + + state = talloc(client, struct daemon_control_state); + CTDB_NO_MEMORY_VOID(client->ctdb, state); + + state->client = client; + state->c = talloc_steal(state, c); + state->reqid = c->hdr.reqid; + if (ctdb_validate_vnn(client->ctdb, c->hdr.destnode)) { + state->node = client->ctdb->nodes[c->hdr.destnode]; + DLIST_ADD(state->node->pending_controls, state); + } else { + state->node = NULL; + } + + talloc_set_destructor(state, daemon_control_destructor); + + if (c->flags & CTDB_CTRL_FLAG_NOREPLY) { + talloc_steal(tmp_ctx, state); + } + + data.dptr = &c->data[0]; + data.dsize = c->datalen; + res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode, + c->srvid, c->opcode, client->client_id, + c->flags, + data, daemon_control_callback, + state); + if (res != 0) { + DEBUG(0,(__location__ " Failed to send control to remote node %u\n", + c->hdr.destnode)); + } + + talloc_free(tmp_ctx); +} + +/* + register a call function +*/ +int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id, + ctdb_fn_t fn, int id) +{ + struct ctdb_registered_call *call; + struct ctdb_db_context *ctdb_db; + + ctdb_db = find_ctdb_db(ctdb, db_id); + if (ctdb_db == NULL) { + return -1; + } + + call = talloc(ctdb_db, struct ctdb_registered_call); + call->fn = fn; + call->id = id; + + DLIST_ADD(ctdb_db->calls, call); + return 0; +} + + + +/* + this local messaging handler is ugly, but is needed to prevent + recursion in ctdb_send_message() when the destination node is the + same as the source node + */ +struct ctdb_local_message { + struct ctdb_context *ctdb; + uint64_t srvid; + TDB_DATA data; +}; + +static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_local_message *m = talloc_get_type(private_data, + struct ctdb_local_message); + int res; + + res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data); + if (res != 0) { + DEBUG(0, (__location__ " Failed to dispatch message for srvid=%llu\n", + (unsigned long long)m->srvid)); + } + talloc_free(m); +} + +static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data) +{ + struct ctdb_local_message *m; + m = talloc(ctdb, struct ctdb_local_message); + CTDB_NO_MEMORY(ctdb, m); + + m->ctdb = ctdb; + m->srvid = srvid; + m->data = data; + m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize); + if (m->data.dptr == NULL) { + talloc_free(m); + return -1; + } + + /* this needs to be done as an event to prevent recursion */ + event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m); + return 0; +} + +/* + send a ctdb message +*/ +int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn, + uint64_t srvid, TDB_DATA data) +{ + struct ctdb_req_message *r; + int len; + + /* see if this is a message to ourselves */ + if (vnn == ctdb->vnn) { + return ctdb_local_message(ctdb, srvid, data); + } + + len = offsetof(struct ctdb_req_message, data) + data.dsize; + r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len, + struct ctdb_req_message); + CTDB_NO_MEMORY(ctdb, r); + + r->hdr.destnode = vnn; + r->srvid = srvid; + r->datalen = data.dsize; + memcpy(&r->data[0], data.dptr, data.dsize); + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(r); + return 0; +} + diff --git a/source4/cluster/ctdb/server/ctdb_freeze.c b/source4/cluster/ctdb/server/ctdb_freeze.c new file mode 100644 index 0000000000..42ad975b53 --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_freeze.c @@ -0,0 +1,256 @@ +/* + ctdb freeze handling + + Copyright (C) Andrew Tridgell 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "system/wait.h" +#include "../include/ctdb_private.h" +#include "lib/util/dlinklist.h" +#include "db_wrap.h" + + +/* + lock all databases + */ +static int ctdb_lock_all_databases(struct ctdb_context *ctdb) +{ + struct ctdb_db_context *ctdb_db; + for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) { + if (tdb_lockall(ctdb_db->ltdb->tdb) != 0) { + return -1; + } + } + return 0; +} + +/* + a list of control requests waiting for a freeze lock child to get + the database locks + */ +struct ctdb_freeze_waiter { + struct ctdb_freeze_waiter *next, *prev; + struct ctdb_context *ctdb; + struct ctdb_req_control *c; + int32_t status; +}; + +/* a handle to a freeze lock child process */ +struct ctdb_freeze_handle { + struct ctdb_context *ctdb; + pid_t child; + int fd; + struct ctdb_freeze_waiter *waiters; +}; + +/* + destroy a freeze handle + */ +static int ctdb_freeze_handle_destructor(struct ctdb_freeze_handle *h) +{ + h->ctdb->freeze_mode = CTDB_FREEZE_NONE; + kill(h->child, SIGKILL); + waitpid(h->child, NULL, 0); + return 0; +} + +/* + called when the child writes its status to us + */ +static void ctdb_freeze_lock_handler(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + struct ctdb_freeze_handle *h = talloc_get_type(private_data, struct ctdb_freeze_handle); + int32_t status; + struct ctdb_freeze_waiter *w; + + if (h->ctdb->freeze_mode == CTDB_FREEZE_FROZEN) { + DEBUG(0,("freeze child died - unfreezing\n")); + talloc_free(h); + return; + } + + if (read(h->fd, &status, sizeof(status)) != sizeof(status)) { + DEBUG(0,("read error from freeze lock child\n")); + status = -1; + } + + if (status == -1) { + DEBUG(0,("Failed to get locks in ctdb_freeze_child\n")); + /* we didn't get the locks - destroy the handle */ + talloc_free(h); + return; + } + + h->ctdb->freeze_mode = CTDB_FREEZE_FROZEN; + + /* notify the waiters */ + while ((w = h->ctdb->freeze_handle->waiters)) { + w->status = status; + DLIST_REMOVE(h->ctdb->freeze_handle->waiters, w); + talloc_free(w); + } +} + +/* + create a child which gets locks on all the open databases, then calls the callback telling the parent + that it is done + */ +static struct ctdb_freeze_handle *ctdb_freeze_lock(struct ctdb_context *ctdb) +{ + struct ctdb_freeze_handle *h; + int fd[2]; + struct fd_event *fde; + + h = talloc_zero(ctdb, struct ctdb_freeze_handle); + CTDB_NO_MEMORY_VOID(ctdb, h); + + h->ctdb = ctdb; + + /* use socketpair() instead of pipe() so we have bi-directional fds */ + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd) != 0) { + DEBUG(0,("Failed to create pipe for ctdb_freeze_lock\n")); + talloc_free(h); + return NULL; + } + + h->child = fork(); + if (h->child == -1) { + DEBUG(0,("Failed to fork child for ctdb_freeze_lock\n")); + talloc_free(h); + return NULL; + } + + if (h->child == 0) { + int ret; + /* in the child */ + close(fd[0]); + ret = ctdb_lock_all_databases(ctdb); + if (ret != 0) { + _exit(0); + } + write(fd[1], &ret, sizeof(ret)); + /* the read here means we will die if the parent exits */ + read(fd[1], &ret, sizeof(ret)); + _exit(0); + } + + talloc_set_destructor(h, ctdb_freeze_handle_destructor); + + close(fd[1]); + + h->fd = fd[0]; + + fde = event_add_fd(ctdb->ev, h, h->fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, + ctdb_freeze_lock_handler, h); + if (fde == NULL) { + DEBUG(0,("Failed to setup fd event for ctdb_freeze_lock\n")); + close(fd[0]); + talloc_free(h); + return NULL; + } + + return h; +} + +/* + destroy a waiter for a freeze mode change + */ +static int ctdb_freeze_waiter_destructor(struct ctdb_freeze_waiter *w) +{ + DLIST_REMOVE(w->ctdb->freeze_handle->waiters, w); + ctdb_request_control_reply(w->ctdb, w->c, NULL, w->status, NULL); + return 0; +} + +/* + start the freeze process + */ +void ctdb_start_freeze(struct ctdb_context *ctdb) +{ + if (ctdb->freeze_mode == CTDB_FREEZE_FROZEN) { + /* we're already frozen */ + return; + } + + /* if there isn't a freeze lock child then create one */ + if (!ctdb->freeze_handle) { + ctdb->freeze_handle = ctdb_freeze_lock(ctdb); + CTDB_NO_MEMORY_VOID(ctdb, ctdb->freeze_handle); + ctdb->freeze_mode = CTDB_FREEZE_PENDING; + } +} + +/* + freeze the databases + */ +int32_t ctdb_control_freeze(struct ctdb_context *ctdb, struct ctdb_req_control *c, bool *async_reply) +{ + struct ctdb_freeze_waiter *w; + + if (ctdb->freeze_mode == CTDB_FREEZE_FROZEN) { + /* we're already frozen */ + return 0; + } + + ctdb_start_freeze(ctdb); + + /* add ourselves to list of waiters */ + w = talloc(ctdb->freeze_handle, struct ctdb_freeze_waiter); + CTDB_NO_MEMORY(ctdb, w); + w->ctdb = ctdb; + w->c = talloc_steal(w, c); + w->status = -1; + talloc_set_destructor(w, ctdb_freeze_waiter_destructor); + DLIST_ADD(ctdb->freeze_handle->waiters, w); + + /* we won't reply till later */ + *async_reply = True; + return 0; +} + + +/* + block until we are frozen, used during daemon startup + */ +bool ctdb_blocking_freeze(struct ctdb_context *ctdb) +{ + ctdb_start_freeze(ctdb); + + /* block until frozen */ + while (ctdb->freeze_mode == CTDB_FREEZE_PENDING) { + event_loop_once(ctdb->ev); + } + + return ctdb->freeze_mode == CTDB_FREEZE_FROZEN; +} + + + +/* + thaw the databases + */ +int32_t ctdb_control_thaw(struct ctdb_context *ctdb) +{ + talloc_free(ctdb->freeze_handle); + ctdb->freeze_handle = NULL; + ctdb_call_resend_all(ctdb); + return 0; +} diff --git a/source4/cluster/ctdb/server/ctdb_lockwait.c b/source4/cluster/ctdb/server/ctdb_lockwait.c new file mode 100644 index 0000000000..5b0019836e --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_lockwait.c @@ -0,0 +1,165 @@ +/* + wait for a tdb chain lock + + Copyright (C) Andrew Tridgell 2006 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "system/wait.h" +#include "db_wrap.h" +#include "lib/tdb/include/tdb.h" +#include "../include/ctdb_private.h" + + +struct lockwait_handle { + struct ctdb_context *ctdb; + struct ctdb_db_context *ctdb_db; + struct fd_event *fde; + int fd[2]; + pid_t child; + void *private_data; + void (*callback)(void *); + TDB_DATA key; + struct timeval start_time; +}; + +static void lockwait_handler(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + struct lockwait_handle *h = talloc_get_type(private_data, + struct lockwait_handle); + void (*callback)(void *) = h->callback; + void *p = h->private_data; + pid_t child = h->child; + TDB_DATA key = h->key; + struct tdb_context *tdb = h->ctdb_db->ltdb->tdb; + TALLOC_CTX *tmp_ctx = talloc_new(ev); + + key.dptr = talloc_memdup(tmp_ctx, key.dptr, key.dsize); + + talloc_set_destructor(h, NULL); + ctdb_latency(&h->ctdb->statistics.max_lockwait_latency, h->start_time); + h->ctdb->statistics.pending_lockwait_calls--; + + /* the handle needs to go away when the context is gone - when + the handle goes away this implicitly closes the pipe, which + kills the child holding the lock */ + talloc_steal(tmp_ctx, h); + + if (h->ctdb->flags & CTDB_FLAG_TORTURE) { + if (tdb_chainlock_nonblock(tdb, key) == 0) { + ctdb_fatal(h->ctdb, "got chain lock while lockwait child active"); + } + } + + tdb_chainlock_mark(tdb, key); + callback(p); + tdb_chainlock_unmark(tdb, key); + + kill(child, SIGKILL); + waitpid(child, NULL, 0); + talloc_free(tmp_ctx); +} + +static int lockwait_destructor(struct lockwait_handle *h) +{ + h->ctdb->statistics.pending_lockwait_calls--; + kill(h->child, SIGKILL); + waitpid(h->child, NULL, 0); + return 0; +} + +/* + setup a non-blocking chainlock on a tdb record. If this function + returns NULL then it could not get the chainlock. Otherwise it + returns a opaque handle, and will call callback() once it has + managed to get the chainlock. You can cancel it by using talloc_free + on the returned handle. + + It is the callers responsibility to unlock the chainlock once + acquired + */ +struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db, + TDB_DATA key, + void (*callback)(void *private_data), + void *private_data) +{ + struct lockwait_handle *result; + int ret; + pid_t parent = getpid(); + + ctdb_db->ctdb->statistics.lockwait_calls++; + ctdb_db->ctdb->statistics.pending_lockwait_calls++; + + if (!(result = talloc_zero(private_data, struct lockwait_handle))) { + ctdb_db->ctdb->statistics.pending_lockwait_calls--; + return NULL; + } + + ret = pipe(result->fd); + + if (ret != 0) { + talloc_free(result); + ctdb_db->ctdb->statistics.pending_lockwait_calls--; + return NULL; + } + + result->child = fork(); + + if (result->child == (pid_t)-1) { + close(result->fd[0]); + close(result->fd[1]); + talloc_free(result); + ctdb_db->ctdb->statistics.pending_lockwait_calls--; + return NULL; + } + + result->callback = callback; + result->private_data = private_data; + result->ctdb = ctdb_db->ctdb; + result->ctdb_db = ctdb_db; + result->key = key; + + if (result->child == 0) { + char c = 0; + close(result->fd[0]); + tdb_chainlock(ctdb_db->ltdb->tdb, key); + write(result->fd[1], &c, 1); + /* make sure we die when our parent dies */ + while (kill(parent, 0) == 0 || errno != ESRCH) { + sleep(5); + } + _exit(0); + } + + close(result->fd[1]); + talloc_set_destructor(result, lockwait_destructor); + + result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0], + EVENT_FD_READ|EVENT_FD_AUTOCLOSE, lockwait_handler, + (void *)result); + if (result->fde == NULL) { + talloc_free(result); + ctdb_db->ctdb->statistics.pending_lockwait_calls--; + return NULL; + } + + result->start_time = timeval_current(); + + return result; +} diff --git a/source4/cluster/ctdb/server/ctdb_ltdb_server.c b/source4/cluster/ctdb/server/ctdb_ltdb_server.c new file mode 100644 index 0000000000..bd07f674db --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_ltdb_server.c @@ -0,0 +1,366 @@ +/* + ctdb ltdb code - server side + + Copyright (C) Andrew Tridgell 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb_private.h" +#include "db_wrap.h" +#include "lib/util/dlinklist.h" + +/* + this is the dummy null procedure that all databases support +*/ +static int ctdb_null_func(struct ctdb_call_info *call) +{ + return 0; +} + +/* + this is a plain fetch procedure that all databases support +*/ +static int ctdb_fetch_func(struct ctdb_call_info *call) +{ + call->reply_data = &call->record_data; + return 0; +} + + + +struct lock_fetch_state { + struct ctdb_context *ctdb; + void (*recv_pkt)(void *, struct ctdb_req_header *); + void *recv_context; + struct ctdb_req_header *hdr; + uint32_t generation; + bool ignore_generation; +}; + +/* + called when we should retry the operation + */ +static void lock_fetch_callback(void *p) +{ + struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state); + if (!state->ignore_generation && + state->generation != state->ctdb->vnn_map->generation) { + DEBUG(0,("Discarding previous generation lockwait packet\n")); + talloc_free(state->hdr); + return; + } + state->recv_pkt(state->recv_context, state->hdr); + DEBUG(2,(__location__ " PACKET REQUEUED\n")); +} + + +/* + do a non-blocking ltdb_lock, deferring this ctdb request until we + have the chainlock + + It does the following: + + 1) tries to get the chainlock. If it succeeds, then it returns 0 + + 2) if it fails to get a chainlock immediately then it sets up a + non-blocking chainlock via ctdb_lockwait, and when it gets the + chainlock it re-submits this ctdb request to the main packet + receive function + + This effectively queues all ctdb requests that cannot be + immediately satisfied until it can get the lock. This means that + the main ctdb daemon will not block waiting for a chainlock held by + a client + + There are 3 possible return values: + + 0: means that it got the lock immediately. + -1: means that it failed to get the lock, and won't retry + -2: means that it failed to get the lock immediately, but will retry + */ +int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, + TDB_DATA key, struct ctdb_req_header *hdr, + void (*recv_pkt)(void *, struct ctdb_req_header *), + void *recv_context, bool ignore_generation) +{ + int ret; + struct tdb_context *tdb = ctdb_db->ltdb->tdb; + struct lockwait_handle *h; + struct lock_fetch_state *state; + + ret = tdb_chainlock_nonblock(tdb, key); + + if (ret != 0 && + !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) { + /* a hard failure - don't try again */ + return -1; + } + + /* when torturing, ensure we test the contended path */ + if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) && + random() % 5 == 0) { + ret = -1; + tdb_chainunlock(tdb, key); + } + + /* first the non-contended path */ + if (ret == 0) { + return 0; + } + + state = talloc(hdr, struct lock_fetch_state); + state->ctdb = ctdb_db->ctdb; + state->hdr = hdr; + state->recv_pkt = recv_pkt; + state->recv_context = recv_context; + state->generation = ctdb_db->ctdb->vnn_map->generation; + state->ignore_generation = ignore_generation; + + /* now the contended path */ + h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state); + if (h == NULL) { + tdb_chainunlock(tdb, key); + return -1; + } + + /* we need to move the packet off the temporary context in ctdb_input_pkt(), + so it won't be freed yet */ + talloc_steal(state, hdr); + talloc_steal(state, h); + + /* now tell the caller than we will retry asynchronously */ + return -2; +} + +/* + a varient of ctdb_ltdb_lock_requeue that also fetches the record + */ +int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, + TDB_DATA key, struct ctdb_ltdb_header *header, + struct ctdb_req_header *hdr, TDB_DATA *data, + void (*recv_pkt)(void *, struct ctdb_req_header *), + void *recv_context, bool ignore_generation) +{ + int ret; + + ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, + recv_context, ignore_generation); + if (ret == 0) { + ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + } + } + return ret; +} + + +/* + paraoid check to see if the db is empty + */ +static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db) +{ + struct tdb_context *tdb = ctdb_db->ltdb->tdb; + int count = tdb_traverse_read(tdb, NULL, NULL); + if (count != 0) { + DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n", + ctdb_db->db_path)); + ctdb_fatal(ctdb_db->ctdb, "database not empty on attach"); + } +} + +/* + a client has asked to attach a new database + */ +int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata, + TDB_DATA *outdata) +{ + const char *db_name = (const char *)indata.dptr; + struct ctdb_db_context *ctdb_db, *tmp_db; + int ret; + + /* see if we already have this name */ + for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) { + if (strcmp(db_name, tmp_db->db_name) == 0) { + /* this is not an error */ + outdata->dptr = (uint8_t *)&tmp_db->db_id; + outdata->dsize = sizeof(tmp_db->db_id); + return 0; + } + } + + ctdb_db = talloc_zero(ctdb, struct ctdb_db_context); + CTDB_NO_MEMORY(ctdb, ctdb_db); + + ctdb_db->ctdb = ctdb; + ctdb_db->db_name = talloc_strdup(ctdb_db, db_name); + CTDB_NO_MEMORY(ctdb, ctdb_db->db_name); + + ctdb_db->db_id = ctdb_hash(&indata); + + outdata->dptr = (uint8_t *)&ctdb_db->db_id; + outdata->dsize = sizeof(ctdb_db->db_id); + + /* check for hash collisions */ + for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) { + if (tmp_db->db_id == ctdb_db->db_id) { + DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n", + tmp_db->db_id, db_name, tmp_db->db_name)); + talloc_free(ctdb_db); + return -1; + } + } + + if (ctdb->db_directory == NULL) { + ctdb->db_directory = VARDIR "/ctdb"; + } + + /* make sure the db directory exists */ + if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) { + DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", + ctdb->db_directory)); + talloc_free(ctdb_db); + return -1; + } + + /* open the database */ + ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", + ctdb->db_directory, + db_name, ctdb->vnn); + + ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, + ctdb->tunable.database_hash_size, + TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666); + if (ctdb_db->ltdb == NULL) { + DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path)); + talloc_free(ctdb_db); + return -1; + } + + ctdb_check_db_empty(ctdb_db); + + DLIST_ADD(ctdb->db_list, ctdb_db); + + /* + all databases support the "null" function. we need this in + order to do forced migration of records + */ + ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC); + if (ret != 0) { + DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name)); + talloc_free(ctdb_db); + return -1; + } + + /* + all databases support the "fetch" function. we need this + for efficient Samba3 ctdb fetch + */ + ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC); + if (ret != 0) { + DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name)); + talloc_free(ctdb_db); + return -1; + } + + /* tell all the other nodes about this database */ + ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0, + CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY, + indata, NULL, NULL); + + DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path)); + + /* success */ + return 0; +} + +/* + called when a broadcast seqnum update comes in + */ +int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode) +{ + struct ctdb_db_context *ctdb_db; + if (srcnode == ctdb->vnn) { + /* don't update ourselves! */ + return 0; + } + + ctdb_db = find_ctdb_db(ctdb, db_id); + if (!ctdb_db) { + DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id)); + return -1; + } + + tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb); + ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb); + return 0; +} + +/* + timer to check for seqnum changes in a ltdb and propogate them + */ +static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, + struct timeval t, void *p) +{ + struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context); + struct ctdb_context *ctdb = ctdb_db->ctdb; + uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb); + if (new_seqnum != ctdb_db->seqnum) { + /* something has changed - propogate it */ + TDB_DATA data; + data.dptr = (uint8_t *)&ctdb_db->db_id; + data.dsize = sizeof(uint32_t); + ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0, + CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY, + data, NULL, NULL); + } + ctdb_db->seqnum = new_seqnum; + + /* setup a new timer */ + ctdb_db->te = + event_add_timed(ctdb->ev, ctdb_db, + timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0), + ctdb_ltdb_seqnum_check, ctdb_db); +} + +/* + enable seqnum handling on this db + */ +int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id) +{ + struct ctdb_db_context *ctdb_db; + ctdb_db = find_ctdb_db(ctdb, db_id); + if (!ctdb_db) { + DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id)); + return -1; + } + + if (ctdb_db->te == NULL) { + ctdb_db->te = + event_add_timed(ctdb->ev, ctdb_db, + timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0), + ctdb_ltdb_seqnum_check, ctdb_db); + } + + tdb_enable_seqnum(ctdb_db->ltdb->tdb); + ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb); + return 0; +} + diff --git a/source4/cluster/ctdb/server/ctdb_monitor.c b/source4/cluster/ctdb/server/ctdb_monitor.c new file mode 100644 index 0000000000..ec5244703c --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_monitor.c @@ -0,0 +1,227 @@ +/* + monitoring links to all other nodes to detect dead nodes + + + Copyright (C) Ronnie Sahlberg 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "system/wait.h" +#include "../include/ctdb_private.h" + +/* + see if any nodes are dead + */ +static void ctdb_check_for_dead_nodes(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context); + int i; + + if (ctdb->monitoring_mode == CTDB_MONITORING_DISABLED) { + event_add_timed(ctdb->ev, ctdb->monitor_context, + timeval_current_ofs(ctdb->tunable.keepalive_interval, 0), + ctdb_check_for_dead_nodes, ctdb); + return; + } + + /* send a keepalive to all other nodes, unless */ + for (i=0;i<ctdb->num_nodes;i++) { + struct ctdb_node *node = ctdb->nodes[i]; + if (node->vnn == ctdb->vnn) { + continue; + } + + if (node->flags & NODE_FLAGS_DISCONNECTED) { + /* it might have come alive again */ + if (node->rx_cnt != 0) { + ctdb_node_connected(node); + } + continue; + } + + + if (node->rx_cnt == 0) { + node->dead_count++; + } else { + node->dead_count = 0; + } + + node->rx_cnt = 0; + + if (node->dead_count >= ctdb->tunable.keepalive_limit) { + DEBUG(0,("dead count reached for node %u\n", node->vnn)); + ctdb_node_dead(node); + ctdb_send_keepalive(ctdb, node->vnn); + /* maybe tell the transport layer to kill the + sockets as well? + */ + continue; + } + + if (node->tx_cnt == 0) { + DEBUG(5,("sending keepalive to %u\n", node->vnn)); + ctdb_send_keepalive(ctdb, node->vnn); + } + + node->tx_cnt = 0; + } + + event_add_timed(ctdb->ev, ctdb->monitor_context, + timeval_current_ofs(ctdb->tunable.keepalive_interval, 0), + ctdb_check_for_dead_nodes, ctdb); +} + +static void ctdb_check_health(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data); + +/* + called when a health monitoring event script finishes + */ +static void ctdb_health_callback(struct ctdb_context *ctdb, int status, void *p) +{ + struct ctdb_node *node = ctdb->nodes[ctdb->vnn]; + TDB_DATA data; + struct ctdb_node_flag_change c; + + event_add_timed(ctdb->ev, ctdb->monitor_context, + timeval_current_ofs(ctdb->tunable.monitor_interval, 0), + ctdb_check_health, ctdb); + + if (status != 0 && !(node->flags & NODE_FLAGS_UNHEALTHY)) { + DEBUG(0,("monitor event failed - disabling node\n")); + node->flags |= NODE_FLAGS_UNHEALTHY; + } else if (status == 0 && (node->flags & NODE_FLAGS_UNHEALTHY)) { + DEBUG(0,("monitor event OK - node re-enabled\n")); + ctdb->nodes[ctdb->vnn]->flags &= ~NODE_FLAGS_UNHEALTHY; + } else { + /* no change */ + return; + } + + c.vnn = ctdb->vnn; + c.flags = node->flags; + + data.dptr = (uint8_t *)&c; + data.dsize = sizeof(c); + + /* tell the other nodes that something has changed */ + ctdb_daemon_send_message(ctdb, CTDB_BROADCAST_CONNECTED, + CTDB_SRVID_NODE_FLAGS_CHANGED, data); + +} + + +/* + see if the event scripts think we are healthy + */ +static void ctdb_check_health(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context); + int ret; + + if (ctdb->monitoring_mode == CTDB_MONITORING_DISABLED) { + event_add_timed(ctdb->ev, ctdb->monitor_context, + timeval_current_ofs(ctdb->tunable.monitor_interval, 0), + ctdb_check_health, ctdb); + return; + } + + ret = ctdb_event_script_callback(ctdb, + timeval_current_ofs(ctdb->tunable.script_timeout, 0), + ctdb->monitor_context, ctdb_health_callback, ctdb, "monitor"); + if (ret != 0) { + DEBUG(0,("Unable to launch monitor event script\n")); + event_add_timed(ctdb->ev, ctdb->monitor_context, + timeval_current_ofs(ctdb->tunable.monitor_interval, 0), + ctdb_check_health, ctdb); + } +} + +/* stop any monitoring */ +void ctdb_stop_monitoring(struct ctdb_context *ctdb) +{ + talloc_free(ctdb->monitor_context); + ctdb->monitor_context = talloc_new(ctdb); + CTDB_NO_MEMORY_FATAL(ctdb, ctdb->monitor_context); +} + +/* + start watching for nodes that might be dead + */ +void ctdb_start_monitoring(struct ctdb_context *ctdb) +{ + struct timed_event *te; + + ctdb_stop_monitoring(ctdb); + + te = event_add_timed(ctdb->ev, ctdb->monitor_context, + timeval_current_ofs(ctdb->tunable.keepalive_interval, 0), + ctdb_check_for_dead_nodes, ctdb); + CTDB_NO_MEMORY_FATAL(ctdb, te); + + te = event_add_timed(ctdb->ev, ctdb->monitor_context, + timeval_current_ofs(ctdb->tunable.monitor_interval, 0), + ctdb_check_health, ctdb); + CTDB_NO_MEMORY_FATAL(ctdb, te); +} + + +/* + modify flags on a node + */ +int32_t ctdb_control_modflags(struct ctdb_context *ctdb, TDB_DATA indata) +{ + struct ctdb_node_modflags *m = (struct ctdb_node_modflags *)indata.dptr; + TDB_DATA data; + struct ctdb_node_flag_change c; + struct ctdb_node *node = ctdb->nodes[ctdb->vnn]; + uint32_t old_flags = node->flags; + + node->flags |= m->set; + node->flags &= ~m->clear; + + if (node->flags == old_flags) { + /* no change */ + return 0; + } + + DEBUG(0, ("Control modflags on node %u - flags now 0x%x\n", ctdb->vnn, node->flags)); + + /* if we have been banned, go into recovery mode */ + c.vnn = ctdb->vnn; + c.flags = node->flags; + + data.dptr = (uint8_t *)&c; + data.dsize = sizeof(c); + + /* tell the other nodes that something has changed */ + ctdb_daemon_send_message(ctdb, CTDB_BROADCAST_CONNECTED, + CTDB_SRVID_NODE_FLAGS_CHANGED, data); + + if ((node->flags & NODE_FLAGS_BANNED) && !(old_flags & NODE_FLAGS_BANNED)) { + /* make sure we are frozen */ + DEBUG(0,("This node has been banned - forcing freeze and recovery\n")); + ctdb_start_freeze(ctdb); + ctdb_release_all_ips(ctdb); + ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE; + } + + return 0; +} diff --git a/source4/cluster/ctdb/server/ctdb_recover.c b/source4/cluster/ctdb/server/ctdb_recover.c new file mode 100644 index 0000000000..82338d48ce --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_recover.c @@ -0,0 +1,680 @@ +/* + ctdb recovery code + + Copyright (C) Andrew Tridgell 2007 + Copyright (C) Ronnie Sahlberg 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "system/wait.h" +#include "../include/ctdb_private.h" +#include "lib/util/dlinklist.h" +#include "db_wrap.h" + +/* + lock all databases - mark only + */ +static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb) +{ + struct ctdb_db_context *ctdb_db; + if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) { + DEBUG(0,("Attempt to mark all databases locked when not frozen\n")); + return -1; + } + for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) { + if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) { + return -1; + } + } + return 0; +} + +/* + lock all databases - unmark only + */ +static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb) +{ + struct ctdb_db_context *ctdb_db; + if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) { + DEBUG(0,("Attempt to unmark all databases locked when not frozen\n")); + return -1; + } + for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) { + if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) { + return -1; + } + } + return 0; +} + + +int +ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata) +{ + CHECK_CONTROL_DATA_SIZE(0); + struct ctdb_vnn_map_wire *map; + size_t len; + + len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size; + map = talloc_size(outdata, len); + CTDB_NO_MEMORY_VOID(ctdb, map); + + map->generation = ctdb->vnn_map->generation; + map->size = ctdb->vnn_map->size; + memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size); + + outdata->dsize = len; + outdata->dptr = (uint8_t *)map; + + return 0; +} + +int +ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata) +{ + struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr; + + if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) { + DEBUG(0,("Attempt to set vnnmap when not frozen\n")); + return -1; + } + + talloc_free(ctdb->vnn_map); + + ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map); + CTDB_NO_MEMORY(ctdb, ctdb->vnn_map); + + ctdb->vnn_map->generation = map->generation; + ctdb->vnn_map->size = map->size; + ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size); + CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map); + + memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size); + + return 0; +} + +int +ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata) +{ + uint32_t i, len; + struct ctdb_db_context *ctdb_db; + struct ctdb_dbid_map *dbid_map; + + CHECK_CONTROL_DATA_SIZE(0); + + len = 0; + for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){ + len++; + } + + + outdata->dsize = offsetof(struct ctdb_dbid_map, dbids) + 4*len; + outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize); + if (!outdata->dptr) { + DEBUG(0, (__location__ " Failed to allocate dbmap array\n")); + exit(1); + } + + dbid_map = (struct ctdb_dbid_map *)outdata->dptr; + dbid_map->num = len; + for(i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){ + dbid_map->dbids[i] = ctdb_db->db_id; + } + + return 0; +} + +int +ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata) +{ + uint32_t i, num_nodes; + struct ctdb_node_map *node_map; + + CHECK_CONTROL_DATA_SIZE(0); + + num_nodes = ctdb->num_nodes; + + outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags); + outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize); + if (!outdata->dptr) { + DEBUG(0, (__location__ " Failed to allocate nodemap array\n")); + exit(1); + } + + node_map = (struct ctdb_node_map *)outdata->dptr; + node_map->num = num_nodes; + for (i=0; i<num_nodes; i++) { + inet_aton(ctdb->nodes[i]->address.address, &node_map->nodes[i].sin.sin_addr); + node_map->nodes[i].vnn = ctdb->nodes[i]->vnn; + node_map->nodes[i].flags = ctdb->nodes[i]->flags; + } + + return 0; +} + +struct getkeys_params { + struct ctdb_context *ctdb; + uint32_t lmaster; + uint32_t rec_count; + struct getkeys_rec { + TDB_DATA key; + TDB_DATA data; + } *recs; +}; + +static int traverse_getkeys(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p) +{ + struct getkeys_params *params = (struct getkeys_params *)p; + uint32_t lmaster; + + lmaster = ctdb_lmaster(params->ctdb, &key); + + /* only include this record if the lmaster matches or if + the wildcard lmaster (-1) was specified. + */ + if ((params->lmaster != CTDB_LMASTER_ANY) && (params->lmaster != lmaster)) { + return 0; + } + + params->recs = talloc_realloc(NULL, params->recs, struct getkeys_rec, params->rec_count+1); + key.dptr = talloc_memdup(params->recs, key.dptr, key.dsize); + data.dptr = talloc_memdup(params->recs, data.dptr, data.dsize); + params->recs[params->rec_count].key = key; + params->recs[params->rec_count].data = data; + params->rec_count++; + + return 0; +} + +/* + pul a bunch of records from a ltdb, filtering by lmaster + */ +int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata) +{ + struct ctdb_control_pulldb *pull; + struct ctdb_db_context *ctdb_db; + struct getkeys_params params; + struct ctdb_control_pulldb_reply *reply; + int i; + size_t len = 0; + + if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) { + DEBUG(0,("rejecting ctdb_control_pull_db when not frozen\n")); + return -1; + } + + pull = (struct ctdb_control_pulldb *)indata.dptr; + + ctdb_db = find_ctdb_db(ctdb, pull->db_id); + if (!ctdb_db) { + DEBUG(0,(__location__ " Unknown db\n")); + return -1; + } + + params.ctdb = ctdb; + params.lmaster = pull->lmaster; + + params.rec_count = 0; + params.recs = talloc_array(outdata, struct getkeys_rec, 0); + CTDB_NO_MEMORY(ctdb, params.recs); + + if (ctdb_lock_all_databases_mark(ctdb) != 0) { + DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n")); + return -1; + } + + tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_getkeys, ¶ms); + + ctdb_lock_all_databases_unmark(ctdb); + + reply = talloc(outdata, struct ctdb_control_pulldb_reply); + CTDB_NO_MEMORY(ctdb, reply); + + reply->db_id = pull->db_id; + reply->count = params.rec_count; + + len = offsetof(struct ctdb_control_pulldb_reply, data); + + for (i=0;i<reply->count;i++) { + struct ctdb_rec_data *rec; + rec = ctdb_marshall_record(outdata, 0, params.recs[i].key, params.recs[i].data); + reply = talloc_realloc_size(outdata, reply, rec->length + len); + memcpy(len+(uint8_t *)reply, rec, rec->length); + len += rec->length; + talloc_free(rec); + } + + talloc_free(params.recs); + + outdata->dptr = (uint8_t *)reply; + outdata->dsize = len; + + return 0; +} + +/* + push a bunch of records into a ltdb, filtering by rsn + */ +int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata) +{ + struct ctdb_control_pulldb_reply *reply = (struct ctdb_control_pulldb_reply *)indata.dptr; + struct ctdb_db_context *ctdb_db; + int i, ret; + struct ctdb_rec_data *rec; + + if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) { + DEBUG(0,("rejecting ctdb_control_push_db when not frozen\n")); + return -1; + } + + if (indata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) { + DEBUG(0,(__location__ " invalid data in pulldb reply\n")); + return -1; + } + + ctdb_db = find_ctdb_db(ctdb, reply->db_id); + if (!ctdb_db) { + DEBUG(0,(__location__ " Unknown db 0x%08x\n", reply->db_id)); + return -1; + } + + if (ctdb_lock_all_databases_mark(ctdb) != 0) { + DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n")); + return -1; + } + + rec = (struct ctdb_rec_data *)&reply->data[0]; + + DEBUG(3,("starting push of %u records for dbid 0x%x\n", + reply->count, reply->db_id)); + + for (i=0;i<reply->count;i++) { + TDB_DATA key, data; + struct ctdb_ltdb_header *hdr, header; + + key.dptr = &rec->data[0]; + key.dsize = rec->keylen; + data.dptr = &rec->data[key.dsize]; + data.dsize = rec->datalen; + + if (data.dsize < sizeof(struct ctdb_ltdb_header)) { + DEBUG(0,(__location__ " bad ltdb record\n")); + goto failed; + } + hdr = (struct ctdb_ltdb_header *)data.dptr; + data.dptr += sizeof(*hdr); + data.dsize -= sizeof(*hdr); + + ret = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to fetch record\n")); + goto failed; + } + /* The check for dmaster gives priority to the dmaster + if the rsn values are equal */ + if (header.rsn < hdr->rsn || + (header.dmaster != ctdb->vnn && header.rsn == hdr->rsn)) { + ret = ctdb_ltdb_store(ctdb_db, key, hdr, data); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to store record\n")); + goto failed; + } + } + + rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec); + } + + DEBUG(3,("finished push of %u records for dbid 0x%x\n", + reply->count, reply->db_id)); + + ctdb_lock_all_databases_unmark(ctdb); + return 0; + +failed: + ctdb_lock_all_databases_unmark(ctdb); + return -1; +} + + +static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p) +{ + uint32_t *dmaster = (uint32_t *)p; + struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr; + int ret; + + header->dmaster = *dmaster; + + ret = tdb_store(tdb, key, data, TDB_REPLACE); + if (ret) { + DEBUG(0,(__location__ " failed to write tdb data back ret:%d\n",ret)); + return ret; + } + return 0; +} + +int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata) +{ + struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr; + struct ctdb_db_context *ctdb_db; + + if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) { + DEBUG(0,("rejecting ctdb_control_set_dmaster when not frozen\n")); + return -1; + } + + ctdb_db = find_ctdb_db(ctdb, p->db_id); + if (!ctdb_db) { + DEBUG(0,(__location__ " Unknown db 0x%08x\n", p->db_id)); + return -1; + } + + if (ctdb_lock_all_databases_mark(ctdb) != 0) { + DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n")); + return -1; + } + + tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster); + + ctdb_lock_all_databases_unmark(ctdb); + + return 0; +} + +struct ctdb_set_recmode_state { + struct ctdb_req_control *c; + uint32_t recmode; +}; + +/* + called when the 'recovered' event script has finished + */ +static void ctdb_recovered_callback(struct ctdb_context *ctdb, int status, void *p) +{ + struct ctdb_set_recmode_state *state = talloc_get_type(p, struct ctdb_set_recmode_state); + + ctdb_start_monitoring(ctdb); + + if (status == 0) { + ctdb->recovery_mode = state->recmode; + } else { + DEBUG(0,(__location__ " recovered event script failed (status %d)\n", status)); + } + + ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL); + talloc_free(state); +} + +/* + set the recovery mode + */ +int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, + struct ctdb_req_control *c, + TDB_DATA indata, bool *async_reply, + const char **errormsg) +{ + uint32_t recmode = *(uint32_t *)indata.dptr; + int ret; + struct ctdb_set_recmode_state *state; + + if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) { + DEBUG(0,("Attempt to change recovery mode to %u when not frozen\n", + recmode)); + (*errormsg) = "Cannot change recovery mode while not frozen"; + return -1; + } + + if (recmode != CTDB_RECOVERY_NORMAL || + ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) { + ctdb->recovery_mode = recmode; + return 0; + } + + /* some special handling when ending recovery mode */ + state = talloc(ctdb, struct ctdb_set_recmode_state); + CTDB_NO_MEMORY(ctdb, state); + + /* we should not be able to get the lock on the nodes list, as it should be + held by the recovery master */ + if (ctdb_recovery_lock(ctdb, false)) { + DEBUG(0,("ERROR: recovery lock file %s not locked when recovering!\n", + ctdb->recovery_lock_file)); + return -1; + } + + state->c = talloc_steal(state, c); + state->recmode = recmode; + + ctdb_stop_monitoring(ctdb); + + /* call the events script to tell all subsystems that we have recovered */ + ret = ctdb_event_script_callback(ctdb, + timeval_current_ofs(ctdb->tunable.script_timeout, 0), + state, + ctdb_recovered_callback, + state, "recovered"); + if (ret != 0) { + return ret; + } + *async_reply = true; + + return 0; +} + +/* + callback for ctdb_control_max_rsn + */ +static int traverse_max_rsn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p) +{ + struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr; + uint64_t *max_rsn = (uint64_t *)p; + + if (data.dsize >= sizeof(*h)) { + (*max_rsn) = MAX(*max_rsn, h->rsn); + } + return 0; +} + +/* + get max rsn across an entire db + */ +int32_t ctdb_control_max_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata) +{ + struct ctdb_db_context *ctdb_db; + uint32_t db_id = *(uint32_t *)indata.dptr; + uint64_t max_rsn = 0; + int ret; + + if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) { + DEBUG(0,("rejecting ctdb_control_max_rsn when not frozen\n")); + return -1; + } + + ctdb_db = find_ctdb_db(ctdb, db_id); + if (!ctdb_db) { + DEBUG(0,(__location__ " Unknown db\n")); + return -1; + } + + if (ctdb_lock_all_databases_mark(ctdb) != 0) { + DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n")); + return -1; + } + + ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_max_rsn, &max_rsn); + if (ret < 0) { + DEBUG(0,(__location__ " traverse failed in ctdb_control_max_rsn\n")); + return -1; + } + + ctdb_lock_all_databases_unmark(ctdb); + + outdata->dptr = (uint8_t *)talloc(outdata, uint64_t); + if (!outdata->dptr) { + return -1; + } + (*(uint64_t *)outdata->dptr) = max_rsn; + outdata->dsize = sizeof(uint64_t); + + return 0; +} + + +/* + callback for ctdb_control_set_rsn_nonempty + */ +static int traverse_set_rsn_nonempty(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p) +{ + struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr; + uint64_t *rsn = (uint64_t *)p; + + if (data.dsize > sizeof(*h)) { + h->rsn = *rsn; + if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) { + return -1; + } + } + return 0; +} + +/* + set rsn for all non-empty records in a database to a given rsn + */ +int32_t ctdb_control_set_rsn_nonempty(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata) +{ + struct ctdb_control_set_rsn_nonempty *p = (struct ctdb_control_set_rsn_nonempty *)indata.dptr; + struct ctdb_db_context *ctdb_db; + int ret; + + if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) { + DEBUG(0,("rejecting ctdb_control_set_rsn_nonempty when not frozen\n")); + return -1; + } + + ctdb_db = find_ctdb_db(ctdb, p->db_id); + if (!ctdb_db) { + DEBUG(0,(__location__ " Unknown db\n")); + return -1; + } + + if (ctdb_lock_all_databases_mark(ctdb) != 0) { + DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n")); + return -1; + } + + ret = tdb_traverse(ctdb_db->ltdb->tdb, traverse_set_rsn_nonempty, &p->rsn); + if (ret < 0) { + DEBUG(0,(__location__ " traverse failed in ctdb_control_set_rsn_nonempty\n")); + return -1; + } + + ctdb_lock_all_databases_unmark(ctdb); + + return 0; +} + + +/* + callback for ctdb_control_delete_low_rsn + */ +static int traverse_delete_low_rsn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p) +{ + struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr; + uint64_t *rsn = (uint64_t *)p; + + if (data.dsize < sizeof(*h) || h->rsn < *rsn) { + if (tdb_delete(tdb, key) != 0) { + return -1; + } + } + return 0; +} + +/* + delete any records with a rsn < the given rsn + */ +int32_t ctdb_control_delete_low_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata) +{ + struct ctdb_control_delete_low_rsn *p = (struct ctdb_control_delete_low_rsn *)indata.dptr; + struct ctdb_db_context *ctdb_db; + int ret; + + if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) { + DEBUG(0,("rejecting ctdb_control_delete_low_rsn when not frozen\n")); + return -1; + } + + ctdb_db = find_ctdb_db(ctdb, p->db_id); + if (!ctdb_db) { + DEBUG(0,(__location__ " Unknown db\n")); + return -1; + } + + if (ctdb_lock_all_databases_mark(ctdb) != 0) { + DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n")); + return -1; + } + + ret = tdb_traverse(ctdb_db->ltdb->tdb, traverse_delete_low_rsn, &p->rsn); + if (ret < 0) { + DEBUG(0,(__location__ " traverse failed in ctdb_control_delete_low_rsn\n")); + return -1; + } + + ctdb_lock_all_databases_unmark(ctdb); + + return 0; +} + + +/* + try and get the recovery lock in shared storage - should only work + on the recovery master recovery daemon. Anywhere else is a bug + */ +bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep) +{ + struct flock lock; + + if (ctdb->recovery_lock_fd != -1) { + close(ctdb->recovery_lock_fd); + } + ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600); + if (ctdb->recovery_lock_fd == -1) { + DEBUG(0,("Unable to open %s - (%s)\n", + ctdb->recovery_lock_file, strerror(errno))); + return false; + } + + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + lock.l_start = 0; + lock.l_len = 1; + lock.l_pid = 0; + + if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) { + return false; + } + + if (!keep) { + close(ctdb->recovery_lock_fd); + ctdb->recovery_lock_fd = -1; + } + + return true; +} diff --git a/source4/cluster/ctdb/server/ctdb_recoverd.c b/source4/cluster/ctdb/server/ctdb_recoverd.c new file mode 100644 index 0000000000..5cb985521d --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_recoverd.c @@ -0,0 +1,1511 @@ +/* + ctdb recovery daemon + + Copyright (C) Ronnie Sahlberg 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "system/time.h" +#include "popt.h" +#include "cmdline.h" +#include "../include/ctdb.h" +#include "../include/ctdb_private.h" + + +struct ban_state { + struct ctdb_recoverd *rec; + uint32_t banned_node; +}; + +/* + private state of recovery daemon + */ +struct ctdb_recoverd { + struct ctdb_context *ctdb; + uint32_t last_culprit; + uint32_t culprit_counter; + struct timeval first_recover_time; + struct ban_state **banned_nodes; + struct timeval priority_time; +}; + +#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0) +#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0) + +/* + unban a node + */ +static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t vnn) +{ + struct ctdb_context *ctdb = rec->ctdb; + + if (!ctdb_validate_vnn(ctdb, vnn)) { + DEBUG(0,("Bad vnn %u in ctdb_ban_node\n", vnn)); + return; + } + + if (rec->banned_nodes[vnn] == NULL) { + return; + } + + ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), vnn, 0, NODE_FLAGS_BANNED); + + talloc_free(rec->banned_nodes[vnn]); + rec->banned_nodes[vnn] = NULL; +} + + +/* + called when a ban has timed out + */ +static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p) +{ + struct ban_state *state = talloc_get_type(p, struct ban_state); + struct ctdb_recoverd *rec = state->rec; + uint32_t vnn = state->banned_node; + + DEBUG(0,("Node %u is now unbanned\n", vnn)); + ctdb_unban_node(rec, vnn); +} + +/* + ban a node for a period of time + */ +static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t vnn, uint32_t ban_time) +{ + struct ctdb_context *ctdb = rec->ctdb; + + if (!ctdb_validate_vnn(ctdb, vnn)) { + DEBUG(0,("Bad vnn %u in ctdb_ban_node\n", vnn)); + return; + } + + if (vnn == ctdb->vnn) { + DEBUG(0,("self ban - lowering our election priority\n")); + /* banning ourselves - lower our election priority */ + rec->priority_time = timeval_current(); + } + + ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), vnn, NODE_FLAGS_BANNED, 0); + + rec->banned_nodes[vnn] = talloc(rec, struct ban_state); + CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[vnn]); + + rec->banned_nodes[vnn]->rec = rec; + rec->banned_nodes[vnn]->banned_node = vnn; + + if (ban_time != 0) { + event_add_timed(ctdb->ev, rec->banned_nodes[vnn], + timeval_current_ofs(ban_time, 0), + ctdb_ban_timeout, rec->banned_nodes[vnn]); + } +} + + +/* + change recovery mode on all nodes + */ +static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode) +{ + int j, ret; + + /* start the freeze process immediately on all nodes */ + ctdb_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, + CTDB_CONTROL_FREEZE, CTDB_CTRL_FLAG_NOREPLY, tdb_null, + NULL, NULL, NULL, NULL, NULL); + + /* set recovery mode to active on all nodes */ + for (j=0; j<nodemap->num; j++) { + /* dont change it for nodes that are unavailable */ + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + + if (rec_mode == CTDB_RECOVERY_ACTIVE) { + ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to freeze node %u\n", nodemap->nodes[j].vnn)); + return -1; + } + } + + ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, rec_mode); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].vnn)); + return -1; + } + + if (rec_mode == CTDB_RECOVERY_NORMAL) { + ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].vnn)); + return -1; + } + } + } + + return 0; +} + +/* + change recovery master on all node + */ +static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t vnn) +{ + int j, ret; + + /* set recovery master to vnn on all nodes */ + for (j=0; j<nodemap->num; j++) { + /* dont change it for nodes that are unavailable */ + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + + ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, vnn); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].vnn)); + return -1; + } + } + + return 0; +} + + +/* + ensure all other nodes have attached to any databases that we have + */ +static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, + uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx) +{ + int i, j, db, ret; + struct ctdb_dbid_map *remote_dbmap; + + /* verify that all other nodes have all our databases */ + for (j=0; j<nodemap->num; j++) { + /* we dont need to ourself ourselves */ + if (nodemap->nodes[j].vnn == vnn) { + continue; + } + /* dont check nodes that are unavailable */ + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + + ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, + mem_ctx, &remote_dbmap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to get dbids from node %u\n", vnn)); + return -1; + } + + /* step through all local databases */ + for (db=0; db<dbmap->num;db++) { + const char *name; + + + for (i=0;i<remote_dbmap->num;i++) { + if (dbmap->dbids[db] == remote_dbmap->dbids[i]) { + break; + } + } + /* the remote node already have this database */ + if (i!=remote_dbmap->num) { + continue; + } + /* ok so we need to create this database */ + ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), vnn, dbmap->dbids[db], mem_ctx, &name); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to get dbname from node %u\n", vnn)); + return -1; + } + ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, mem_ctx, name); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to create remote db:%s\n", name)); + return -1; + } + } + } + + return 0; +} + + +/* + ensure we are attached to any databases that anyone else is attached to + */ +static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, + uint32_t vnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx) +{ + int i, j, db, ret; + struct ctdb_dbid_map *remote_dbmap; + + /* verify that we have all database any other node has */ + for (j=0; j<nodemap->num; j++) { + /* we dont need to ourself ourselves */ + if (nodemap->nodes[j].vnn == vnn) { + continue; + } + /* dont check nodes that are unavailable */ + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + + ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, + mem_ctx, &remote_dbmap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to get dbids from node %u\n", vnn)); + return -1; + } + + /* step through all databases on the remote node */ + for (db=0; db<remote_dbmap->num;db++) { + const char *name; + + for (i=0;i<(*dbmap)->num;i++) { + if (remote_dbmap->dbids[db] == (*dbmap)->dbids[i]) { + break; + } + } + /* we already have this db locally */ + if (i!=(*dbmap)->num) { + continue; + } + /* ok so we need to create this database and + rebuild dbmap + */ + ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, + remote_dbmap->dbids[db], mem_ctx, &name); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to get dbname from node %u\n", + nodemap->nodes[j].vnn)); + return -1; + } + ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, name); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to create local db:%s\n", name)); + return -1; + } + ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, dbmap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", vnn)); + return -1; + } + } + } + + return 0; +} + + +/* + pull all the remote database contents into ours + */ +static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, + uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx) +{ + int i, j, ret; + + /* pull all records from all other nodes across onto this node + (this merges based on rsn) + */ + for (i=0;i<dbmap->num;i++) { + for (j=0; j<nodemap->num; j++) { + /* we dont need to merge with ourselves */ + if (nodemap->nodes[j].vnn == vnn) { + continue; + } + /* dont merge from nodes that are unavailable */ + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, + vnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", + nodemap->nodes[j].vnn, vnn)); + return -1; + } + } + } + + return 0; +} + + +/* + change the dmaster on all databases to point to us + */ +static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, + uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx) +{ + int i, j, ret; + + /* update dmaster to point to this node for all databases/nodes */ + for (i=0;i<dbmap->num;i++) { + for (j=0; j<nodemap->num; j++) { + /* dont repoint nodes that are unavailable */ + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, ctdb, dbmap->dbids[i], vnn); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].vnn, dbmap->dbids[i])); + return -1; + } + } + } + + return 0; +} + + +/* + update flags on all active nodes + */ +static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap) +{ + int i; + for (i=0;i<nodemap->num;i++) { + struct ctdb_node_flag_change c; + TDB_DATA data; + + c.vnn = nodemap->nodes[i].vnn; + c.flags = nodemap->nodes[i].flags; + + data.dptr = (uint8_t *)&c; + data.dsize = sizeof(c); + + ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, + CTDB_SRVID_NODE_FLAGS_CHANGED, data); + + } + return 0; +} + +/* + vacuum one database + */ +static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap) +{ + uint64_t max_rsn; + int ret, i; + + /* find max rsn on our local node for this db */ + ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn); + if (ret != 0) { + return -1; + } + + /* set rsn on non-empty records to max_rsn+1 */ + for (i=0;i<nodemap->num;i++) { + if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) { + continue; + } + ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn, + db_id, max_rsn+1); + if (ret != 0) { + DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n", + nodemap->nodes[i].vnn, (unsigned long long)max_rsn+1)); + return -1; + } + } + + /* delete records with rsn < max_rsn+1 on all nodes */ + for (i=0;i<nodemap->num;i++) { + if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) { + continue; + } + ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn, + db_id, max_rsn+1); + if (ret != 0) { + DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n", + nodemap->nodes[i].vnn, (unsigned long long)max_rsn+1)); + return -1; + } + } + + + return 0; +} + + +/* + vacuum all attached databases + */ +static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, + struct ctdb_dbid_map *dbmap) +{ + int i; + + /* update dmaster to point to this node for all databases/nodes */ + for (i=0;i<dbmap->num;i++) { + if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) { + return -1; + } + } + return 0; +} + + +/* + push out all our database contents to all other nodes + */ +static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, + uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx) +{ + int i, j, ret; + + /* push all records out to the nodes again */ + for (i=0;i<dbmap->num;i++) { + for (j=0; j<nodemap->num; j++) { + /* we dont need to push to ourselves */ + if (nodemap->nodes[j].vnn == vnn) { + continue; + } + /* dont push to nodes that are unavailable */ + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), vnn, nodemap->nodes[j].vnn, + dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", + vnn, nodemap->nodes[j].vnn)); + return -1; + } + } + } + + return 0; +} + + +/* + ensure all nodes have the same vnnmap we do + */ +static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, + uint32_t vnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx) +{ + int j, ret; + + /* push the new vnn map out to all the nodes */ + for (j=0; j<nodemap->num; j++) { + /* dont push to nodes that are unavailable */ + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + + ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, mem_ctx, vnnmap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", vnn)); + return -1; + } + } + + return 0; +} + + +/* + handler for when the admin bans a node +*/ +static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, + TDB_DATA data, void *private_data) +{ + struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd); + struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr; + uint32_t recmaster; + int ret; + + if (data.dsize != sizeof(*b)) { + DEBUG(0,("Bad data in ban_handler\n")); + return; + } + + ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster); + if (ret != 0) { + DEBUG(0,(__location__ " Failed to find the recmaster\n")); + return; + } + + if (recmaster != ctdb->vnn) { + DEBUG(0,("We are not the recmaster - ignoring ban request\n")); + return; + } + + DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", + b->vnn, b->ban_time)); + ctdb_ban_node(rec, b->vnn, b->ban_time); +} + +/* + handler for when the admin unbans a node +*/ +static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, + TDB_DATA data, void *private_data) +{ + struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd); + uint32_t vnn; + int ret; + uint32_t recmaster; + + if (data.dsize != sizeof(uint32_t)) { + DEBUG(0,("Bad data in unban_handler\n")); + return; + } + vnn = *(uint32_t *)data.dptr; + + ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster); + if (ret != 0) { + DEBUG(0,(__location__ " Failed to find the recmaster\n")); + return; + } + + if (recmaster != ctdb->vnn) { + DEBUG(0,("We are not the recmaster - ignoring unban request\n")); + return; + } + + DEBUG(0,("Node %u has been unbanned by the administrator\n", vnn)); + ctdb_unban_node(rec, vnn); +} + + + +/* + called when ctdb_wait_timeout should finish + */ +static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, + struct timeval yt, void *p) +{ + uint32_t *timed_out = (uint32_t *)p; + (*timed_out) = 1; +} + +/* + wait for a given number of seconds + */ +static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs) +{ + uint32_t timed_out = 0; + event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out); + while (!timed_out) { + event_loop_once(ctdb->ev); + } +} + +/* + we are the recmaster, and recovery is needed - start a recovery run + */ +static int do_recovery(struct ctdb_recoverd *rec, + TALLOC_CTX *mem_ctx, uint32_t vnn, uint32_t num_active, + struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap, + uint32_t culprit) +{ + struct ctdb_context *ctdb = rec->ctdb; + int i, j, ret; + uint32_t generation; + struct ctdb_dbid_map *dbmap; + + if (rec->last_culprit != culprit || + timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) { + /* either a new node is the culprit, or we've decide to forgive them */ + rec->last_culprit = culprit; + rec->first_recover_time = timeval_current(); + rec->culprit_counter = 0; + } + rec->culprit_counter++; + + if (rec->culprit_counter > 2*nodemap->num) { + DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n", + culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time), + ctdb->tunable.recovery_ban_period)); + ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period); + } + + if (!ctdb_recovery_lock(ctdb, true)) { + DEBUG(0,("Unable to get recovery lock - aborting recovery\n")); + return -1; + } + + /* set recovery mode to active on all nodes */ + ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE); + if (ret!=0) { + DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n")); + return -1; + } + + DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit)); + + /* pick a new generation number */ + generation = random(); + + /* change the vnnmap on this node to use the new generation + number but not on any other nodes. + this guarantees that if we abort the recovery prematurely + for some reason (a node stops responding?) + that we can just return immediately and we will reenter + recovery shortly again. + I.e. we deliberately leave the cluster with an inconsistent + generation id to allow us to abort recovery at any stage and + just restart it from scratch. + */ + vnnmap->generation = generation; + ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, vnnmap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", vnn)); + return -1; + } + + /* get a list of all databases */ + ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &dbmap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", vnn)); + return -1; + } + + + + /* verify that all other nodes have all our databases */ + ret = create_missing_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to create missing remote databases\n")); + return -1; + } + + /* verify that we have all the databases any other node has */ + ret = create_missing_local_databases(ctdb, nodemap, vnn, &dbmap, mem_ctx); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to create missing local databases\n")); + return -1; + } + + + + /* verify that all other nodes have all our databases */ + ret = create_missing_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to create missing remote databases\n")); + return -1; + } + + + DEBUG(1, (__location__ " Recovery - created remote databases\n")); + + /* pull all remote databases onto the local node */ + ret = pull_all_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to pull remote databases\n")); + return -1; + } + + DEBUG(1, (__location__ " Recovery - pulled remote databases\n")); + + /* push all local databases to the remote nodes */ + ret = push_all_local_databases(ctdb, nodemap, vnn, dbmap, mem_ctx); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to push local databases\n")); + return -1; + } + + DEBUG(1, (__location__ " Recovery - pushed remote databases\n")); + + /* build a new vnn map with all the currently active and + unbanned nodes */ + generation = random(); + vnnmap = talloc(mem_ctx, struct ctdb_vnn_map); + CTDB_NO_MEMORY(ctdb, vnnmap); + vnnmap->generation = generation; + vnnmap->size = num_active; + vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size); + for (i=j=0;i<nodemap->num;i++) { + if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) { + vnnmap->map[j++] = nodemap->nodes[i].vnn; + } + } + + + + /* update to the new vnnmap on all nodes */ + ret = update_vnnmap_on_all_nodes(ctdb, nodemap, vnn, vnnmap, mem_ctx); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n")); + return -1; + } + + DEBUG(1, (__location__ " Recovery - updated vnnmap\n")); + + /* update recmaster to point to us for all nodes */ + ret = set_recovery_master(ctdb, nodemap, vnn); + if (ret!=0) { + DEBUG(0, (__location__ " Unable to set recovery master\n")); + return -1; + } + + DEBUG(1, (__location__ " Recovery - updated recmaster\n")); + + /* repoint all local and remote database records to the local + node as being dmaster + */ + ret = update_dmaster_on_all_databases(ctdb, nodemap, vnn, dbmap, mem_ctx); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to update dmaster on all databases\n")); + return -1; + } + + DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n")); + + /* + update all nodes to have the same flags that we have + */ + ret = update_flags_on_all_nodes(ctdb, nodemap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to update flags on all nodes\n")); + return -1; + } + + DEBUG(1, (__location__ " Recovery - updated flags\n")); + + /* + run a vacuum operation on empty records + */ + ret = vacuum_all_databases(ctdb, nodemap, dbmap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to vacuum all databases\n")); + return -1; + } + + DEBUG(1, (__location__ " Recovery - vacuumed all databases\n")); + + /* + if enabled, tell nodes to takeover their public IPs + */ + if (ctdb->takeover.enabled) { + ret = ctdb_takeover_run(ctdb, nodemap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to setup public takeover addresses\n")); + return -1; + } + DEBUG(1, (__location__ " Recovery - done takeover\n")); + } + + + /* disable recovery mode */ + ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL); + if (ret!=0) { + DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n")); + return -1; + } + + /* send a message to all clients telling them that the cluster + has been reconfigured */ + ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, CTDB_SRVID_RECONFIGURE, tdb_null); + + DEBUG(0, (__location__ " Recovery complete\n")); + + /* We just finished a recovery successfully. + We now wait for rerecovery_timeout before we allow + another recovery to take place. + */ + DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n")); + ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout); + DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n")); + + return 0; +} + + +/* + elections are won by first checking the number of connected nodes, then + the priority time, then the vnn + */ +struct election_message { + uint32_t num_connected; + struct timeval priority_time; + uint32_t vnn; +}; + +/* + form this nodes election data + */ +static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em) +{ + int ret, i; + struct ctdb_node_map *nodemap; + struct ctdb_context *ctdb = rec->ctdb; + + ZERO_STRUCTP(em); + + em->vnn = rec->ctdb->vnn; + em->priority_time = rec->priority_time; + + ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap); + if (ret != 0) { + return; + } + + for (i=0;i<nodemap->num;i++) { + if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) { + em->num_connected++; + } + } + talloc_free(nodemap); +} + +/* + see if the given election data wins + */ +static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em) +{ + struct election_message myem; + int cmp; + + ctdb_election_data(rec, &myem); + + /* try to use the most connected node */ + cmp = (int)myem.num_connected - (int)em->num_connected; + + /* then the longest running node */ + if (cmp == 0) { + cmp = timeval_compare(&em->priority_time, &myem.priority_time); + } + + if (cmp == 0) { + cmp = (int)myem.vnn - (int)em->vnn; + } + + return cmp > 0; +} + +/* + send out an election request + */ +static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t vnn) +{ + int ret; + TDB_DATA election_data; + struct election_message emsg; + uint64_t srvid; + struct ctdb_context *ctdb = rec->ctdb; + + srvid = CTDB_SRVID_RECOVERY; + + ctdb_election_data(rec, &emsg); + + election_data.dsize = sizeof(struct election_message); + election_data.dptr = (unsigned char *)&emsg; + + + /* first we assume we will win the election and set + recoverymaster to be ourself on the current node + */ + ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), vnn, vnn); + if (ret != 0) { + DEBUG(0, (__location__ " failed to send recmaster election request\n")); + return -1; + } + + + /* send an election message to all active nodes */ + ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data); + + return 0; +} + +/* + this function will unban all nodes in the cluster +*/ +static void unban_all_nodes(struct ctdb_context *ctdb) +{ + int ret, i; + struct ctdb_node_map *nodemap; + TALLOC_CTX *tmp_ctx = talloc_new(ctdb); + + ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap); + if (ret != 0) { + DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n")); + return; + } + + for (i=0;i<nodemap->num;i++) { + if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) + && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) { + ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn, 0, NODE_FLAGS_BANNED); + } + } + + talloc_free(tmp_ctx); +} + +/* + handler for recovery master elections +*/ +static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, + TDB_DATA data, void *private_data) +{ + struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd); + int ret; + struct election_message *em = (struct election_message *)data.dptr; + TALLOC_CTX *mem_ctx; + + mem_ctx = talloc_new(ctdb); + + /* someone called an election. check their election data + and if we disagree and we would rather be the elected node, + send a new election message to all other nodes + */ + if (ctdb_election_win(rec, em)) { + ret = send_election_request(rec, mem_ctx, ctdb_get_vnn(ctdb)); + if (ret!=0) { + DEBUG(0, (__location__ " failed to initiate recmaster election")); + } + talloc_free(mem_ctx); + /*unban_all_nodes(ctdb);*/ + return; + } + + /* release the recmaster lock */ + if (em->vnn != ctdb->vnn && + ctdb->recovery_lock_fd != -1) { + close(ctdb->recovery_lock_fd); + ctdb->recovery_lock_fd = -1; + unban_all_nodes(ctdb); + } + + /* ok, let that guy become recmaster then */ + ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_vnn(ctdb), em->vnn); + if (ret != 0) { + DEBUG(0, (__location__ " failed to send recmaster election request")); + talloc_free(mem_ctx); + return; + } + + /* release any bans */ + rec->last_culprit = (uint32_t)-1; + talloc_free(rec->banned_nodes); + rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes); + CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes); + + talloc_free(mem_ctx); + return; +} + + +/* + force the start of the election process + */ +static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t vnn, + struct ctdb_node_map *nodemap) +{ + int ret; + struct ctdb_context *ctdb = rec->ctdb; + + /* set all nodes to recovery mode to stop all internode traffic */ + ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE); + if (ret!=0) { + DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n")); + return; + } + + ret = send_election_request(rec, mem_ctx, vnn); + if (ret!=0) { + DEBUG(0, (__location__ " failed to initiate recmaster election")); + return; + } + + /* wait for a few seconds to collect all responses */ + ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout); +} + + + +/* + handler for when a node changes its flags +*/ +static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, + TDB_DATA data, void *private_data) +{ + int ret; + struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr; + struct ctdb_node_map *nodemap=NULL; + TALLOC_CTX *tmp_ctx; + int i; + + if (data.dsize != sizeof(*c)) { + DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n")); + return; + } + + tmp_ctx = talloc_new(ctdb); + CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx); + + ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap); + + for (i=0;i<nodemap->num;i++) { + if (nodemap->nodes[i].vnn == c->vnn) break; + } + + if (i == nodemap->num) { + DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->vnn)); + talloc_free(tmp_ctx); + return; + } + + /* Dont let messages from remote nodes change the DISCONNECTED flag. + This flag is handled locally based on whether the local node + can communicate with the node or not. + */ + c->flags &= ~NODE_FLAGS_DISCONNECTED; + if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) { + c->flags |= NODE_FLAGS_DISCONNECTED; + } + + if (nodemap->nodes[i].flags != c->flags) { + DEBUG(0,("Node %u has changed flags - now 0x%x\n", c->vnn, c->flags)); + } + + nodemap->nodes[i].flags = c->flags; + + ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), + CTDB_CURRENT_NODE, &ctdb->recovery_master); + + if (ret == 0) { + ret = ctdb_ctrl_getrecmode(ctdb, CONTROL_TIMEOUT(), + CTDB_CURRENT_NODE, &ctdb->recovery_mode); + } + + if (ret == 0 && + ctdb->recovery_master == ctdb->vnn && + ctdb->recovery_mode == CTDB_RECOVERY_NORMAL && + ctdb->takeover.enabled) { + ret = ctdb_takeover_run(ctdb, nodemap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to setup public takeover addresses\n")); + } + } + + talloc_free(tmp_ctx); +} + + + +/* + the main monitoring loop + */ +static void monitor_cluster(struct ctdb_context *ctdb) +{ + uint32_t vnn, num_active, recmode, recmaster; + TALLOC_CTX *mem_ctx=NULL; + struct ctdb_node_map *nodemap=NULL; + struct ctdb_node_map *remote_nodemap=NULL; + struct ctdb_vnn_map *vnnmap=NULL; + struct ctdb_vnn_map *remote_vnnmap=NULL; + int i, j, ret; + bool need_takeover_run; + struct ctdb_recoverd *rec; + + rec = talloc_zero(ctdb, struct ctdb_recoverd); + CTDB_NO_MEMORY_FATAL(ctdb, rec); + + rec->ctdb = ctdb; + rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes); + CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes); + + rec->priority_time = timeval_current(); + + /* register a message port for recovery elections */ + ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec); + + /* and one for when nodes are disabled/enabled */ + ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec); + + /* and one for when nodes are banned */ + ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec); + + /* and one for when nodes are unbanned */ + ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec); + +again: + need_takeover_run = false; + + if (mem_ctx) { + talloc_free(mem_ctx); + mem_ctx = NULL; + } + mem_ctx = talloc_new(ctdb); + if (!mem_ctx) { + DEBUG(0,("Failed to create temporary context\n")); + exit(-1); + } + + /* we only check for recovery once every second */ + ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval); + + /* get relevant tunables */ + ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable); + if (ret != 0) { + DEBUG(0,("Failed to get tunables - retrying\n")); + goto again; + } + + vnn = ctdb_ctrl_getvnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE); + if (vnn == (uint32_t)-1) { + DEBUG(0,("Failed to get local vnn - retrying\n")); + goto again; + } + + /* get the vnnmap */ + ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &vnnmap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", vnn)); + goto again; + } + + + /* get number of nodes */ + ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &nodemap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", vnn)); + goto again; + } + + + /* count how many active nodes there are */ + num_active = 0; + for (i=0; i<nodemap->num; i++) { + if (rec->banned_nodes[nodemap->nodes[i].vnn] != NULL) { + nodemap->nodes[i].flags |= NODE_FLAGS_BANNED; + } else { + nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED; + } + if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) { + num_active++; + } + } + + + /* check which node is the recovery master */ + ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), vnn, &recmaster); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", vnn)); + goto again; + } + + if (recmaster == (uint32_t)-1) { + DEBUG(0,(__location__ " Initial recovery master set - forcing election\n")); + force_election(rec, mem_ctx, vnn, nodemap); + goto again; + } + + /* verify that the recmaster node is still active */ + for (j=0; j<nodemap->num; j++) { + if (nodemap->nodes[j].vnn==recmaster) { + break; + } + } + + if (j == nodemap->num) { + DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster)); + force_election(rec, mem_ctx, vnn, nodemap); + goto again; + } + + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].vnn)); + force_election(rec, mem_ctx, vnn, nodemap); + goto again; + } + + + /* if we are not the recmaster then we do not need to check + if recovery is needed + */ + if (vnn!=recmaster) { + goto again; + } + + + /* verify that all active nodes agree that we are the recmaster */ + for (j=0; j<nodemap->num; j++) { + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + if (nodemap->nodes[j].vnn == vnn) { + continue; + } + + ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, &recmaster); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", vnn)); + goto again; + } + + if (recmaster!=vnn) { + DEBUG(0, ("Node %u does not agree we are the recmaster. Force reelection\n", + nodemap->nodes[j].vnn)); + force_election(rec, mem_ctx, vnn, nodemap); + goto again; + } + } + + + /* verify that all active nodes are in normal mode + and not in recovery mode + */ + for (j=0; j<nodemap->num; j++) { + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + + ret = ctdb_ctrl_getrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, &recmode); + if (ret != 0) { + DEBUG(0, ("Unable to get recmode from node %u\n", vnn)); + goto again; + } + if (recmode != CTDB_RECOVERY_NORMAL) { + DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", + nodemap->nodes[j].vnn)); + do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn); + goto again; + } + } + + + /* get the nodemap for all active remote nodes and verify + they are the same as for this node + */ + for (j=0; j<nodemap->num; j++) { + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + if (nodemap->nodes[j].vnn == vnn) { + continue; + } + + ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, + mem_ctx, &remote_nodemap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", + nodemap->nodes[j].vnn)); + goto again; + } + + /* if the nodes disagree on how many nodes there are + then this is a good reason to try recovery + */ + if (remote_nodemap->num != nodemap->num) { + DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n", + nodemap->nodes[j].vnn, remote_nodemap->num, nodemap->num)); + do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn); + goto again; + } + + /* if the nodes disagree on which nodes exist and are + active, then that is also a good reason to do recovery + */ + for (i=0;i<nodemap->num;i++) { + if (remote_nodemap->nodes[i].vnn != nodemap->nodes[i].vnn) { + DEBUG(0, (__location__ " Remote node:%u has different nodemap vnn for %d (%u vs %u).\n", + nodemap->nodes[j].vnn, i, + remote_nodemap->nodes[i].vnn, nodemap->nodes[i].vnn)); + do_recovery(rec, mem_ctx, vnn, num_active, nodemap, + vnnmap, nodemap->nodes[j].vnn); + goto again; + } + if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != + (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) { + DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", + nodemap->nodes[j].vnn, i, + remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags)); + do_recovery(rec, mem_ctx, vnn, num_active, nodemap, + vnnmap, nodemap->nodes[j].vnn); + goto again; + } + } + + /* update our nodemap flags according to the other + server - this gets the NODE_FLAGS_DISABLED + flag. Note that the remote node is authoritative + for its flags (except CONNECTED, which we know + matches in this code) */ + if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) { + nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags; + need_takeover_run = true; + } + } + + + /* there better be the same number of lmasters in the vnn map + as there are active nodes or we will have to do a recovery + */ + if (vnnmap->size != num_active) { + DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", + vnnmap->size, num_active)); + do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, ctdb->vnn); + goto again; + } + + /* verify that all active nodes in the nodemap also exist in + the vnnmap. + */ + for (j=0; j<nodemap->num; j++) { + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + if (nodemap->nodes[j].vnn == vnn) { + continue; + } + + for (i=0; i<vnnmap->size; i++) { + if (vnnmap->map[i] == nodemap->nodes[j].vnn) { + break; + } + } + if (i == vnnmap->size) { + DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", + nodemap->nodes[j].vnn)); + do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn); + goto again; + } + } + + + /* verify that all other nodes have the same vnnmap + and are from the same generation + */ + for (j=0; j<nodemap->num; j++) { + if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) { + continue; + } + if (nodemap->nodes[j].vnn == vnn) { + continue; + } + + ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, + mem_ctx, &remote_vnnmap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", + nodemap->nodes[j].vnn)); + goto again; + } + + /* verify the vnnmap generation is the same */ + if (vnnmap->generation != remote_vnnmap->generation) { + DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", + nodemap->nodes[j].vnn, remote_vnnmap->generation, vnnmap->generation)); + do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn); + goto again; + } + + /* verify the vnnmap size is the same */ + if (vnnmap->size != remote_vnnmap->size) { + DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", + nodemap->nodes[j].vnn, remote_vnnmap->size, vnnmap->size)); + do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn); + goto again; + } + + /* verify the vnnmap is the same */ + for (i=0;i<vnnmap->size;i++) { + if (remote_vnnmap->map[i] != vnnmap->map[i]) { + DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", + nodemap->nodes[j].vnn)); + do_recovery(rec, mem_ctx, vnn, num_active, nodemap, + vnnmap, nodemap->nodes[j].vnn); + goto again; + } + } + } + + /* we might need to change who has what IP assigned */ + if (need_takeover_run && ctdb->takeover.enabled) { + ret = ctdb_takeover_run(ctdb, nodemap); + if (ret != 0) { + DEBUG(0, (__location__ " Unable to setup public takeover addresses\n")); + } + } + + goto again; + +} + +/* + event handler for when the main ctdbd dies + */ +static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + DEBUG(0,("recovery daemon parent died - exiting\n")); + _exit(1); +} + + + +/* + startup the recovery daemon as a child of the main ctdb daemon + */ +int ctdb_start_recoverd(struct ctdb_context *ctdb) +{ + int ret; + int fd[2]; + pid_t child; + + if (pipe(fd) != 0) { + return -1; + } + + child = fork(); + if (child == -1) { + return -1; + } + + if (child != 0) { + close(fd[0]); + return 0; + } + + close(fd[1]); + + /* shutdown the transport */ + ctdb->methods->shutdown(ctdb); + + /* get a new event context */ + talloc_free(ctdb->ev); + ctdb->ev = event_context_init(ctdb); + + event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, + ctdb_recoverd_parent, &fd[0]); + + close(ctdb->daemon.sd); + ctdb->daemon.sd = -1; + + srandom(getpid() ^ time(NULL)); + + /* initialise ctdb */ + ret = ctdb_socket_connect(ctdb); + if (ret != 0) { + DEBUG(0, (__location__ " Failed to init ctdb\n")); + exit(1); + } + + monitor_cluster(ctdb); + + DEBUG(0,("ERROR: ctdb_recoverd finished!?\n")); + return -1; +} diff --git a/source4/cluster/ctdb/server/ctdb_server.c b/source4/cluster/ctdb/server/ctdb_server.c new file mode 100644 index 0000000000..1480127327 --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_server.c @@ -0,0 +1,469 @@ +/* + ctdb main protocol code + + Copyright (C) Andrew Tridgell 2006 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb_private.h" + +/* + choose the transport we will use +*/ +int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport) +{ + ctdb->transport = talloc_strdup(ctdb, transport); + return 0; +} + +/* + choose the recovery lock file +*/ +int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file) +{ + ctdb->recovery_lock_file = talloc_strdup(ctdb, file); + return 0; +} + +/* + choose the logfile location +*/ +int ctdb_set_logfile(struct ctdb_context *ctdb, const char *logfile) +{ + ctdb->logfile = talloc_strdup(ctdb, logfile); + if (ctdb->logfile != NULL && strcmp(logfile, "-") != 0) { + int fd; + fd = open(ctdb->logfile, O_WRONLY|O_APPEND|O_CREAT, 0666); + if (fd == -1) { + printf("Failed to open logfile %s\n", ctdb->logfile); + abort(); + } + close(1); + close(2); + if (fd != 1) { + dup2(fd, 1); + close(fd); + } + /* also catch stderr of subcommands to the log file */ + dup2(1, 2); + } + return 0; +} + + +/* + set the directory for the local databases +*/ +int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir) +{ + ctdb->db_directory = talloc_strdup(ctdb, dir); + if (ctdb->db_directory == NULL) { + return -1; + } + return 0; +} + +/* + add a node to the list of active nodes +*/ +static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr) +{ + struct ctdb_node *node, **nodep; + + nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1); + CTDB_NO_MEMORY(ctdb, nodep); + + ctdb->nodes = nodep; + nodep = &ctdb->nodes[ctdb->num_nodes]; + (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node); + CTDB_NO_MEMORY(ctdb, *nodep); + node = *nodep; + + if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) { + return -1; + } + node->ctdb = ctdb; + node->name = talloc_asprintf(node, "%s:%u", + node->address.address, + node->address.port); + /* this assumes that the nodes are kept in sorted order, and no gaps */ + node->vnn = ctdb->num_nodes; + + /* nodes start out disconnected */ + node->flags |= NODE_FLAGS_DISCONNECTED; + + if (ctdb->address.address && + ctdb_same_address(&ctdb->address, &node->address)) { + ctdb->vnn = node->vnn; + node->flags &= ~NODE_FLAGS_DISCONNECTED; + } + + ctdb->num_nodes++; + node->dead_count = 0; + + return 0; +} + +/* + setup the node list from a file +*/ +int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist) +{ + char **lines; + int nlines; + int i; + + talloc_free(ctdb->node_list_file); + ctdb->node_list_file = talloc_strdup(ctdb, nlist); + + lines = file_lines_load(nlist, &nlines, ctdb); + if (lines == NULL) { + ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist); + return -1; + } + while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) { + nlines--; + } + + for (i=0;i<nlines;i++) { + if (ctdb_add_node(ctdb, lines[i]) != 0) { + talloc_free(lines); + return -1; + } + } + + /* initialize the vnn mapping table now that we have num_nodes setup */ +/* +XXX we currently initialize it to the maximum number of nodes to +XXX make it behave the same way as previously. +XXX Once we have recovery working we should initialize this always to +XXX generation==0 (==invalid) and let the recovery tool populate this +XXX table for the daemons. +*/ + ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map); + CTDB_NO_MEMORY(ctdb, ctdb->vnn_map); + + ctdb->vnn_map->generation = 1; + ctdb->vnn_map->size = ctdb->num_nodes; + ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size); + CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map); + + for(i=0;i<ctdb->vnn_map->size;i++) { + ctdb->vnn_map->map[i] = i; + } + + talloc_free(lines); + return 0; +} + + +/* + setup the local node address +*/ +int ctdb_set_address(struct ctdb_context *ctdb, const char *address) +{ + if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) { + return -1; + } + + ctdb->name = talloc_asprintf(ctdb, "%s:%u", + ctdb->address.address, + ctdb->address.port); + return 0; +} + + +/* + return the number of active nodes +*/ +uint32_t ctdb_get_num_active_nodes(struct ctdb_context *ctdb) +{ + int i; + uint32_t count=0; + for (i=0;i<ctdb->vnn_map->size;i++) { + struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]]; + if (!(node->flags & NODE_FLAGS_INACTIVE)) { + count++; + } + } + return count; +} + + +/* + called when we need to process a packet. This can be a requeued packet + after a lockwait, or a real packet from another node +*/ +void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + TALLOC_CTX *tmp_ctx; + + /* place the packet as a child of the tmp_ctx. We then use + talloc_free() below to free it. If any of the calls want + to keep it, then they will steal it somewhere else, and the + talloc_free() will only free the tmp_ctx */ + tmp_ctx = talloc_new(ctdb); + talloc_steal(tmp_ctx, hdr); + + DEBUG(3,(__location__ " ctdb request %u of type %u length %u from " + "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length, + hdr->srcnode, hdr->destnode)); + + switch (hdr->operation) { + case CTDB_REQ_CALL: + case CTDB_REPLY_CALL: + case CTDB_REQ_DMASTER: + case CTDB_REPLY_DMASTER: + /* for ctdb_call inter-node operations verify that the + remote node that sent us the call is running in the + same generation instance as this node + */ + if (ctdb->vnn_map->generation != hdr->generation) { + DEBUG(0,(__location__ " ctdb request %u" + " length %u from node %u to %u had an" + " invalid generation id:%u while our" + " generation id is:%u\n", + hdr->reqid, hdr->length, + hdr->srcnode, hdr->destnode, + hdr->generation, ctdb->vnn_map->generation)); + goto done; + } + } + + switch (hdr->operation) { + case CTDB_REQ_CALL: + ctdb->statistics.node.req_call++; + ctdb_request_call(ctdb, hdr); + break; + + case CTDB_REPLY_CALL: + ctdb->statistics.node.reply_call++; + ctdb_reply_call(ctdb, hdr); + break; + + case CTDB_REPLY_ERROR: + ctdb->statistics.node.reply_error++; + ctdb_reply_error(ctdb, hdr); + break; + + case CTDB_REQ_DMASTER: + ctdb->statistics.node.req_dmaster++; + ctdb_request_dmaster(ctdb, hdr); + break; + + case CTDB_REPLY_DMASTER: + ctdb->statistics.node.reply_dmaster++; + ctdb_reply_dmaster(ctdb, hdr); + break; + + case CTDB_REQ_MESSAGE: + ctdb->statistics.node.req_message++; + ctdb_request_message(ctdb, hdr); + break; + + case CTDB_REQ_CONTROL: + ctdb->statistics.node.req_control++; + ctdb_request_control(ctdb, hdr); + break; + + case CTDB_REPLY_CONTROL: + ctdb->statistics.node.reply_control++; + ctdb_reply_control(ctdb, hdr); + break; + + case CTDB_REQ_KEEPALIVE: + ctdb->statistics.keepalive_packets_recv++; + break; + + default: + DEBUG(0,("%s: Packet with unknown operation %u\n", + __location__, hdr->operation)); + break; + } + +done: + talloc_free(tmp_ctx); +} + + +/* + called by the transport layer when a node is dead +*/ +void ctdb_node_dead(struct ctdb_node *node) +{ + if (node->flags & NODE_FLAGS_DISCONNECTED) { + DEBUG(1,("%s: node %s is already marked disconnected: %u connected\n", + node->ctdb->name, node->name, + node->ctdb->num_connected)); + return; + } + node->ctdb->num_connected--; + node->flags |= NODE_FLAGS_DISCONNECTED; + node->rx_cnt = 0; + node->dead_count = 0; + DEBUG(1,("%s: node %s is dead: %u connected\n", + node->ctdb->name, node->name, node->ctdb->num_connected)); + ctdb_daemon_cancel_controls(node->ctdb, node); +} + +/* + called by the transport layer when a node is connected +*/ +void ctdb_node_connected(struct ctdb_node *node) +{ + if (!(node->flags & NODE_FLAGS_DISCONNECTED)) { + DEBUG(1,("%s: node %s is already marked connected: %u connected\n", + node->ctdb->name, node->name, + node->ctdb->num_connected)); + return; + } + node->ctdb->num_connected++; + node->dead_count = 0; + node->flags &= ~NODE_FLAGS_DISCONNECTED; + DEBUG(1,("%s: connected to %s - %u connected\n", + node->ctdb->name, node->name, node->ctdb->num_connected)); +} + +struct queue_next { + struct ctdb_context *ctdb; + struct ctdb_req_header *hdr; +}; + + +/* + trigered when a deferred packet is due + */ +static void queue_next_trigger(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct queue_next *q = talloc_get_type(private_data, struct queue_next); + ctdb_input_pkt(q->ctdb, q->hdr); + talloc_free(q); +} + +/* + defer a packet, so it is processed on the next event loop + this is used for sending packets to ourselves + */ +static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct queue_next *q; + q = talloc(ctdb, struct queue_next); + if (q == NULL) { + DEBUG(0,(__location__ " Failed to allocate deferred packet\n")); + return; + } + q->ctdb = ctdb; + q->hdr = talloc_memdup(ctdb, hdr, hdr->length); + if (q->hdr == NULL) { + DEBUG(0,("Error copying deferred packet to self\n")); + return; + } +#if 0 + /* use this to put packets directly into our recv function */ + ctdb_input_pkt(q->ctdb, q->hdr); +#else + event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q); +#endif +} + + +/* + broadcast a packet to all nodes +*/ +static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, + struct ctdb_req_header *hdr) +{ + int i; + for (i=0;i<ctdb->num_nodes;i++) { + hdr->destnode = ctdb->nodes[i]->vnn; + ctdb_queue_packet(ctdb, hdr); + } +} + +/* + broadcast a packet to all nodes in the current vnnmap +*/ +static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, + struct ctdb_req_header *hdr) +{ + int i; + for (i=0;i<ctdb->vnn_map->size;i++) { + hdr->destnode = ctdb->vnn_map->map[i]; + ctdb_queue_packet(ctdb, hdr); + } +} + +/* + broadcast a packet to all connected nodes +*/ +static void ctdb_broadcast_packet_connected(struct ctdb_context *ctdb, + struct ctdb_req_header *hdr) +{ + int i; + for (i=0;i<ctdb->num_nodes;i++) { + if (!(ctdb->nodes[i]->flags & NODE_FLAGS_DISCONNECTED)) { + hdr->destnode = ctdb->nodes[i]->vnn; + ctdb_queue_packet(ctdb, hdr); + } + } +} + +/* + queue a packet or die +*/ +void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_node *node; + + switch (hdr->destnode) { + case CTDB_BROADCAST_ALL: + ctdb_broadcast_packet_all(ctdb, hdr); + return; + case CTDB_BROADCAST_VNNMAP: + ctdb_broadcast_packet_vnnmap(ctdb, hdr); + return; + case CTDB_BROADCAST_CONNECTED: + ctdb_broadcast_packet_connected(ctdb, hdr); + return; + } + + ctdb->statistics.node_packets_sent++; + + if (!ctdb_validate_vnn(ctdb, hdr->destnode)) { + DEBUG(0,(__location__ " cant send to node %u that does not exist\n", + hdr->destnode)); + return; + } + + node = ctdb->nodes[hdr->destnode]; + + if (hdr->destnode == ctdb->vnn) { + ctdb_defer_packet(ctdb, hdr); + } else { + node->tx_cnt++; + if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) { + ctdb_fatal(ctdb, "Unable to queue packet\n"); + } + } +} + + diff --git a/source4/cluster/ctdb/server/ctdb_takeover.c b/source4/cluster/ctdb/server/ctdb_takeover.c new file mode 100644 index 0000000000..42a23808dd --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_takeover.c @@ -0,0 +1,822 @@ +/* + ctdb recovery code + + Copyright (C) Ronnie Sahlberg 2007 + Copyright (C) Andrew Tridgell 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "system/wait.h" +#include "../include/ctdb_private.h" + + +#define TAKEOVER_TIMEOUT() timeval_current_ofs(ctdb->tunable.takeover_timeout,0) + +#define CTDB_ARP_INTERVAL 1 +#define CTDB_ARP_REPEAT 3 + +struct ctdb_takeover_arp { + struct ctdb_context *ctdb; + uint32_t count; + struct sockaddr_in sin; + struct ctdb_tcp_list *tcp_list; +}; + +/* + lists of tcp endpoints + */ +struct ctdb_tcp_list { + struct ctdb_tcp_list *prev, *next; + uint32_t vnn; + struct sockaddr_in saddr; + struct sockaddr_in daddr; +}; + + +/* + list of clients to kill on IP release + */ +struct ctdb_client_ip { + struct ctdb_client_ip *prev, *next; + struct ctdb_context *ctdb; + struct sockaddr_in ip; + uint32_t client_id; +}; + + +/* + send a gratuitous arp + */ +static void ctdb_control_send_arp(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_takeover_arp *arp = talloc_get_type(private_data, + struct ctdb_takeover_arp); + int ret; + struct ctdb_tcp_list *tcp; + + ret = ctdb_sys_send_arp(&arp->sin, arp->ctdb->takeover.interface); + if (ret != 0) { + DEBUG(0,(__location__ " sending of arp failed (%s)\n", strerror(errno))); + } + + for (tcp=arp->tcp_list;tcp;tcp=tcp->next) { + DEBUG(2,("sending tcp tickle ack for %u->%s:%u\n", + (unsigned)ntohs(tcp->daddr.sin_port), + inet_ntoa(tcp->saddr.sin_addr), + (unsigned)ntohs(tcp->saddr.sin_port))); + ret = ctdb_sys_send_tcp(&tcp->saddr, &tcp->daddr, 0, 0, 0); + if (ret != 0) { + DEBUG(0,(__location__ " Failed to send tcp tickle ack for %s\n", + inet_ntoa(tcp->saddr.sin_addr))); + } + } + + arp->count++; + + if (arp->count == CTDB_ARP_REPEAT) { + talloc_free(arp); + return; + } + + event_add_timed(arp->ctdb->ev, arp->ctdb->takeover.last_ctx, + timeval_current_ofs(CTDB_ARP_INTERVAL, 0), + ctdb_control_send_arp, arp); +} + +struct takeover_callback_state { + struct ctdb_req_control *c; + struct sockaddr_in *sin; +}; + +/* + called when takeip event finishes + */ +static void takeover_ip_callback(struct ctdb_context *ctdb, int status, + void *private_data) +{ + struct takeover_callback_state *state = + talloc_get_type(private_data, struct takeover_callback_state); + struct ctdb_takeover_arp *arp; + char *ip = inet_ntoa(state->sin->sin_addr); + struct ctdb_tcp_list *tcp; + + ctdb_start_monitoring(ctdb); + + if (status != 0) { + DEBUG(0,(__location__ " Failed to takeover IP %s on interface %s\n", + ip, ctdb->takeover.interface)); + ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL); + talloc_free(state); + return; + } + + if (!ctdb->takeover.last_ctx) { + ctdb->takeover.last_ctx = talloc_new(ctdb); + if (!ctdb->takeover.last_ctx) goto failed; + } + + arp = talloc_zero(ctdb->takeover.last_ctx, struct ctdb_takeover_arp); + if (!arp) goto failed; + + arp->ctdb = ctdb; + arp->sin = *state->sin; + + /* add all of the known tcp connections for this IP to the + list of tcp connections to send tickle acks for */ + for (tcp=ctdb->tcp_list;tcp;tcp=tcp->next) { + if (state->sin->sin_addr.s_addr == tcp->daddr.sin_addr.s_addr) { + struct ctdb_tcp_list *t2 = talloc(arp, struct ctdb_tcp_list); + if (t2 == NULL) goto failed; + *t2 = *tcp; + DLIST_ADD(arp->tcp_list, t2); + } + } + + event_add_timed(arp->ctdb->ev, arp->ctdb->takeover.last_ctx, + timeval_zero(), ctdb_control_send_arp, arp); + + /* the control succeeded */ + ctdb_request_control_reply(ctdb, state->c, NULL, 0, NULL); + talloc_free(state); + return; + +failed: + ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL); + talloc_free(state); + return; +} + +/* + take over an ip address + */ +int32_t ctdb_control_takeover_ip(struct ctdb_context *ctdb, + struct ctdb_req_control *c, + TDB_DATA indata, + bool *async_reply) +{ + int ret; + struct takeover_callback_state *state; + struct ctdb_public_ip *pip = (struct ctdb_public_ip *)indata.dptr; + char *ip = inet_ntoa(pip->sin.sin_addr); + + + /* update out node table */ + ctdb->nodes[pip->vnn]->takeover_vnn = pip->takeover_vnn; + + /* if our kernel already has this IP, do nothing */ + if (ctdb_sys_have_ip(ip)) { + return 0; + } + + state = talloc(ctdb, struct takeover_callback_state); + CTDB_NO_MEMORY(ctdb, state); + + state->c = talloc_steal(ctdb, c); + state->sin = talloc(ctdb, struct sockaddr_in); + CTDB_NO_MEMORY(ctdb, state->sin); + *state->sin = pip->sin; + + DEBUG(0,("Takover of IP %s/%u on interface %s\n", + ip, ctdb->nodes[ctdb->vnn]->public_netmask_bits, + ctdb->takeover.interface)); + + ctdb_stop_monitoring(ctdb); + + ret = ctdb_event_script_callback(ctdb, + timeval_current_ofs(ctdb->tunable.script_timeout, 0), + state, takeover_ip_callback, state, + "takeip %s %s %u", + ctdb->takeover.interface, + ip, + ctdb->nodes[ctdb->vnn]->public_netmask_bits); + if (ret != 0) { + DEBUG(0,(__location__ " Failed to takeover IP %s on interface %s\n", + ip, ctdb->takeover.interface)); + talloc_free(state); + return -1; + } + + /* tell ctdb_control.c that we will be replying asynchronously */ + *async_reply = true; + + return 0; +} + +/* + kill any clients that are registered with a IP that is being released + */ +static void release_kill_clients(struct ctdb_context *ctdb, struct in_addr in) +{ + struct ctdb_client_ip *ip; + + for (ip=ctdb->client_ip_list; ip; ip=ip->next) { + if (ip->ip.sin_addr.s_addr == in.s_addr) { + struct ctdb_client *client = ctdb_reqid_find(ctdb, + ip->client_id, + struct ctdb_client); + if (client->pid != 0) { + DEBUG(0,(__location__ " Killing client pid %u for IP %s on client_id %u\n", + (unsigned)client->pid, inet_ntoa(in), + ip->client_id)); + kill(client->pid, SIGKILL); + } + } + } +} + +/* + called when releaseip event finishes + */ +static void release_ip_callback(struct ctdb_context *ctdb, int status, + void *private_data) +{ + struct takeover_callback_state *state = + talloc_get_type(private_data, struct takeover_callback_state); + char *ip = inet_ntoa(state->sin->sin_addr); + TDB_DATA data; + struct ctdb_tcp_list *tcp; + + ctdb_start_monitoring(ctdb); + + /* send a message to all clients of this node telling them + that the cluster has been reconfigured and they should + release any sockets on this IP */ + data.dptr = (uint8_t *)ip; + data.dsize = strlen(ip)+1; + + ctdb_daemon_send_message(ctdb, ctdb->vnn, CTDB_SRVID_RELEASE_IP, data); + + /* kill clients that have registered with this IP */ + release_kill_clients(ctdb, state->sin->sin_addr); + + + /* tell other nodes about any tcp connections we were holding with this IP */ + for (tcp=ctdb->tcp_list;tcp;tcp=tcp->next) { + if (tcp->vnn == ctdb->vnn && + state->sin->sin_addr.s_addr == tcp->daddr.sin_addr.s_addr) { + struct ctdb_control_tcp_vnn t; + + t.vnn = ctdb->vnn; + t.src = tcp->saddr; + t.dest = tcp->daddr; + + data.dptr = (uint8_t *)&t; + data.dsize = sizeof(t); + + ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, + CTDB_CONTROL_TCP_ADD, + 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL); + } + } + + /* the control succeeded */ + ctdb_request_control_reply(ctdb, state->c, NULL, 0, NULL); + talloc_free(state); +} + + +/* + release an ip address + */ +int32_t ctdb_control_release_ip(struct ctdb_context *ctdb, + struct ctdb_req_control *c, + TDB_DATA indata, + bool *async_reply) +{ + int ret; + struct takeover_callback_state *state; + struct ctdb_public_ip *pip = (struct ctdb_public_ip *)indata.dptr; + char *ip = inet_ntoa(pip->sin.sin_addr); + + /* update out node table */ + ctdb->nodes[pip->vnn]->takeover_vnn = pip->takeover_vnn; + + if (!ctdb_sys_have_ip(ip)) { + return 0; + } + + DEBUG(0,("Release of IP %s/%u on interface %s\n", + ip, ctdb->nodes[ctdb->vnn]->public_netmask_bits, + ctdb->takeover.interface)); + + /* stop any previous arps */ + talloc_free(ctdb->takeover.last_ctx); + ctdb->takeover.last_ctx = NULL; + + state = talloc(ctdb, struct takeover_callback_state); + CTDB_NO_MEMORY(ctdb, state); + + state->c = talloc_steal(state, c); + state->sin = talloc(state, struct sockaddr_in); + CTDB_NO_MEMORY(ctdb, state->sin); + *state->sin = pip->sin; + + ctdb_stop_monitoring(ctdb); + + ret = ctdb_event_script_callback(ctdb, + timeval_current_ofs(ctdb->tunable.script_timeout, 0), + state, release_ip_callback, state, + "releaseip %s %s %u", + ctdb->takeover.interface, + ip, + ctdb->nodes[ctdb->vnn]->public_netmask_bits); + if (ret != 0) { + DEBUG(0,(__location__ " Failed to release IP %s on interface %s\n", + ip, ctdb->takeover.interface)); + talloc_free(state); + return -1; + } + + /* tell the control that we will be reply asynchronously */ + *async_reply = true; + + return 0; +} + + +/* + setup the event script +*/ +int ctdb_set_event_script(struct ctdb_context *ctdb, const char *script) +{ + ctdb->takeover.event_script = talloc_strdup(ctdb, script); + CTDB_NO_MEMORY(ctdb, ctdb->takeover.event_script); + return 0; +} + +/* + setup the public address list from a file +*/ +int ctdb_set_public_addresses(struct ctdb_context *ctdb, const char *alist) +{ + char **lines; + int nlines; + int i; + + lines = file_lines_load(alist, &nlines, ctdb); + if (lines == NULL) { + ctdb_set_error(ctdb, "Failed to load public address list '%s'\n", alist); + return -1; + } + while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) { + nlines--; + } + + if (nlines != ctdb->num_nodes) { + DEBUG(0,("Number of lines in %s does not match number of nodes!\n", alist)); + talloc_free(lines); + return -1; + } + + for (i=0;i<nlines;i++) { + char *p; + struct in_addr in; + + ctdb->nodes[i]->public_address = talloc_strdup(ctdb->nodes[i], lines[i]); + CTDB_NO_MEMORY(ctdb, ctdb->nodes[i]->public_address); + ctdb->nodes[i]->takeover_vnn = -1; + + /* see if they supplied a netmask length */ + p = strchr(ctdb->nodes[i]->public_address, '/'); + if (!p) { + DEBUG(0,("You must supply a netmask for public address %s\n", + ctdb->nodes[i]->public_address)); + return -1; + } + *p = 0; + ctdb->nodes[i]->public_netmask_bits = atoi(p+1); + + if (ctdb->nodes[i]->public_netmask_bits > 32) { + DEBUG(0, ("Illegal netmask for IP %s\n", ctdb->nodes[i]->public_address)); + return -1; + } + + if (inet_aton(ctdb->nodes[i]->public_address, &in) == 0) { + DEBUG(0,("Badly formed IP '%s' in public address list\n", ctdb->nodes[i]->public_address)); + return -1; + } + } + + talloc_free(lines); + return 0; +} + +/* + see if two IPs are on the same subnet + */ +static bool ctdb_same_subnet(const char *ip1, const char *ip2, uint8_t netmask_bits) +{ + struct in_addr in1, in2; + uint32_t mask; + + inet_aton(ip1, &in1); + inet_aton(ip2, &in2); + + mask = ~((1LL<<(32-netmask_bits))-1); + + if ((ntohl(in1.s_addr) & mask) != (ntohl(in2.s_addr) & mask)) { + return false; + } + + return true; +} + + +/* + try to find an available node to take a given nodes IP that meets the + criterion given by the flags + */ +static void ctdb_takeover_find_node(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, + int start_node, uint32_t mask_flags) +{ + int j; + for (j=(start_node+1)%nodemap->num; + j != start_node; + j=(j+1)%nodemap->num) { + if (!(nodemap->nodes[j].flags & mask_flags) && + ctdb_same_subnet(ctdb->nodes[j]->public_address, + ctdb->nodes[start_node]->public_address, + ctdb->nodes[j]->public_netmask_bits)) { + ctdb->nodes[start_node]->takeover_vnn = nodemap->nodes[j].vnn; + break; + } + } +} + + +/* + make any IP alias changes for public addresses that are necessary + */ +int ctdb_takeover_run(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap) +{ + int i, j; + int ret; + struct ctdb_public_ip ip; + + ZERO_STRUCT(ip); + + /* Work out which node will look after each public IP. + * takeover_node cycles over the nodes and is incremented each time a + * node has been assigned to take over for another node. + * This spreads the failed nodes out across the remaining + * nodes more evenly + */ + for (i=0;i<nodemap->num;i++) { + if (!(nodemap->nodes[i].flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED))) { + ctdb->nodes[i]->takeover_vnn = nodemap->nodes[i].vnn; + } else { + ctdb->nodes[i]->takeover_vnn = (uint32_t)-1; + + ctdb_takeover_find_node(ctdb, nodemap, i, NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED); + + /* if no enabled node can take it, then we + might as well use any active node. It + probably means that some subsystem (such as + NFS) is sick on all nodes. Best we can do + is to keep the other services up. */ + if (ctdb->nodes[i]->takeover_vnn == (uint32_t)-1) { + ctdb_takeover_find_node(ctdb, nodemap, i, NODE_FLAGS_INACTIVE); + } + + if (ctdb->nodes[i]->takeover_vnn == (uint32_t)-1) { + DEBUG(0,(__location__ " No node available on same network to take %s\n", + ctdb->nodes[i]->public_address)); + } + } + } + + /* at this point ctdb->nodes[i]->takeover_vnn is the vnn which will own each IP */ + + /* now tell all nodes to delete any alias that they should not + have. This will be a NOOP on nodes that don't currently + hold the given alias */ + for (i=0;i<nodemap->num;i++) { + /* don't talk to unconnected nodes, but do talk to banned nodes */ + if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) { + continue; + } + + /* tell this node to delete all of the aliases that it should not have */ + for (j=0;j<nodemap->num;j++) { + if (ctdb->nodes[j]->takeover_vnn != nodemap->nodes[i].vnn) { + ip.vnn = j; + ip.takeover_vnn = ctdb->nodes[j]->takeover_vnn; + ip.sin.sin_family = AF_INET; + inet_aton(ctdb->nodes[j]->public_address, &ip.sin.sin_addr); + + ret = ctdb_ctrl_release_ip(ctdb, TAKEOVER_TIMEOUT(), + nodemap->nodes[i].vnn, + &ip); + if (ret != 0) { + DEBUG(0,("Failed to tell vnn %u to release IP %s\n", + nodemap->nodes[i].vnn, + ctdb->nodes[j]->public_address)); + return -1; + } + } + } + } + + /* tell all nodes to get their own IPs */ + for (i=0;i<nodemap->num;i++) { + if (ctdb->nodes[i]->takeover_vnn == -1) { + /* this IP won't be taken over */ + continue; + } + ip.vnn = i; + ip.takeover_vnn = ctdb->nodes[i]->takeover_vnn; + ip.sin.sin_family = AF_INET; + inet_aton(ctdb->nodes[i]->public_address, &ip.sin.sin_addr); + + ret = ctdb_ctrl_takeover_ip(ctdb, TAKEOVER_TIMEOUT(), + ctdb->nodes[i]->takeover_vnn, + &ip); + if (ret != 0) { + DEBUG(0,("Failed asking vnn %u to take over IP %s\n", + ctdb->nodes[i]->takeover_vnn, + ctdb->nodes[i]->public_address)); + return -1; + } + } + + return 0; +} + + +/* + destroy a ctdb_client_ip structure + */ +static int ctdb_client_ip_destructor(struct ctdb_client_ip *ip) +{ + DLIST_REMOVE(ip->ctdb->client_ip_list, ip); + return 0; +} + +/* + called by a client to inform us of a TCP connection that it is managing + that should tickled with an ACK when IP takeover is done + */ +int32_t ctdb_control_tcp_client(struct ctdb_context *ctdb, uint32_t client_id, uint32_t vnn, + TDB_DATA indata) +{ + struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); + struct ctdb_control_tcp *p = (struct ctdb_control_tcp *)indata.dptr; + struct ctdb_tcp_list *tcp; + struct ctdb_control_tcp_vnn t; + int ret; + TDB_DATA data; + struct ctdb_client_ip *ip; + + ip = talloc(client, struct ctdb_client_ip); + CTDB_NO_MEMORY(ctdb, ip); + + ip->ctdb = ctdb; + ip->ip = p->dest; + ip->client_id = client_id; + talloc_set_destructor(ip, ctdb_client_ip_destructor); + DLIST_ADD(ctdb->client_ip_list, ip); + + tcp = talloc(client, struct ctdb_tcp_list); + CTDB_NO_MEMORY(ctdb, tcp); + + tcp->vnn = vnn; + tcp->saddr = p->src; + tcp->daddr = p->dest; + + DLIST_ADD(client->tcp_list, tcp); + + t.vnn = vnn; + t.src = p->src; + t.dest = p->dest; + + data.dptr = (uint8_t *)&t; + data.dsize = sizeof(t); + + /* tell all nodes about this tcp connection */ + ret = ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, + CTDB_CONTROL_TCP_ADD, + 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL); + if (ret != 0) { + DEBUG(0,(__location__ " Failed to send CTDB_CONTROL_TCP_ADD\n")); + return -1; + } + + return 0; +} + +/* + see if two sockaddr_in are the same + */ +static bool same_sockaddr_in(struct sockaddr_in *in1, struct sockaddr_in *in2) +{ + return in1->sin_family == in2->sin_family && + in1->sin_port == in2->sin_port && + in1->sin_addr.s_addr == in2->sin_addr.s_addr; +} + +/* + find a tcp address on a list + */ +static struct ctdb_tcp_list *ctdb_tcp_find(struct ctdb_tcp_list *list, + struct ctdb_tcp_list *tcp) +{ + while (list) { + if (same_sockaddr_in(&list->saddr, &tcp->saddr) && + same_sockaddr_in(&list->daddr, &tcp->daddr)) { + return list; + } + list = list->next; + } + return NULL; +} + +/* + called by a daemon to inform us of a TCP connection that one of its + clients managing that should tickled with an ACK when IP takeover is + done + */ +int32_t ctdb_control_tcp_add(struct ctdb_context *ctdb, TDB_DATA indata) +{ + struct ctdb_control_tcp_vnn *p = (struct ctdb_control_tcp_vnn *)indata.dptr; + struct ctdb_tcp_list *tcp; + + tcp = talloc(ctdb, struct ctdb_tcp_list); + CTDB_NO_MEMORY(ctdb, tcp); + + tcp->vnn = p->vnn; + tcp->saddr = p->src; + tcp->daddr = p->dest; + + if (NULL == ctdb_tcp_find(ctdb->tcp_list, tcp)) { + DLIST_ADD(ctdb->tcp_list, tcp); + DEBUG(2,("Added tickle info for %s:%u from vnn %u\n", + inet_ntoa(tcp->daddr.sin_addr), ntohs(tcp->daddr.sin_port), + tcp->vnn)); + } else { + DEBUG(4,("Already had tickle info for %s:%u from vnn %u\n", + inet_ntoa(tcp->daddr.sin_addr), ntohs(tcp->daddr.sin_port), + tcp->vnn)); + } + + return 0; +} + +/* + called by a daemon to inform us of a TCP connection that one of its + clients managing that should tickled with an ACK when IP takeover is + done + */ +int32_t ctdb_control_tcp_remove(struct ctdb_context *ctdb, TDB_DATA indata) +{ + struct ctdb_control_tcp_vnn *p = (struct ctdb_control_tcp_vnn *)indata.dptr; + struct ctdb_tcp_list t, *tcp; + + t.vnn = p->vnn; + t.saddr = p->src; + t.daddr = p->dest; + + tcp = ctdb_tcp_find(ctdb->tcp_list, &t); + if (tcp) { + DEBUG(2,("Removed tickle info for %s:%u from vnn %u\n", + inet_ntoa(tcp->daddr.sin_addr), ntohs(tcp->daddr.sin_port), + tcp->vnn)); + DLIST_REMOVE(ctdb->tcp_list, tcp); + talloc_free(tcp); + } + + return 0; +} + + +/* + called when a daemon restarts - wipes all tcp entries from that vnn + */ +int32_t ctdb_control_startup(struct ctdb_context *ctdb, uint32_t vnn) +{ + struct ctdb_tcp_list *tcp, *next; + for (tcp=ctdb->tcp_list;tcp;tcp=next) { + next = tcp->next; + if (tcp->vnn == vnn) { + DLIST_REMOVE(ctdb->tcp_list, tcp); + talloc_free(tcp); + } + + /* and tell the new guy about any that he should have + from us */ + if (tcp->vnn == ctdb->vnn) { + struct ctdb_control_tcp_vnn t; + TDB_DATA data; + + t.vnn = tcp->vnn; + t.src = tcp->saddr; + t.dest = tcp->daddr; + + data.dptr = (uint8_t *)&t; + data.dsize = sizeof(t); + + ctdb_daemon_send_control(ctdb, vnn, 0, + CTDB_CONTROL_TCP_ADD, + 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL); + } + } + return 0; +} + + +/* + called when a client structure goes away - hook to remove + elements from the tcp_list in all daemons + */ +void ctdb_takeover_client_destructor_hook(struct ctdb_client *client) +{ + while (client->tcp_list) { + TDB_DATA data; + struct ctdb_control_tcp_vnn p; + struct ctdb_tcp_list *tcp = client->tcp_list; + DLIST_REMOVE(client->tcp_list, tcp); + p.vnn = tcp->vnn; + p.src = tcp->saddr; + p.dest = tcp->daddr; + data.dptr = (uint8_t *)&p; + data.dsize = sizeof(p); + ctdb_daemon_send_control(client->ctdb, CTDB_BROADCAST_CONNECTED, 0, + CTDB_CONTROL_TCP_REMOVE, + 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL); + talloc_free(tcp); + } +} + + +/* + release all IPs on shutdown + */ +void ctdb_release_all_ips(struct ctdb_context *ctdb) +{ + int i; + + if (!ctdb->takeover.enabled) { + return; + } + + for (i=0;i<ctdb->num_nodes;i++) { + struct ctdb_node *node = ctdb->nodes[i]; + if (ctdb_sys_have_ip(node->public_address)) { + struct in_addr in; + ctdb_event_script(ctdb, "releaseip %s %s %u", + ctdb->takeover.interface, + node->public_address, + node->public_netmask_bits); + if (inet_aton(node->public_address, &in) != 0) { + release_kill_clients(ctdb, in); + } + } + } +} + + +/* + get list of public IPs + */ +int32_t ctdb_control_get_public_ips(struct ctdb_context *ctdb, struct ctdb_req_control *c, TDB_DATA *outdata) +{ + int i, len; + struct ctdb_all_public_ips *ips; + + len = offsetof(struct ctdb_all_public_ips, ips) + ctdb->num_nodes*sizeof(struct ctdb_public_ip); + + ips = talloc_zero_size(outdata, len); + CTDB_NO_MEMORY(ctdb, ips); + + outdata->dsize = len; + outdata->dptr = (uint8_t *)ips; + + ips->num = ctdb->num_nodes; + for(i=0;i<ctdb->num_nodes;i++){ + ips->ips[i].vnn = i; + ips->ips[i].takeover_vnn = ctdb->nodes[i]->takeover_vnn; + ips->ips[i].sin.sin_family = AF_INET; + if (ctdb->nodes[i]->public_address) { + inet_aton(ctdb->nodes[i]->public_address, &ips->ips[i].sin.sin_addr); + } + } + + return 0; +} diff --git a/source4/cluster/ctdb/server/ctdb_traverse.c b/source4/cluster/ctdb/server/ctdb_traverse.c new file mode 100644 index 0000000000..d44c27401a --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_traverse.c @@ -0,0 +1,463 @@ +/* + efficient async ctdb traverse + + Copyright (C) Andrew Tridgell 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "system/wait.h" +#include "db_wrap.h" +#include "lib/tdb/include/tdb.h" +#include "../include/ctdb_private.h" + +typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data); + +/* + handle returned to caller - freeing this handler will kill the child and + terminate the traverse + */ +struct ctdb_traverse_local_handle { + struct ctdb_db_context *ctdb_db; + int fd[2]; + pid_t child; + void *private_data; + ctdb_traverse_fn_t callback; + struct timeval start_time; + struct ctdb_queue *queue; +}; + +/* + called when data is available from the child + */ +static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data) +{ + struct ctdb_traverse_local_handle *h = talloc_get_type(private_data, + struct ctdb_traverse_local_handle); + TDB_DATA key, data; + ctdb_traverse_fn_t callback = h->callback; + void *p = h->private_data; + struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata; + + if (rawdata == NULL || length < 4 || length != tdata->length) { + /* end of traverse */ + talloc_free(h); + callback(p, tdb_null, tdb_null); + return; + } + + key.dsize = tdata->keylen; + key.dptr = &tdata->data[0]; + data.dsize = tdata->datalen; + data.dptr = &tdata->data[tdata->keylen]; + + callback(p, key, data); +} + +/* + destroy a in-flight traverse operation + */ +static int traverse_local_destructor(struct ctdb_traverse_local_handle *h) +{ + kill(h->child, SIGKILL); + waitpid(h->child, NULL, 0); + return 0; +} + +/* + callback from tdb_traverse_read() + */ +static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p) +{ + struct ctdb_traverse_local_handle *h = talloc_get_type(p, + struct ctdb_traverse_local_handle); + struct ctdb_rec_data *d; + struct ctdb_ltdb_header *hdr; + + /* filter out non-authoritative and zero-length records */ + hdr = (struct ctdb_ltdb_header *)data.dptr; + if (data.dsize <= sizeof(struct ctdb_ltdb_header) || + hdr->dmaster != h->ctdb_db->ctdb->vnn) { + return 0; + } + + d = ctdb_marshall_record(h, 0, key, data); + if (d == NULL) { + /* error handling is tricky in this child code .... */ + return -1; + } + + if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) { + return -1; + } + return 0; +} + + +/* + setup a non-blocking traverse of a local ltdb. The callback function + will be called on every record in the local ltdb. To stop the + travserse, talloc_free() the travserse_handle. + + The traverse is finished when the callback is called with tdb_null for key and data + */ +static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db, + ctdb_traverse_fn_t callback, + void *private_data) +{ + struct ctdb_traverse_local_handle *h; + int ret; + + h = talloc_zero(ctdb_db, struct ctdb_traverse_local_handle); + if (h == NULL) { + return NULL; + } + + ret = pipe(h->fd); + + if (ret != 0) { + talloc_free(h); + return NULL; + } + + h->child = fork(); + + if (h->child == (pid_t)-1) { + close(h->fd[0]); + close(h->fd[1]); + talloc_free(h); + return NULL; + } + + h->callback = callback; + h->private_data = private_data; + h->ctdb_db = ctdb_db; + + if (h->child == 0) { + /* start the traverse in the child */ + close(h->fd[0]); + tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h); + _exit(0); + } + + close(h->fd[1]); + talloc_set_destructor(h, traverse_local_destructor); + + /* + setup a packet queue between the child and the parent. This + copes with all the async and packet boundary issues + */ + h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h); + if (h->queue == NULL) { + talloc_free(h); + return NULL; + } + + h->start_time = timeval_current(); + + return h; +} + + +struct ctdb_traverse_all_handle { + struct ctdb_context *ctdb; + uint32_t reqid; + ctdb_traverse_fn_t callback; + void *private_data; + uint32_t null_count; +}; + +/* + destroy a traverse_all op + */ +static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state) +{ + ctdb_reqid_remove(state->ctdb, state->reqid); + return 0; +} + +struct ctdb_traverse_all { + uint32_t db_id; + uint32_t reqid; + uint32_t vnn; +}; + +/* called when a traverse times out */ +static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle); + + state->ctdb->statistics.timeouts.traverse++; + + state->callback(state->private_data, tdb_null, tdb_null); + talloc_free(state); +} + +/* + setup a cluster-wide non-blocking traverse of a ctdb. The + callback function will be called on every record in the local + ltdb. To stop the travserse, talloc_free() the traverse_handle. + + The traverse is finished when the callback is called with tdb_null + for key and data + */ +static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db, + ctdb_traverse_fn_t callback, + void *private_data) +{ + struct ctdb_traverse_all_handle *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + int ret; + TDB_DATA data; + struct ctdb_traverse_all r; + + state = talloc(ctdb_db, struct ctdb_traverse_all_handle); + if (state == NULL) { + return NULL; + } + + state->ctdb = ctdb; + state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state); + state->callback = callback; + state->private_data = private_data; + state->null_count = 0; + + talloc_set_destructor(state, ctdb_traverse_all_destructor); + + r.db_id = ctdb_db->db_id; + r.reqid = state->reqid; + r.vnn = ctdb->vnn; + + data.dptr = (uint8_t *)&r; + data.dsize = sizeof(r); + + /* tell all the nodes in the cluster to start sending records to this node */ + ret = ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0, + CTDB_CONTROL_TRAVERSE_ALL, + 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL); + if (ret != 0) { + talloc_free(state); + return NULL; + } + + /* timeout the traverse */ + event_add_timed(ctdb->ev, state, + timeval_current_ofs(ctdb->tunable.traverse_timeout, 0), + ctdb_traverse_all_timeout, state); + + return state; +} + +struct traverse_all_state { + struct ctdb_context *ctdb; + struct ctdb_traverse_local_handle *h; + uint32_t reqid; + uint32_t srcnode; +}; + +/* + called for each record during a traverse all + */ +static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data) +{ + struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state); + int ret; + struct ctdb_rec_data *d; + TDB_DATA cdata; + + d = ctdb_marshall_record(state, state->reqid, key, data); + if (d == NULL) { + /* darn .... */ + DEBUG(0,("Out of memory in traverse_all_callback\n")); + return; + } + + cdata.dptr = (uint8_t *)d; + cdata.dsize = d->length; + + ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA, + 0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL); + if (ret != 0) { + DEBUG(0,("Failed to send traverse data\n")); + } + + if (key.dsize == 0 && data.dsize == 0) { + /* we're done */ + talloc_free(state); + } +} + +/* + called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then + setup a traverse of our local ltdb, sending the records as + CTDB_CONTROL_TRAVERSE_DATA records back to the originator + */ +int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata) +{ + struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr; + struct traverse_all_state *state; + struct ctdb_db_context *ctdb_db; + + if (data.dsize != sizeof(struct ctdb_traverse_all)) { + DEBUG(0,("Invalid size in ctdb_control_traverse_all\n")); + return -1; + } + + ctdb_db = find_ctdb_db(ctdb, c->db_id); + if (ctdb_db == NULL) { + return -1; + } + + state = talloc(ctdb_db, struct traverse_all_state); + if (state == NULL) { + return -1; + } + + state->reqid = c->reqid; + state->srcnode = c->vnn; + state->ctdb = ctdb; + + state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state); + if (state->h == NULL) { + talloc_free(state); + return -1; + } + + return 0; +} + + +/* + called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then + call the traverse_all callback with the record + */ +int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata) +{ + struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr; + struct ctdb_traverse_all_handle *state; + TDB_DATA key; + ctdb_traverse_fn_t callback; + void *private_data; + + if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) { + DEBUG(0,("Bad record size in ctdb_control_traverse_data\n")); + return -1; + } + + state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle); + if (state == NULL || d->reqid != state->reqid) { + /* traverse might have been terminated already */ + return -1; + } + + key.dsize = d->keylen; + key.dptr = &d->data[0]; + data.dsize = d->datalen; + data.dptr = &d->data[d->keylen]; + + if (key.dsize == 0 && data.dsize == 0) { + state->null_count++; + if (state->null_count != ctdb_get_num_active_nodes(ctdb)) { + return 0; + } + } + + callback = state->callback; + private_data = state->private_data; + + callback(private_data, key, data); + if (key.dsize == 0 && data.dsize == 0) { + /* we've received all of the null replies, so all + nodes are finished */ + talloc_free(state); + } + return 0; +} + +struct traverse_start_state { + struct ctdb_context *ctdb; + struct ctdb_traverse_all_handle *h; + uint32_t srcnode; + uint32_t reqid; + uint64_t srvid; +}; + +/* + callback which sends records as messages to the client + */ +static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data) +{ + struct traverse_start_state *state; + struct ctdb_rec_data *d; + TDB_DATA cdata; + + state = talloc_get_type(p, struct traverse_start_state); + + d = ctdb_marshall_record(state, state->reqid, key, data); + if (d == NULL) { + return; + } + + cdata.dptr = (uint8_t *)d; + cdata.dsize = d->length; + + ctdb_dispatch_message(state->ctdb, state->srvid, cdata); + if (key.dsize == 0 && data.dsize == 0) { + /* end of traverse */ + talloc_free(state); + } +} + +/* + start a traverse_all - called as a control from a client + */ +int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data, + TDB_DATA *outdata, uint32_t srcnode) +{ + struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr; + struct traverse_start_state *state; + struct ctdb_db_context *ctdb_db; + + if (data.dsize != sizeof(*d)) { + DEBUG(0,("Bad record size in ctdb_control_traverse_start\n")); + return -1; + } + + ctdb_db = find_ctdb_db(ctdb, d->db_id); + if (ctdb_db == NULL) { + return -1; + } + + state = talloc(ctdb_db, struct traverse_start_state); + if (state == NULL) { + return -1; + } + + state->srcnode = srcnode; + state->reqid = d->reqid; + state->srvid = d->srvid; + state->ctdb = ctdb; + + state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state); + if (state->h == NULL) { + talloc_free(state); + return -1; + } + + return 0; +} diff --git a/source4/cluster/ctdb/server/ctdb_tunables.c b/source4/cluster/ctdb/server/ctdb_tunables.c new file mode 100644 index 0000000000..491c965656 --- /dev/null +++ b/source4/cluster/ctdb/server/ctdb_tunables.c @@ -0,0 +1,163 @@ +/* + ctdb tunables code + + Copyright (C) Andrew Tridgell 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ +#include "includes.h" +#include "../include/ctdb_private.h" + +static const struct { + const char *name; + uint32_t default_v; + size_t offset; +} tunable_map[] = { + { "MaxRedirectCount", 3, offsetof(struct ctdb_tunable, max_redirect_count) }, + { "SeqnumFrequency", 1, offsetof(struct ctdb_tunable, seqnum_frequency) }, + { "ControlTimeout", 60, offsetof(struct ctdb_tunable, control_timeout) }, + { "TraverseTimeout", 20, offsetof(struct ctdb_tunable, traverse_timeout) }, + { "KeepaliveInterval", 2, offsetof(struct ctdb_tunable, keepalive_interval) }, + { "KeepaliveLimit", 5, offsetof(struct ctdb_tunable, keepalive_limit) }, + { "MaxLACount", 7, offsetof(struct ctdb_tunable, max_lacount) }, + { "RecoverTimeout", 5, offsetof(struct ctdb_tunable, recover_timeout) }, + { "RecoverInterval", 1, offsetof(struct ctdb_tunable, recover_interval) }, + { "ElectionTimeout", 3, offsetof(struct ctdb_tunable, election_timeout) }, + { "TakeoverTimeout", 5, offsetof(struct ctdb_tunable, takeover_timeout) }, + { "MonitorInterval", 15, offsetof(struct ctdb_tunable, monitor_interval) }, + { "EventScriptTimeout", 20, offsetof(struct ctdb_tunable, script_timeout) }, + { "RecoveryGracePeriod", 60, offsetof(struct ctdb_tunable, recovery_grace_period) }, + { "RecoveryBanPeriod", 300, offsetof(struct ctdb_tunable, recovery_ban_period) }, + { "DatabaseHashSize", 10000, offsetof(struct ctdb_tunable, database_hash_size) }, + { "RerecoveryTimeout", 10, offsetof(struct ctdb_tunable, rerecovery_timeout) }, +}; + +/* + set all tunables to defaults + */ +void ctdb_tunables_set_defaults(struct ctdb_context *ctdb) +{ + int i; + for (i=0;i<ARRAY_SIZE(tunable_map);i++) { + *(uint32_t *)(tunable_map[i].offset + (uint8_t*)&ctdb->tunable) = tunable_map[i].default_v; + } +} + + +/* + get a tunable + */ +int32_t ctdb_control_get_tunable(struct ctdb_context *ctdb, TDB_DATA indata, + TDB_DATA *outdata) +{ + struct ctdb_control_get_tunable *t = + (struct ctdb_control_get_tunable *)indata.dptr; + char *name; + uint32_t val; + int i; + + if (indata.dsize < sizeof(*t) || + t->length > indata.dsize - offsetof(struct ctdb_control_get_tunable, name)) { + DEBUG(0,("Bad indata in ctdb_control_get_tunable\n")); + return -1; + } + + name = talloc_strndup(ctdb, (char*)t->name, t->length); + CTDB_NO_MEMORY(ctdb, name); + + for (i=0;i<ARRAY_SIZE(tunable_map);i++) { + if (strcasecmp(name, tunable_map[i].name) == 0) break; + } + talloc_free(name); + + if (i == ARRAY_SIZE(tunable_map)) { + return -1; + } + + val = *(uint32_t *)(tunable_map[i].offset + (uint8_t*)&ctdb->tunable); + + outdata->dptr = (uint8_t *)talloc(outdata, uint32_t); + CTDB_NO_MEMORY(ctdb, outdata->dptr); + + *(uint32_t *)outdata->dptr = val; + outdata->dsize = sizeof(uint32_t); + + return 0; +} + + +/* + set a tunable + */ +int32_t ctdb_control_set_tunable(struct ctdb_context *ctdb, TDB_DATA indata) +{ + struct ctdb_control_set_tunable *t = + (struct ctdb_control_set_tunable *)indata.dptr; + char *name; + int i; + + if (indata.dsize < sizeof(*t) || + t->length > indata.dsize - offsetof(struct ctdb_control_set_tunable, name)) { + DEBUG(0,("Bad indata in ctdb_control_set_tunable\n")); + return -1; + } + + name = talloc_strndup(ctdb, (char *)t->name, t->length); + CTDB_NO_MEMORY(ctdb, name); + + for (i=0;i<ARRAY_SIZE(tunable_map);i++) { + if (strcasecmp(name, tunable_map[i].name) == 0) break; + } + + talloc_free(name); + + if (i == ARRAY_SIZE(tunable_map)) { + return -1; + } + + *(uint32_t *)(tunable_map[i].offset + (uint8_t*)&ctdb->tunable) = t->value; + + return 0; +} + +/* + list tunables + */ +int32_t ctdb_control_list_tunables(struct ctdb_context *ctdb, TDB_DATA *outdata) +{ + char *list = NULL; + int i; + struct ctdb_control_list_tunable *t; + + list = talloc_strdup(outdata, tunable_map[0].name); + CTDB_NO_MEMORY(ctdb, list); + + for (i=1;i<ARRAY_SIZE(tunable_map);i++) { + list = talloc_asprintf_append(list, ":%s", tunable_map[i].name); + CTDB_NO_MEMORY(ctdb, list); + } + + outdata->dsize = offsetof(struct ctdb_control_list_tunable, data) + + strlen(list) + 1; + outdata->dptr = talloc_size(outdata, outdata->dsize); + CTDB_NO_MEMORY(ctdb, outdata->dptr); + + t = (struct ctdb_control_list_tunable *)outdata->dptr; + t->length = strlen(list)+1; + + memcpy(t->data, list, t->length); + talloc_free(list); + + return 0; +} diff --git a/source4/cluster/ctdb/server/ctdbd.c b/source4/cluster/ctdb/server/ctdbd.c new file mode 100644 index 0000000000..2c4a23b673 --- /dev/null +++ b/source4/cluster/ctdb/server/ctdbd.c @@ -0,0 +1,229 @@ +/* + standalone ctdb daemon + + Copyright (C) Andrew Tridgell 2006 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "popt.h" +#include "system/wait.h" +#include "cmdline.h" +#include "../include/ctdb_private.h" + +static void block_signal(int signum) +{ + struct sigaction act; + + memset(&act, 0, sizeof(act)); + + act.sa_handler = SIG_IGN; + sigemptyset(&act.sa_mask); + sigaddset(&act.sa_mask, signum); + sigaction(signum, &act, NULL); +} + +static struct { + const char *nlist; + const char *transport; + const char *myaddress; + const char *public_address_list; + const char *public_interface; + const char *event_script; + const char *logfile; + const char *recovery_lock_file; + const char *db_dir; +} options = { + .nlist = ETCDIR "/ctdb/nodes", + .transport = "tcp", + .event_script = ETCDIR "/ctdb/events", + .logfile = VARDIR "/log/log.ctdb", + .db_dir = VARDIR "/ctdb", +}; + + +/* + called by the transport layer when a packet comes in +*/ +static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length) +{ + struct ctdb_req_header *hdr = (struct ctdb_req_header *)data; + + ctdb->statistics.node_packets_recv++; + + /* up the counter for this source node, so we know its alive */ + if (ctdb_validate_vnn(ctdb, hdr->srcnode)) { + /* as a special case, redirected calls don't increment the rx_cnt */ + if (hdr->operation != CTDB_REQ_CALL || + ((struct ctdb_req_call *)hdr)->hopcount == 0) { + ctdb->nodes[hdr->srcnode]->rx_cnt++; + } + } + + ctdb_input_pkt(ctdb, hdr); +} + + + +static const struct ctdb_upcalls ctdb_upcalls = { + .recv_pkt = ctdb_recv_pkt, + .node_dead = ctdb_node_dead, + .node_connected = ctdb_node_connected +}; + + + +/* + main program +*/ +int main(int argc, const char *argv[]) +{ + struct ctdb_context *ctdb; + int interactive = 0; + + struct poptOption popt_options[] = { + POPT_AUTOHELP + POPT_CTDB_CMDLINE + { "interactive", 'i', POPT_ARG_NONE, &interactive, 0, "don't fork", NULL }, + { "public-addresses", 0, POPT_ARG_STRING, &options.public_address_list, 0, "public address list file", "filename" }, + { "public-interface", 0, POPT_ARG_STRING, &options.public_interface, 0, "public interface", "interface"}, + { "event-script", 0, POPT_ARG_STRING, &options.event_script, 0, "event script", "filename" }, + { "logfile", 0, POPT_ARG_STRING, &options.logfile, 0, "log file location", "filename" }, + { "nlist", 0, POPT_ARG_STRING, &options.nlist, 0, "node list file", "filename" }, + { "listen", 0, POPT_ARG_STRING, &options.myaddress, 0, "address to listen on", "address" }, + { "transport", 0, POPT_ARG_STRING, &options.transport, 0, "protocol transport", NULL }, + { "dbdir", 0, POPT_ARG_STRING, &options.db_dir, 0, "directory for the tdb files", NULL }, + { "reclock", 0, POPT_ARG_STRING, &options.recovery_lock_file, 0, "location of recovery lock file", "filename" }, + POPT_TABLEEND + }; + int opt, ret; + const char **extra_argv; + int extra_argc = 0; + poptContext pc; + struct event_context *ev; + + pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST); + + while ((opt = poptGetNextOpt(pc)) != -1) { + switch (opt) { + default: + fprintf(stderr, "Invalid option %s: %s\n", + poptBadOption(pc, 0), poptStrerror(opt)); + exit(1); + } + } + + /* setup the remaining options for the main program to use */ + extra_argv = poptGetArgs(pc); + if (extra_argv) { + extra_argv++; + while (extra_argv[extra_argc]) extra_argc++; + } + + if (!options.recovery_lock_file) { + DEBUG(0,("You must specifiy the location of a recovery lock file with --reclock\n")); + exit(1); + } + + block_signal(SIGPIPE); + + ev = event_context_init(NULL); + + ctdb = ctdb_cmdline_init(ev); + + ctdb->recovery_mode = CTDB_RECOVERY_NORMAL; + ctdb->recovery_master = (uint32_t)-1; + ctdb->upcalls = &ctdb_upcalls; + ctdb->idr = idr_init(ctdb); + ctdb->recovery_lock_fd = -1; + ctdb->monitoring_mode = CTDB_MONITORING_ACTIVE; + + ctdb_tunables_set_defaults(ctdb); + + ret = ctdb_set_recovery_lock_file(ctdb, options.recovery_lock_file); + if (ret == -1) { + printf("ctdb_set_recovery_lock_file failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + ret = ctdb_set_transport(ctdb, options.transport); + if (ret == -1) { + printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what address to listen on */ + if (options.myaddress) { + ret = ctdb_set_address(ctdb, options.myaddress); + if (ret == -1) { + printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + } + + /* tell ctdb what nodes are available */ + ret = ctdb_set_nlist(ctdb, options.nlist); + if (ret == -1) { + printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + if (options.db_dir) { + ret = ctdb_set_tdb_dir(ctdb, options.db_dir); + if (ret == -1) { + printf("ctdb_set_tdb_dir failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + } + + ret = ctdb_set_logfile(ctdb, options.logfile); + if (ret == -1) { + printf("ctdb_set_logfile to %s failed - %s\n", options.logfile, ctdb_errstr(ctdb)); + exit(1); + } + + if (options.public_interface) { + ctdb->takeover.interface = talloc_strdup(ctdb, options.public_interface); + CTDB_NO_MEMORY(ctdb, ctdb->takeover.interface); + } + + if (options.public_address_list) { + ret = ctdb_set_public_addresses(ctdb, options.public_address_list); + if (ret == -1) { + printf("Unable to setup public address list\n"); + exit(1); + } + ctdb->takeover.enabled = true; + } + + ret = ctdb_set_event_script(ctdb, options.event_script); + if (ret == -1) { + printf("Unable to setup event script\n"); + exit(1); + } + + /* useful default logfile */ + if (ctdb->logfile == NULL) { + char *name = talloc_asprintf(ctdb, "%s/log.ctdb.vnn%u", + VARDIR, ctdb->vnn); + ctdb_set_logfile(ctdb, name); + talloc_free(name); + } + + /* start the protocol running (as a child) */ + return ctdb_start_daemon(ctdb, interactive?False:True); +} diff --git a/source4/cluster/ctdb/server/eventscript.c b/source4/cluster/ctdb/server/eventscript.c new file mode 100644 index 0000000000..e23157056c --- /dev/null +++ b/source4/cluster/ctdb/server/eventscript.c @@ -0,0 +1,191 @@ +/* + event script handling + + Copyright (C) Andrew Tridgell 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "system/filesys.h" +#include "system/wait.h" +#include "../include/ctdb_private.h" +#include "lib/events/events.h" + +/* + run the event script - varargs version + */ +static int ctdb_event_script_v(struct ctdb_context *ctdb, const char *fmt, va_list ap) +{ + char *options, *cmdstr; + int ret; + va_list ap2; + struct stat st; + + if (stat(ctdb->takeover.event_script, &st) != 0 && + errno == ENOENT) { + DEBUG(0,("No event script found at '%s'\n", ctdb->takeover.event_script)); + return 0; + } + + va_copy(ap2, ap); + options = talloc_vasprintf(ctdb, fmt, ap2); + va_end(ap2); + CTDB_NO_MEMORY(ctdb, options); + + cmdstr = talloc_asprintf(ctdb, "%s %s", ctdb->takeover.event_script, options); + CTDB_NO_MEMORY(ctdb, cmdstr); + + ret = system(cmdstr); + if (ret != -1) { + ret = WEXITSTATUS(ret); + } + + talloc_free(cmdstr); + talloc_free(options); + + return ret; +} + +/* + run the event script + */ +int ctdb_event_script(struct ctdb_context *ctdb, const char *fmt, ...) +{ + va_list ap; + int ret; + + va_start(ap, fmt); + ret = ctdb_event_script_v(ctdb, fmt, ap); + va_end(ap); + + return ret; +} + + +struct ctdb_event_script_state { + struct ctdb_context *ctdb; + pid_t child; + void (*callback)(struct ctdb_context *, int, void *); + int fd[2]; + void *private_data; +}; + +/* called when child is finished */ +static void ctdb_event_script_handler(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *p) +{ + struct ctdb_event_script_state *state = + talloc_get_type(p, struct ctdb_event_script_state); + int status = -1; + void (*callback)(struct ctdb_context *, int, void *) = state->callback; + void *private_data = state->private_data; + struct ctdb_context *ctdb = state->ctdb; + + waitpid(state->child, &status, 0); + if (status != -1) { + status = WEXITSTATUS(status); + } + talloc_set_destructor(state, NULL); + talloc_free(state); + callback(ctdb, status, private_data); +} + + +/* called when child times out */ +static void ctdb_event_script_timeout(struct event_context *ev, struct timed_event *te, + struct timeval t, void *p) +{ + struct ctdb_event_script_state *state = talloc_get_type(p, struct ctdb_event_script_state); + void (*callback)(struct ctdb_context *, int, void *) = state->callback; + void *private_data = state->private_data; + struct ctdb_context *ctdb = state->ctdb; + + DEBUG(0,("event script timed out\n")); + talloc_free(state); + callback(ctdb, -1, private_data); +} + +/* + destroy a running event script + */ +static int event_script_destructor(struct ctdb_event_script_state *state) +{ + kill(state->child, SIGKILL); + waitpid(state->child, NULL, 0); + return 0; +} + +/* + run the event script in the background, calling the callback when + finished + */ +int ctdb_event_script_callback(struct ctdb_context *ctdb, + struct timeval timeout, + TALLOC_CTX *mem_ctx, + void (*callback)(struct ctdb_context *, int, void *), + void *private_data, + const char *fmt, ...) +{ + struct ctdb_event_script_state *state; + va_list ap; + int ret; + + state = talloc(mem_ctx, struct ctdb_event_script_state); + CTDB_NO_MEMORY(ctdb, state); + + state->ctdb = ctdb; + state->callback = callback; + state->private_data = private_data; + + ret = pipe(state->fd); + if (ret != 0) { + talloc_free(state); + return -1; + } + + state->child = fork(); + + if (state->child == (pid_t)-1) { + close(state->fd[0]); + close(state->fd[1]); + talloc_free(state); + return -1; + } + + if (state->child == 0) { + close(state->fd[0]); + ctdb_set_realtime(false); + set_close_on_exec(state->fd[1]); + va_start(ap, fmt); + ret = ctdb_event_script_v(ctdb, fmt, ap); + va_end(ap); + _exit(ret); + } + + talloc_set_destructor(state, event_script_destructor); + + close(state->fd[1]); + + event_add_fd(ctdb->ev, state, state->fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, + ctdb_event_script_handler, state); + + if (!timeval_is_zero(&timeout)) { + event_add_timed(ctdb->ev, state, timeout, ctdb_event_script_timeout, state); + } + + return 0; +} + + |