From 6504900f1f52927adab3489b8d04b6644ceaee7d Mon Sep 17 00:00:00 2001
From: Andrew Tridgell <tridge@samba.org>
Date: Tue, 10 Jul 2007 08:06:51 +0000
Subject: r23806: update Samba4 with the latest ctdb code.

This doesn't get the ctdb code fully working in Samba4, it just gets
it building and not breaking non-clustered use of Samba. It will take
a bit longer to update some of the calling ctdb_cluster.c code to make
it work correctly in Samba4.

Note also that Samba4 now only links to the client portion of
ctdb. For the moment I am leaving the ctdbd as a separate daemon,
which you install separately from http://ctdb.samba.org/.
(This used to be commit b196077cbb55cbecad87065133c2d67198e31066)
---
 source4/cluster/ctdb/server/ctdb_call.c        |  737 ++++++++++++
 source4/cluster/ctdb/server/ctdb_control.c     |  499 ++++++++
 source4/cluster/ctdb/server/ctdb_daemon.c      |  927 +++++++++++++++
 source4/cluster/ctdb/server/ctdb_freeze.c      |  256 ++++
 source4/cluster/ctdb/server/ctdb_lockwait.c    |  165 +++
 source4/cluster/ctdb/server/ctdb_ltdb_server.c |  366 ++++++
 source4/cluster/ctdb/server/ctdb_monitor.c     |  227 ++++
 source4/cluster/ctdb/server/ctdb_recover.c     |  680 +++++++++++
 source4/cluster/ctdb/server/ctdb_recoverd.c    | 1511 ++++++++++++++++++++++++
 source4/cluster/ctdb/server/ctdb_server.c      |  469 ++++++++
 source4/cluster/ctdb/server/ctdb_takeover.c    |  822 +++++++++++++
 source4/cluster/ctdb/server/ctdb_traverse.c    |  463 ++++++++
 source4/cluster/ctdb/server/ctdb_tunables.c    |  163 +++
 source4/cluster/ctdb/server/ctdbd.c            |  229 ++++
 source4/cluster/ctdb/server/eventscript.c      |  191 +++
 15 files changed, 7705 insertions(+)
 create mode 100644 source4/cluster/ctdb/server/ctdb_call.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_control.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_daemon.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_freeze.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_lockwait.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_ltdb_server.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_monitor.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_recover.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_recoverd.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_server.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_takeover.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_traverse.c
 create mode 100644 source4/cluster/ctdb/server/ctdb_tunables.c
 create mode 100644 source4/cluster/ctdb/server/ctdbd.c
 create mode 100644 source4/cluster/ctdb/server/eventscript.c

(limited to 'source4/cluster/ctdb/server')

diff --git a/source4/cluster/ctdb/server/ctdb_call.c b/source4/cluster/ctdb/server/ctdb_call.c
new file mode 100644
index 0000000000..bbe07717ed
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_call.c
@@ -0,0 +1,737 @@
+/* 
+   ctdb_call protocol code
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+/*
+  see http://wiki.samba.org/index.php/Samba_%26_Clustering for
+  protocol design and packet details
+*/
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/util/dlinklist.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "../include/ctdb_private.h"
+
+/*
+  find the ctdb_db from a db index
+ */
+ struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
+{
+	struct ctdb_db_context *ctdb_db;
+
+	for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
+		if (ctdb_db->db_id == id) {
+			break;
+		}
+	}
+	return ctdb_db;
+}
+
+
+/*
+  a varient of input packet that can be used in lock requeue
+*/
+static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
+{
+	struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
+	ctdb_input_pkt(ctdb, hdr);
+}
+
+
+/*
+  send an error reply
+*/
+static void ctdb_send_error(struct ctdb_context *ctdb, 
+			    struct ctdb_req_header *hdr, uint32_t status,
+			    const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
+static void ctdb_send_error(struct ctdb_context *ctdb, 
+			    struct ctdb_req_header *hdr, uint32_t status,
+			    const char *fmt, ...)
+{
+	va_list ap;
+	struct ctdb_reply_error *r;
+	char *msg;
+	int msglen, len;
+
+	va_start(ap, fmt);
+	msg = talloc_vasprintf(ctdb, fmt, ap);
+	if (msg == NULL) {
+		ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
+	}
+	va_end(ap);
+
+	msglen = strlen(msg)+1;
+	len = offsetof(struct ctdb_reply_error, msg);
+	r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
+				    struct ctdb_reply_error);
+	CTDB_NO_MEMORY_FATAL(ctdb, r);
+
+	r->hdr.destnode  = hdr->srcnode;
+	r->hdr.reqid     = hdr->reqid;
+	r->status        = status;
+	r->msglen        = msglen;
+	memcpy(&r->msg[0], msg, msglen);
+
+	ctdb_queue_packet(ctdb, &r->hdr);
+
+	talloc_free(msg);
+}
+
+
+/*
+  send a redirect reply
+*/
+static void ctdb_call_send_redirect(struct ctdb_context *ctdb, 
+				    TDB_DATA key,
+				    struct ctdb_req_call *c, 
+				    struct ctdb_ltdb_header *header)
+{
+	
+	uint32_t lmaster = ctdb_lmaster(ctdb, &key);
+	if (ctdb->vnn == lmaster) {
+		c->hdr.destnode = header->dmaster;
+	} else if ((c->hopcount % ctdb->tunable.max_redirect_count) == 0) {
+		c->hdr.destnode = lmaster;
+	} else {
+		c->hdr.destnode = header->dmaster;
+	}
+	c->hopcount++;
+	ctdb_queue_packet(ctdb, &c->hdr);
+}
+
+
+/*
+  send a dmaster reply
+
+  caller must have the chainlock before calling this routine. Caller must be
+  the lmaster
+*/
+static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
+				    struct ctdb_ltdb_header *header,
+				    TDB_DATA key, TDB_DATA data,
+				    uint32_t new_dmaster,
+				    uint32_t reqid)
+{
+	struct ctdb_context *ctdb = ctdb_db->ctdb;
+	struct ctdb_reply_dmaster *r;
+	int ret, len;
+	TALLOC_CTX *tmp_ctx;
+
+	if (ctdb->vnn != ctdb_lmaster(ctdb, &key)) {
+		DEBUG(0,(__location__ " Caller is not lmaster!\n"));
+		return;
+	}
+
+	header->dmaster = new_dmaster;
+	ret = ctdb_ltdb_store(ctdb_db, key, header, data);
+	if (ret != 0) {
+		ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
+		return;
+	}
+
+	/* put the packet on a temporary context, allowing us to safely free
+	   it below even if ctdb_reply_dmaster() has freed it already */
+	tmp_ctx = talloc_new(ctdb);
+
+	/* send the CTDB_REPLY_DMASTER */
+	len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize;
+	r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
+				    struct ctdb_reply_dmaster);
+	CTDB_NO_MEMORY_FATAL(ctdb, r);
+
+	r->hdr.destnode  = new_dmaster;
+	r->hdr.reqid     = reqid;
+	r->rsn           = header->rsn;
+	r->keylen        = key.dsize;
+	r->datalen       = data.dsize;
+	r->db_id         = ctdb_db->db_id;
+	memcpy(&r->data[0], key.dptr, key.dsize);
+	memcpy(&r->data[key.dsize], data.dptr, data.dsize);
+
+	ctdb_queue_packet(ctdb, &r->hdr);
+
+	talloc_free(tmp_ctx);
+}
+
+/*
+  send a dmaster request (give another node the dmaster for a record)
+
+  This is always sent to the lmaster, which ensures that the lmaster
+  always knows who the dmaster is. The lmaster will then send a
+  CTDB_REPLY_DMASTER to the new dmaster
+*/
+static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
+				   struct ctdb_req_call *c, 
+				   struct ctdb_ltdb_header *header,
+				   TDB_DATA *key, TDB_DATA *data)
+{
+	struct ctdb_req_dmaster *r;
+	struct ctdb_context *ctdb = ctdb_db->ctdb;
+	int len;
+	uint32_t lmaster = ctdb_lmaster(ctdb, key);
+
+	if (lmaster == ctdb->vnn) {
+		ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
+					c->hdr.srcnode, c->hdr.reqid);
+		return;
+	}
+	
+	len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize;
+	r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
+				    struct ctdb_req_dmaster);
+	CTDB_NO_MEMORY_FATAL(ctdb, r);
+	r->hdr.destnode  = lmaster;
+	r->hdr.reqid     = c->hdr.reqid;
+	r->db_id         = c->db_id;
+	r->rsn           = header->rsn;
+	r->dmaster       = c->hdr.srcnode;
+	r->keylen        = key->dsize;
+	r->datalen       = data->dsize;
+	memcpy(&r->data[0], key->dptr, key->dsize);
+	memcpy(&r->data[key->dsize], data->dptr, data->dsize);
+
+	header->dmaster = c->hdr.srcnode;
+	if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
+		ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
+	}
+	
+	ctdb_queue_packet(ctdb, &r->hdr);
+
+	talloc_free(r);
+}
+
+/*
+  called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
+  gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
+
+  must be called with the chainlock held. This function releases the chainlock
+*/
+static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db, 
+				uint32_t reqid, TDB_DATA key, TDB_DATA data,
+				uint64_t rsn)
+{
+	struct ctdb_call_state *state;
+	struct ctdb_context *ctdb = ctdb_db->ctdb;
+	struct ctdb_ltdb_header header;
+
+	DEBUG(2,("vnn %u dmaster response %08x\n", ctdb->vnn, ctdb_hash(&key)));
+
+	ZERO_STRUCT(header);
+	header.rsn = rsn + 1;
+	header.dmaster = ctdb->vnn;
+
+	if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
+		ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
+		ctdb_ltdb_unlock(ctdb_db, key);
+		return;
+	}
+
+	state = ctdb_reqid_find(ctdb, reqid, struct ctdb_call_state);
+
+	if (state == NULL) {
+		DEBUG(0,("vnn %u Invalid reqid %u in ctdb_become_dmaster\n",
+			 ctdb->vnn, reqid));
+		ctdb_ltdb_unlock(ctdb_db, key);
+		return;
+	}
+
+	if (reqid != state->reqid) {
+		/* we found a record  but it was the wrong one */
+		DEBUG(0, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n",reqid));
+		ctdb_ltdb_unlock(ctdb_db, key);
+		return;
+	}
+
+	ctdb_call_local(ctdb_db, &state->call, &header, state, &data, ctdb->vnn);
+
+	ctdb_ltdb_unlock(ctdb_db, state->call.key);
+
+	state->state = CTDB_CALL_DONE;
+	if (state->async.fn) {
+		state->async.fn(state);
+	}
+}
+
+
+
+/*
+  called when a CTDB_REQ_DMASTER packet comes in
+
+  this comes into the lmaster for a record when the current dmaster
+  wants to give up the dmaster role and give it to someone else
+*/
+void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+	struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
+	TDB_DATA key, data, data2;
+	struct ctdb_ltdb_header header;
+	struct ctdb_db_context *ctdb_db;
+	int ret;
+
+	key.dptr = c->data;
+	key.dsize = c->keylen;
+	data.dptr = c->data + c->keylen;
+	data.dsize = c->datalen;
+
+	ctdb_db = find_ctdb_db(ctdb, c->db_id);
+	if (!ctdb_db) {
+		ctdb_send_error(ctdb, hdr, -1,
+				"Unknown database in request. db_id==0x%08x",
+				c->db_id);
+		return;
+	}
+	
+	/* fetch the current record */
+	ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
+					   ctdb_call_input_pkt, ctdb, False);
+	if (ret == -1) {
+		ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
+		return;
+	}
+	if (ret == -2) {
+		DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n"));
+		return;
+	}
+
+	if (ctdb_lmaster(ctdb, &key) != ctdb->vnn) {
+		DEBUG(0,("vnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
+			 ctdb->vnn, ctdb_lmaster(ctdb, &key), 
+			 hdr->generation, ctdb->vnn_map->generation));
+		ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
+	}
+
+	DEBUG(2,("vnn %u dmaster request on %08x for %u from %u\n", 
+		 ctdb->vnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
+
+	/* its a protocol error if the sending node is not the current dmaster */
+	if (header.dmaster != hdr->srcnode) {
+		DEBUG(0,("vnn %u dmaster request non-master %u dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u\n",
+			 ctdb->vnn, hdr->srcnode, header.dmaster, ctdb_hash(&key),
+			 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation));
+		ctdb_fatal(ctdb, "ctdb_req_dmaster from non-master");
+		return;
+	}
+
+	/* use the rsn from the sending node */
+	header.rsn = c->rsn;
+
+	/* check if the new dmaster is the lmaster, in which case we
+	   skip the dmaster reply */
+	if (c->dmaster == ctdb->vnn) {
+		ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn);
+	} else {
+		ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
+		ctdb_ltdb_unlock(ctdb_db, key);
+	}
+}
+
+
+/*
+  called when a CTDB_REQ_CALL packet comes in
+*/
+void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+	struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
+	TDB_DATA data;
+	struct ctdb_reply_call *r;
+	int ret, len;
+	struct ctdb_ltdb_header header;
+	struct ctdb_call call;
+	struct ctdb_db_context *ctdb_db;
+
+	ctdb_db = find_ctdb_db(ctdb, c->db_id);
+	if (!ctdb_db) {
+		ctdb_send_error(ctdb, hdr, -1,
+				"Unknown database in request. db_id==0x%08x",
+				c->db_id);
+		return;
+	}
+
+	call.call_id  = c->callid;
+	call.key.dptr = c->data;
+	call.key.dsize = c->keylen;
+	call.call_data.dptr = c->data + c->keylen;
+	call.call_data.dsize = c->calldatalen;
+
+	/* determine if we are the dmaster for this key. This also
+	   fetches the record data (if any), thus avoiding a 2nd fetch of the data 
+	   if the call will be answered locally */
+
+	ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data,
+					   ctdb_call_input_pkt, ctdb, False);
+	if (ret == -1) {
+		ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
+		return;
+	}
+	if (ret == -2) {
+		DEBUG(2,(__location__ " deferred ctdb_request_call\n"));
+		return;
+	}
+
+	/* if we are not the dmaster, then send a redirect to the
+	   requesting node */
+	if (header.dmaster != ctdb->vnn) {
+		talloc_free(data.dptr);
+		ctdb_call_send_redirect(ctdb, call.key, c, &header);
+		ctdb_ltdb_unlock(ctdb_db, call.key);
+		return;
+	}
+
+	if (c->hopcount > ctdb->statistics.max_hop_count) {
+		ctdb->statistics.max_hop_count = c->hopcount;
+	}
+
+	/* if this nodes has done enough consecutive calls on the same record
+	   then give them the record
+	   or if the node requested an immediate migration
+	*/
+	if ( c->hdr.srcnode != ctdb->vnn &&
+	     ((header.laccessor == c->hdr.srcnode
+	       && header.lacount >= ctdb->tunable.max_lacount)
+	      || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
+		DEBUG(2,("vnn %u starting migration of %08x to %u\n", 
+			 ctdb->vnn, ctdb_hash(&call.key), c->hdr.srcnode));
+		ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data);
+		talloc_free(data.dptr);
+		ctdb_ltdb_unlock(ctdb_db, call.key);
+		return;
+	}
+
+	ctdb_call_local(ctdb_db, &call, &header, hdr, &data, c->hdr.srcnode);
+
+	ctdb_ltdb_unlock(ctdb_db, call.key);
+
+	len = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
+	r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
+				    struct ctdb_reply_call);
+	CTDB_NO_MEMORY_FATAL(ctdb, r);
+	r->hdr.destnode  = hdr->srcnode;
+	r->hdr.reqid     = hdr->reqid;
+	r->status        = call.status;
+	r->datalen       = call.reply_data.dsize;
+	if (call.reply_data.dsize) {
+		memcpy(&r->data[0], call.reply_data.dptr, call.reply_data.dsize);
+	}
+
+	ctdb_queue_packet(ctdb, &r->hdr);
+
+	talloc_free(r);
+}
+
+/*
+  called when a CTDB_REPLY_CALL packet comes in
+
+  This packet comes in response to a CTDB_REQ_CALL request packet. It
+  contains any reply data from the call
+*/
+void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+	struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
+	struct ctdb_call_state *state;
+
+	state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+	if (state == NULL) {
+		DEBUG(0, (__location__ " reqid %u not found\n", hdr->reqid));
+		return;
+	}
+
+	if (hdr->reqid != state->reqid) {
+		/* we found a record  but it was the wrong one */
+		DEBUG(0, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
+		return;
+	}
+
+	state->call.reply_data.dptr = c->data;
+	state->call.reply_data.dsize = c->datalen;
+	state->call.status = c->status;
+
+	talloc_steal(state, c);
+
+	state->state = CTDB_CALL_DONE;
+	if (state->async.fn) {
+		state->async.fn(state);
+	}
+}
+
+
+/*
+  called when a CTDB_REPLY_DMASTER packet comes in
+
+  This packet comes in from the lmaster response to a CTDB_REQ_CALL
+  request packet. It means that the current dmaster wants to give us
+  the dmaster role
+*/
+void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+	struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
+	struct ctdb_db_context *ctdb_db;
+	TDB_DATA key, data;
+	int ret;
+
+	ctdb_db = find_ctdb_db(ctdb, c->db_id);
+	if (ctdb_db == NULL) {
+		DEBUG(0,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
+		return;
+	}
+	
+	key.dptr = c->data;
+	key.dsize = c->keylen;
+	data.dptr = &c->data[key.dsize];
+	data.dsize = c->datalen;
+
+	ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
+				     ctdb_call_input_pkt, ctdb, False);
+	if (ret == -2) {
+		return;
+	}
+	if (ret != 0) {
+		DEBUG(0,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
+		return;
+	}
+
+	ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn);
+}
+
+
+/*
+  called when a CTDB_REPLY_ERROR packet comes in
+*/
+void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+	struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
+	struct ctdb_call_state *state;
+
+	state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+	if (state == NULL) {
+		DEBUG(0,("vnn %u Invalid reqid %u in ctdb_reply_error\n",
+			 ctdb->vnn, hdr->reqid));
+		return;
+	}
+
+	if (hdr->reqid != state->reqid) {
+		/* we found a record  but it was the wrong one */
+		DEBUG(0, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
+		return;
+	}
+
+	talloc_steal(state, c);
+
+	state->state  = CTDB_CALL_ERROR;
+	state->errmsg = (char *)c->msg;
+	if (state->async.fn) {
+		state->async.fn(state);
+	}
+}
+
+
+/*
+  destroy a ctdb_call
+*/
+static int ctdb_call_destructor(struct ctdb_call_state *state)
+{
+	DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
+	ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
+	return 0;
+}
+
+
+/*
+  called when a ctdb_call needs to be resent after a reconfigure event
+*/
+static void ctdb_call_resend(struct ctdb_call_state *state)
+{
+	struct ctdb_context *ctdb = state->ctdb_db->ctdb;
+
+	state->generation = ctdb->vnn_map->generation;
+
+	/* use a new reqid, in case the old reply does eventually come in */
+	ctdb_reqid_remove(ctdb, state->reqid);
+	state->reqid = ctdb_reqid_new(ctdb, state);
+	state->c->hdr.reqid = state->reqid;
+
+	/* update the generation count for this request, so its valid with the new vnn_map */
+	state->c->hdr.generation = state->generation;
+
+	/* send the packet to ourselves, it will be redirected appropriately */
+	state->c->hdr.destnode = ctdb->vnn;
+
+	ctdb_queue_packet(ctdb, &state->c->hdr);
+	DEBUG(0,("resent ctdb_call\n"));
+}
+
+/*
+  resend all pending calls on recovery
+ */
+void ctdb_call_resend_all(struct ctdb_context *ctdb)
+{
+	struct ctdb_call_state *state, *next;
+	for (state=ctdb->pending_calls;state;state=next) {
+		next = state->next;
+		ctdb_call_resend(state);
+	}
+}
+
+/*
+  this allows the caller to setup a async.fn 
+*/
+static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
+		       struct timeval t, void *private_data)
+{
+	struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
+	if (state->async.fn) {
+		state->async.fn(state);
+	}
+}	
+
+
+/*
+  construct an event driven local ctdb_call
+
+  this is used so that locally processed ctdb_call requests are processed
+  in an event driven manner
+*/
+struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
+					     struct ctdb_call *call,
+					     struct ctdb_ltdb_header *header,
+					     TDB_DATA *data)
+{
+	struct ctdb_call_state *state;
+	struct ctdb_context *ctdb = ctdb_db->ctdb;
+	int ret;
+
+	state = talloc_zero(ctdb_db, struct ctdb_call_state);
+	CTDB_NO_MEMORY_NULL(ctdb, state);
+
+	talloc_steal(state, data->dptr);
+
+	state->state = CTDB_CALL_DONE;
+	state->call = *call;
+	state->ctdb_db = ctdb_db;
+
+	ret = ctdb_call_local(ctdb_db, &state->call, header, state, data, ctdb->vnn);
+
+	event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
+
+	return state;
+}
+
+
+/*
+  make a remote ctdb call - async send. Called in daemon context.
+
+  This constructs a ctdb_call request and queues it for processing. 
+  This call never blocks.
+*/
+struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
+						     struct ctdb_call *call, 
+						     struct ctdb_ltdb_header *header)
+{
+	uint32_t len;
+	struct ctdb_call_state *state;
+	struct ctdb_context *ctdb = ctdb_db->ctdb;
+
+	state = talloc_zero(ctdb_db, struct ctdb_call_state);
+	CTDB_NO_MEMORY_NULL(ctdb, state);
+	state->reqid = ctdb_reqid_new(ctdb, state);
+	state->ctdb_db = ctdb_db;
+	talloc_set_destructor(state, ctdb_call_destructor);
+
+	len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
+	state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
+					   struct ctdb_req_call);
+	CTDB_NO_MEMORY_NULL(ctdb, state->c);
+	state->c->hdr.destnode  = header->dmaster;
+
+	/* this limits us to 16k outstanding messages - not unreasonable */
+	state->c->hdr.reqid     = state->reqid;
+	state->c->flags         = call->flags;
+	state->c->db_id         = ctdb_db->db_id;
+	state->c->callid        = call->call_id;
+	state->c->hopcount      = 0;
+	state->c->keylen        = call->key.dsize;
+	state->c->calldatalen   = call->call_data.dsize;
+	memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
+	memcpy(&state->c->data[call->key.dsize], 
+	       call->call_data.dptr, call->call_data.dsize);
+	state->call                = *call;
+	state->call.call_data.dptr = &state->c->data[call->key.dsize];
+	state->call.key.dptr       = &state->c->data[0];
+
+	state->state  = CTDB_CALL_WAIT;
+	state->generation = ctdb->vnn_map->generation;
+
+	DLIST_ADD(ctdb->pending_calls, state);
+
+	ctdb_queue_packet(ctdb, &state->c->hdr);
+
+	return state;
+}
+
+/*
+  make a remote ctdb call - async recv - called in daemon context
+
+  This is called when the program wants to wait for a ctdb_call to complete and get the 
+  results. This call will block unless the call has already completed.
+*/
+int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
+{
+	while (state->state < CTDB_CALL_DONE) {
+		event_loop_once(state->ctdb_db->ctdb->ev);
+	}
+	if (state->state != CTDB_CALL_DONE) {
+		ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
+		talloc_free(state);
+		return -1;
+	}
+
+	if (state->call.reply_data.dsize) {
+		call->reply_data.dptr = talloc_memdup(state->ctdb_db->ctdb,
+						      state->call.reply_data.dptr,
+						      state->call.reply_data.dsize);
+		call->reply_data.dsize = state->call.reply_data.dsize;
+	} else {
+		call->reply_data.dptr = NULL;
+		call->reply_data.dsize = 0;
+	}
+	call->status = state->call.status;
+	talloc_free(state);
+	return 0;
+}
+
+
+/* 
+   send a keepalive packet to the other node
+*/
+void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
+{
+	struct ctdb_req_keepalive *r;
+	
+	r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
+				    sizeof(struct ctdb_req_keepalive), 
+				    struct ctdb_req_keepalive);
+	CTDB_NO_MEMORY_FATAL(ctdb, r);
+	r->hdr.destnode  = destnode;
+	r->hdr.reqid     = 0;
+	
+	ctdb->statistics.keepalive_packets_sent++;
+
+	ctdb_queue_packet(ctdb, &r->hdr);
+
+	talloc_free(r);
+}
diff --git a/source4/cluster/ctdb/server/ctdb_control.c b/source4/cluster/ctdb/server/ctdb_control.c
new file mode 100644
index 0000000000..69848bb15c
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_control.c
@@ -0,0 +1,499 @@
+/* 
+   ctdb_control protocol code
+
+   Copyright (C) Andrew Tridgell  2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "../include/ctdb_private.h"
+#include "lib/util/dlinklist.h"
+#include "db_wrap.h"
+
+struct ctdb_control_state {
+	struct ctdb_context *ctdb;
+	uint32_t reqid;
+	ctdb_control_callback_fn_t callback;
+	void *private_data;
+	unsigned flags;
+};
+
+/*
+  process a control request
+ */
+static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb, 
+				     struct ctdb_req_control *c,
+				     TDB_DATA indata,
+				     TDB_DATA *outdata, uint32_t srcnode,
+				     const char **errormsg,
+				     bool *async_reply)
+{
+	uint32_t opcode = c->opcode;
+	uint64_t srvid = c->srvid;
+	uint32_t client_id = c->client_id;
+
+	switch (opcode) {
+	case CTDB_CONTROL_PROCESS_EXISTS: {
+		CHECK_CONTROL_DATA_SIZE(sizeof(pid_t));
+		return kill(*(pid_t *)indata.dptr, 0);
+	}
+
+	case CTDB_CONTROL_SET_DEBUG: {
+		CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
+		LogLevel = *(uint32_t *)indata.dptr;
+		return 0;
+	}
+
+	case CTDB_CONTROL_GET_DEBUG: {
+		CHECK_CONTROL_DATA_SIZE(0);
+		outdata->dptr = (uint8_t *)&LogLevel;
+		outdata->dsize = sizeof(LogLevel);
+		return 0;
+	}
+
+	case CTDB_CONTROL_STATISTICS: {
+		CHECK_CONTROL_DATA_SIZE(0);
+		ctdb->statistics.memory_used = talloc_total_size(ctdb);
+		ctdb->statistics.frozen = (ctdb->freeze_mode == CTDB_FREEZE_FROZEN);
+		ctdb->statistics.recovering = (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE);
+		outdata->dptr = (uint8_t *)&ctdb->statistics;
+		outdata->dsize = sizeof(ctdb->statistics);
+		return 0;
+	}
+
+	case CTDB_CONTROL_GET_ALL_TUNABLES: {
+		CHECK_CONTROL_DATA_SIZE(0);
+		outdata->dptr = (uint8_t *)&ctdb->tunable;
+		outdata->dsize = sizeof(ctdb->tunable);
+		return 0;
+	}
+
+	case CTDB_CONTROL_DUMP_MEMORY: {
+		CHECK_CONTROL_DATA_SIZE(0);
+		talloc_report_full(ctdb, stdout);
+		return 0;
+	}
+
+	case CTDB_CONTROL_STATISTICS_RESET: {
+		CHECK_CONTROL_DATA_SIZE(0);
+		ZERO_STRUCT(ctdb->statistics);
+		return 0;
+	}
+
+	case CTDB_CONTROL_GETVNNMAP:
+		return ctdb_control_getvnnmap(ctdb, opcode, indata, outdata);
+
+	case CTDB_CONTROL_GET_DBMAP:
+		return ctdb_control_getdbmap(ctdb, opcode, indata, outdata);
+
+	case CTDB_CONTROL_GET_NODEMAP:
+		return ctdb_control_getnodemap(ctdb, opcode, indata, outdata);
+
+	case CTDB_CONTROL_SETVNNMAP:
+		return ctdb_control_setvnnmap(ctdb, opcode, indata, outdata);
+
+	case CTDB_CONTROL_PULL_DB: 
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_pulldb));
+		return ctdb_control_pull_db(ctdb, indata, outdata);
+
+	case CTDB_CONTROL_SET_DMASTER: 
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_set_dmaster));
+		return ctdb_control_set_dmaster(ctdb, indata);
+
+	case CTDB_CONTROL_PUSH_DB:
+		return ctdb_control_push_db(ctdb, indata);
+
+	case CTDB_CONTROL_GET_RECMODE: {
+		return ctdb->recovery_mode;
+	}
+
+	case CTDB_CONTROL_SET_RECMASTER: {
+		CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
+		if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+			DEBUG(0,("Attempt to set recmaster when not frozen\n"));
+			return -1;
+		}
+		ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];
+		return 0;
+	}
+
+	case CTDB_CONTROL_GET_RECMASTER:
+		return ctdb->recovery_master;
+
+	case CTDB_CONTROL_GET_PID:
+		return getpid();
+
+	case CTDB_CONTROL_GET_VNN:
+		return ctdb->vnn;
+
+	case CTDB_CONTROL_PING:
+		CHECK_CONTROL_DATA_SIZE(0);
+		return ctdb->statistics.num_clients;
+
+	case CTDB_CONTROL_GET_DBNAME: {
+		uint32_t db_id;
+		struct ctdb_db_context *ctdb_db;
+
+		CHECK_CONTROL_DATA_SIZE(sizeof(db_id));
+		db_id = *(uint32_t *)indata.dptr;
+		ctdb_db = find_ctdb_db(ctdb, db_id);
+		if (ctdb_db == NULL) return -1;
+		outdata->dptr = discard_const(ctdb_db->db_name);
+		outdata->dsize = strlen(ctdb_db->db_name)+1;
+		return 0;
+	}
+
+	case CTDB_CONTROL_GETDBPATH: {
+		uint32_t db_id;
+		struct ctdb_db_context *ctdb_db;
+
+		CHECK_CONTROL_DATA_SIZE(sizeof(db_id));
+		db_id = *(uint32_t *)indata.dptr;
+		ctdb_db = find_ctdb_db(ctdb, db_id);
+		if (ctdb_db == NULL) return -1;
+		outdata->dptr = discard_const(ctdb_db->db_path);
+		outdata->dsize = strlen(ctdb_db->db_path)+1;
+		return 0;
+	}
+
+	case CTDB_CONTROL_DB_ATTACH:
+		return ctdb_control_db_attach(ctdb, indata, outdata);
+
+	case CTDB_CONTROL_SET_CALL: {
+		struct ctdb_control_set_call *sc = 
+			(struct ctdb_control_set_call *)indata.dptr;
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_set_call));
+		return ctdb_daemon_set_call(ctdb, sc->db_id, sc->fn, sc->id);
+	}
+
+	case CTDB_CONTROL_TRAVERSE_START:
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_traverse_start));
+		return ctdb_control_traverse_start(ctdb, indata, outdata, srcnode);
+
+	case CTDB_CONTROL_TRAVERSE_ALL:
+		return ctdb_control_traverse_all(ctdb, indata, outdata);
+
+	case CTDB_CONTROL_TRAVERSE_DATA:
+		return ctdb_control_traverse_data(ctdb, indata, outdata);
+
+	case CTDB_CONTROL_REGISTER_SRVID:
+		return daemon_register_message_handler(ctdb, client_id, srvid);
+
+	case CTDB_CONTROL_DEREGISTER_SRVID:
+		return daemon_deregister_message_handler(ctdb, client_id, srvid);
+
+	case CTDB_CONTROL_ENABLE_SEQNUM:
+		CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
+		return ctdb_ltdb_enable_seqnum(ctdb, *(uint32_t *)indata.dptr);
+
+	case CTDB_CONTROL_UPDATE_SEQNUM:
+		CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));		
+		return ctdb_ltdb_update_seqnum(ctdb, *(uint32_t *)indata.dptr, srcnode);
+
+	case CTDB_CONTROL_FREEZE:
+		CHECK_CONTROL_DATA_SIZE(0);
+		return ctdb_control_freeze(ctdb, c, async_reply);
+
+	case CTDB_CONTROL_THAW:
+		CHECK_CONTROL_DATA_SIZE(0);
+		return ctdb_control_thaw(ctdb);
+
+	case CTDB_CONTROL_SET_RECMODE:
+		CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));		
+		return ctdb_control_set_recmode(ctdb, c, indata, async_reply, errormsg);
+
+	case CTDB_CONTROL_SET_MONMODE:
+		CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));		
+		ctdb->monitoring_mode = *(uint32_t *)indata.dptr;
+		return 0;
+
+	case CTDB_CONTROL_GET_MONMODE: 
+		return ctdb->monitoring_mode;
+
+	case CTDB_CONTROL_SHUTDOWN:
+		ctdb_release_all_ips(ctdb);
+		ctdb->methods->shutdown(ctdb);
+		ctdb_event_script(ctdb, "shutdown");
+		DEBUG(0,("shutting down\n"));
+		exit(0);
+
+	case CTDB_CONTROL_MAX_RSN: 
+		CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
+		return ctdb_control_max_rsn(ctdb, indata, outdata);
+
+	case CTDB_CONTROL_SET_RSN_NONEMPTY: 
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_set_rsn_nonempty));
+		return ctdb_control_set_rsn_nonempty(ctdb, indata, outdata);
+
+	case CTDB_CONTROL_TAKEOVER_IP:
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_public_ip));
+		return ctdb_control_takeover_ip(ctdb, c, indata, async_reply);
+
+	case CTDB_CONTROL_RELEASE_IP:
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_public_ip));
+		return ctdb_control_release_ip(ctdb, c, indata, async_reply);
+
+	case CTDB_CONTROL_GET_PUBLIC_IPS:
+		CHECK_CONTROL_DATA_SIZE(0);
+		return ctdb_control_get_public_ips(ctdb, c, outdata);
+
+	case CTDB_CONTROL_DELETE_LOW_RSN: 
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_delete_low_rsn));
+		return ctdb_control_delete_low_rsn(ctdb, indata, outdata);
+
+	case CTDB_CONTROL_TCP_CLIENT: 
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_tcp));
+		return ctdb_control_tcp_client(ctdb, client_id, srcnode, indata);
+
+	case CTDB_CONTROL_STARTUP: 
+		CHECK_CONTROL_DATA_SIZE(0);
+		return ctdb_control_startup(ctdb, srcnode);
+
+	case CTDB_CONTROL_TCP_ADD: 
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_tcp_vnn));
+		return ctdb_control_tcp_add(ctdb, indata);
+
+	case CTDB_CONTROL_TCP_REMOVE: 
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_tcp_vnn));
+		return ctdb_control_tcp_remove(ctdb, indata);
+
+	case CTDB_CONTROL_SET_TUNABLE:
+		return ctdb_control_set_tunable(ctdb, indata);
+
+	case CTDB_CONTROL_GET_TUNABLE:
+		return ctdb_control_get_tunable(ctdb, indata, outdata);
+
+	case CTDB_CONTROL_LIST_TUNABLES:
+		return ctdb_control_list_tunables(ctdb, outdata);
+
+	case CTDB_CONTROL_MODIFY_FLAGS:
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_node_modflags));
+		return ctdb_control_modflags(ctdb, indata);
+
+	default:
+		DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));
+		return -1;
+	}
+}
+
+
+/*
+  send a reply for a ctdb control
+ */
+void ctdb_request_control_reply(struct ctdb_context *ctdb, struct ctdb_req_control *c,
+				TDB_DATA *outdata, int32_t status, const char *errormsg)
+{
+	struct ctdb_reply_control *r;
+	size_t len;
+	
+	/* some controls send no reply */
+	if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
+		return;
+	}
+
+	len = offsetof(struct ctdb_reply_control, data) + (outdata?outdata->dsize:0);
+	if (errormsg) {
+		len += strlen(errormsg);
+	}
+	r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CONTROL, len, struct ctdb_reply_control);
+	CTDB_NO_MEMORY_VOID(ctdb, r);
+
+	r->hdr.destnode     = c->hdr.srcnode;
+	r->hdr.reqid        = c->hdr.reqid;
+	r->status           = status;
+	r->datalen          = outdata?outdata->dsize:0;
+	if (outdata && outdata->dsize) {
+		memcpy(&r->data[0], outdata->dptr, outdata->dsize);
+	}
+	if (errormsg) {
+		r->errorlen = strlen(errormsg);
+		memcpy(&r->data[r->datalen], errormsg, r->errorlen);
+	}
+	
+	ctdb_queue_packet(ctdb, &r->hdr);	
+
+	talloc_free(r);
+}
+
+/*
+  called when a CTDB_REQ_CONTROL packet comes in
+*/
+void ctdb_request_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+	struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
+	TDB_DATA data, *outdata;
+	int32_t status;
+	bool async_reply = False;
+	const char *errormsg = NULL;
+
+	data.dptr = &c->data[0];
+	data.dsize = c->datalen;
+
+	outdata = talloc_zero(c, TDB_DATA);
+
+	status = ctdb_control_dispatch(ctdb, c, data, outdata, hdr->srcnode, 
+				       &errormsg, &async_reply);
+
+	if (!async_reply) {
+		ctdb_request_control_reply(ctdb, c, outdata, status, errormsg);
+	}
+}
+
+/*
+  called when a CTDB_REPLY_CONTROL packet comes in
+*/
+void ctdb_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+	struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
+	TDB_DATA data;
+	struct ctdb_control_state *state;
+	const char *errormsg = NULL;
+
+	state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_control_state);
+	if (state == NULL) {
+		DEBUG(0,("vnn %u Invalid reqid %u in ctdb_reply_control\n",
+			 ctdb->vnn, hdr->reqid));
+		return;
+	}
+
+	if (hdr->reqid != state->reqid) {
+		/* we found a record  but it was the wrong one */
+		DEBUG(0, ("Dropped orphaned control reply with reqid:%u\n", hdr->reqid));
+		return;
+	}
+
+	data.dptr = &c->data[0];
+	data.dsize = c->datalen;
+	if (c->errorlen) {
+		errormsg = talloc_strndup(state, 
+					  (char *)&c->data[c->datalen], c->errorlen);
+	}
+
+	/* make state a child of the packet, so it goes away when the packet
+	   is freed. */
+	talloc_steal(hdr, state);
+
+	state->callback(ctdb, c->status, data, errormsg, state->private_data);
+}
+
+static int ctdb_control_destructor(struct ctdb_control_state *state)
+{
+	ctdb_reqid_remove(state->ctdb, state->reqid);
+	return 0;
+}
+
+/*
+  handle a timeout of a control
+ */
+static void ctdb_control_timeout(struct event_context *ev, struct timed_event *te, 
+		       struct timeval t, void *private_data)
+{
+	struct ctdb_control_state *state = talloc_get_type(private_data, struct ctdb_control_state);
+	TALLOC_CTX *tmp_ctx = talloc_new(ev);
+
+	state->ctdb->statistics.timeouts.control++;
+
+	talloc_steal(tmp_ctx, state);
+
+	state->callback(state->ctdb, -1, tdb_null,
+			"ctdb_control timed out", 
+			state->private_data);
+	talloc_free(tmp_ctx);
+}
+
+
+/*
+  send a control message to a node
+ */
+int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode,
+			     uint64_t srvid, uint32_t opcode, uint32_t client_id,
+			     uint32_t flags,
+			     TDB_DATA data,
+			     ctdb_control_callback_fn_t callback,
+			     void *private_data)
+{
+	struct ctdb_req_control *c;
+	struct ctdb_control_state *state;
+	size_t len;
+
+	if (((destnode == CTDB_BROADCAST_VNNMAP) || 
+	     (destnode == CTDB_BROADCAST_ALL) ||
+	     (destnode == CTDB_BROADCAST_CONNECTED)) && 
+	    !(flags & CTDB_CTRL_FLAG_NOREPLY)) {
+		DEBUG(0,("Attempt to broadcast control without NOREPLY\n"));
+		return -1;
+	}
+
+	if (destnode != CTDB_BROADCAST_VNNMAP && 
+	    destnode != CTDB_BROADCAST_ALL && 
+	    destnode != CTDB_BROADCAST_CONNECTED && 
+	    (!ctdb_validate_vnn(ctdb, destnode) || 
+	     (ctdb->nodes[destnode]->flags & NODE_FLAGS_DISCONNECTED))) {
+		if (!(flags & CTDB_CTRL_FLAG_NOREPLY)) {
+			callback(ctdb, -1, tdb_null, "ctdb_control to disconnected node", private_data);
+		}
+		return 0;
+	}
+
+	/* the state is made a child of private_data if possible. This means any reply
+	   will be discarded if the private_data goes away */
+	state = talloc(private_data?private_data:ctdb, struct ctdb_control_state);
+	CTDB_NO_MEMORY(ctdb, state);
+
+	state->reqid = ctdb_reqid_new(ctdb, state);
+	state->callback = callback;
+	state->private_data = private_data;
+	state->ctdb = ctdb;
+	state->flags = flags;
+
+	talloc_set_destructor(state, ctdb_control_destructor);
+
+	len = offsetof(struct ctdb_req_control, data) + data.dsize;
+	c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CONTROL, len, 
+				    struct ctdb_req_control);
+	CTDB_NO_MEMORY(ctdb, c);
+	talloc_set_name_const(c, "ctdb_req_control packet");
+
+	c->hdr.destnode     = destnode;
+	c->hdr.reqid        = state->reqid;
+	c->opcode           = opcode;
+	c->client_id        = client_id;
+	c->flags            = flags;
+	c->srvid            = srvid;
+	c->datalen          = data.dsize;
+	if (data.dsize) {
+		memcpy(&c->data[0], data.dptr, data.dsize);
+	}
+
+	ctdb_queue_packet(ctdb, &c->hdr);	
+
+	if (flags & CTDB_CTRL_FLAG_NOREPLY) {
+		talloc_free(state);
+		return 0;
+	}
+
+	if (ctdb->tunable.control_timeout) {
+		event_add_timed(ctdb->ev, state, 
+				timeval_current_ofs(ctdb->tunable.control_timeout, 0), 
+				ctdb_control_timeout, state);
+	}
+
+	talloc_free(c);
+	return 0;
+}
diff --git a/source4/cluster/ctdb/server/ctdb_daemon.c b/source4/cluster/ctdb/server/ctdb_daemon.c
new file mode 100644
index 0000000000..2577970075
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_daemon.c
@@ -0,0 +1,927 @@
+/* 
+   ctdb daemon code
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "db_wrap.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/events/events.h"
+#include "lib/util/dlinklist.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "../include/ctdb.h"
+#include "../include/ctdb_private.h"
+
+static void daemon_incoming_packet(void *, struct ctdb_req_header *);
+
+/*
+  handler for when a node changes its flags
+*/
+static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+				TDB_DATA data, void *private_data)
+{
+	struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
+
+	if (data.dsize != sizeof(*c) || !ctdb_validate_vnn(ctdb, c->vnn)) {
+		DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
+		return;
+	}
+
+	if (!ctdb_validate_vnn(ctdb, c->vnn)) {
+		DEBUG(0,("Bad vnn %u in flag_change_handler\n", c->vnn));
+		return;
+	}
+
+	/* don't get the disconnected flag from the other node */
+	ctdb->nodes[c->vnn]->flags = 
+		(ctdb->nodes[c->vnn]->flags&NODE_FLAGS_DISCONNECTED) 
+		| (c->flags & ~NODE_FLAGS_DISCONNECTED);	
+	DEBUG(2,("Node flags for node %u are now 0x%x\n", c->vnn, ctdb->nodes[c->vnn]->flags));
+
+	/* make sure we don't hold any IPs when we shouldn't */
+	if (c->vnn == ctdb->vnn &&
+	    (ctdb->nodes[c->vnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) {
+		ctdb_release_all_ips(ctdb);
+	}
+}
+
+/* called when the "startup" event script has finished */
+static void ctdb_start_transport(struct ctdb_context *ctdb, int status, void *p)
+{
+	if (status != 0) {
+		DEBUG(0,("startup event failed!\n"));
+		ctdb_fatal(ctdb, "startup event script failed");		
+	}
+
+	/* start the transport running */
+	if (ctdb->methods->start(ctdb) != 0) {
+		DEBUG(0,("transport failed to start!\n"));
+		ctdb_fatal(ctdb, "transport failed to start");
+	}
+
+	/* start the recovery daemon process */
+	if (ctdb_start_recoverd(ctdb) != 0) {
+		DEBUG(0,("Failed to start recovery daemon\n"));
+		exit(11);
+	}
+
+	/* a handler for when nodes are disabled/enabled */
+	ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, 
+				      flag_change_handler, NULL);
+
+	/* start monitoring for dead nodes */
+	ctdb_start_monitoring(ctdb);
+}
+
+/* go into main ctdb loop */
+static void ctdb_main_loop(struct ctdb_context *ctdb)
+{
+	int ret = -1;
+
+	if (strcmp(ctdb->transport, "tcp") == 0) {
+		int ctdb_tcp_init(struct ctdb_context *);
+		ret = ctdb_tcp_init(ctdb);
+	}
+#ifdef USE_INFINIBAND
+	if (strcmp(ctdb->transport, "ib") == 0) {
+		int ctdb_ibw_init(struct ctdb_context *);
+		ret = ctdb_ibw_init(ctdb);
+	}
+#endif
+	if (ret != 0) {
+		DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
+		return;
+	}
+
+	/* initialise the transport  */
+	if (ctdb->methods->initialise(ctdb) != 0) {
+		DEBUG(0,("transport failed to initialise!\n"));
+		ctdb_fatal(ctdb, "transport failed to initialise");
+	}
+
+	/* tell all other nodes we've just started up */
+	ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
+				 0, CTDB_CONTROL_STARTUP, 0,
+				 CTDB_CTRL_FLAG_NOREPLY,
+				 tdb_null, NULL, NULL);
+
+	/* release any IPs we hold from previous runs of the daemon */
+	ctdb_release_all_ips(ctdb);
+
+	ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb, 
+					 ctdb_start_transport, NULL, "startup");
+	if (ret != 0) {
+		DEBUG(0,("Failed startup event script\n"));
+		return;
+	}
+
+	/* go into a wait loop to allow other nodes to complete */
+	event_loop_wait(ctdb->ev);
+
+	DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
+	exit(1);
+}
+
+
+static void block_signal(int signum)
+{
+	struct sigaction act;
+
+	memset(&act, 0, sizeof(act));
+
+	act.sa_handler = SIG_IGN;
+	sigemptyset(&act.sa_mask);
+	sigaddset(&act.sa_mask, signum);
+	sigaction(signum, &act, NULL);
+}
+
+
+/*
+  send a packet to a client
+ */
+static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
+{
+	client->ctdb->statistics.client_packets_sent++;
+	return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
+}
+
+/*
+  message handler for when we are in daemon mode. This redirects the message
+  to the right client
+ */
+static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+				    TDB_DATA data, void *private_data)
+{
+	struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
+	struct ctdb_req_message *r;
+	int len;
+
+	/* construct a message to send to the client containing the data */
+	len = offsetof(struct ctdb_req_message, data) + data.dsize;
+	r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
+			       len, struct ctdb_req_message);
+	CTDB_NO_MEMORY_VOID(ctdb, r);
+
+	talloc_set_name_const(r, "req_message packet");
+
+	r->srvid         = srvid;
+	r->datalen       = data.dsize;
+	memcpy(&r->data[0], data.dptr, data.dsize);
+
+	daemon_queue_send(client, &r->hdr);
+
+	talloc_free(r);
+}
+					   
+
+/*
+  this is called when the ctdb daemon received a ctdb request to 
+  set the srvid from the client
+ */
+int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
+{
+	struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+	int res;
+	if (client == NULL) {
+		DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n"));
+		return -1;
+	}
+	res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
+	if (res != 0) {
+		DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n", 
+			 (unsigned long long)srvid));
+	} else {
+		DEBUG(2,(__location__ " Registered message handler for srvid=%llu\n", 
+			 (unsigned long long)srvid));
+	}
+
+	/* this is a hack for Samba - we now know the pid of the Samba client */
+	if ((srvid & 0xFFFFFFFF) == srvid &&
+	    kill(srvid, 0) == 0) {
+		client->pid = srvid;
+		DEBUG(0,(__location__ " Registered PID %u for client %u\n",
+			 (unsigned)client->pid, client_id));
+	}
+	return res;
+}
+
+/*
+  this is called when the ctdb daemon received a ctdb request to 
+  remove a srvid from the client
+ */
+int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
+{
+	struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+	if (client == NULL) {
+		DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n"));
+		return -1;
+	}
+	return ctdb_deregister_message_handler(ctdb, srvid, client);
+}
+
+
+/*
+  destroy a ctdb_client
+*/
+static int ctdb_client_destructor(struct ctdb_client *client)
+{
+	ctdb_takeover_client_destructor_hook(client);
+	ctdb_reqid_remove(client->ctdb, client->client_id);
+	client->ctdb->statistics.num_clients--;
+	return 0;
+}
+
+
+/*
+  this is called when the ctdb daemon received a ctdb request message
+  from a local client over the unix domain socket
+ */
+static void daemon_request_message_from_client(struct ctdb_client *client, 
+					       struct ctdb_req_message *c)
+{
+	TDB_DATA data;
+	int res;
+
+	/* maybe the message is for another client on this node */
+	if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
+		ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
+		return;
+	}
+
+	/* its for a remote node */
+	data.dptr = &c->data[0];
+	data.dsize = c->datalen;
+	res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
+				       c->srvid, data);
+	if (res != 0) {
+		DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
+			 c->hdr.destnode));
+	}
+}
+
+
+struct daemon_call_state {
+	struct ctdb_client *client;
+	uint32_t reqid;
+	struct ctdb_call *call;
+	struct timeval start_time;
+};
+
+/* 
+   complete a call from a client 
+*/
+static void daemon_call_from_client_callback(struct ctdb_call_state *state)
+{
+	struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
+							   struct daemon_call_state);
+	struct ctdb_reply_call *r;
+	int res;
+	uint32_t length;
+	struct ctdb_client *client = dstate->client;
+
+	talloc_steal(client, dstate);
+	talloc_steal(dstate, dstate->call);
+
+	res = ctdb_daemon_call_recv(state, dstate->call);
+	if (res != 0) {
+		DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
+		client->ctdb->statistics.pending_calls--;
+		ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
+		return;
+	}
+
+	length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
+	r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
+			       length, struct ctdb_reply_call);
+	if (r == NULL) {
+		DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
+		client->ctdb->statistics.pending_calls--;
+		ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
+		return;
+	}
+	r->hdr.reqid        = dstate->reqid;
+	r->datalen          = dstate->call->reply_data.dsize;
+	memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
+
+	res = daemon_queue_send(client, &r->hdr);
+	if (res != 0) {
+		DEBUG(0, (__location__ " Failed to queue packet from daemon to client\n"));
+	}
+	ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
+	talloc_free(dstate);
+	client->ctdb->statistics.pending_calls--;
+}
+
+
+static void daemon_request_call_from_client(struct ctdb_client *client, 
+					    struct ctdb_req_call *c);
+
+/*
+  this is called when the ctdb daemon received a ctdb request call
+  from a local client over the unix domain socket
+ */
+static void daemon_request_call_from_client(struct ctdb_client *client, 
+					    struct ctdb_req_call *c)
+{
+	struct ctdb_call_state *state;
+	struct ctdb_db_context *ctdb_db;
+	struct daemon_call_state *dstate;
+	struct ctdb_call *call;
+	struct ctdb_ltdb_header header;
+	TDB_DATA key, data;
+	int ret;
+	struct ctdb_context *ctdb = client->ctdb;
+
+	ctdb->statistics.total_calls++;
+	ctdb->statistics.pending_calls++;
+
+	ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
+	if (!ctdb_db) {
+		DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
+			  c->db_id));
+		ctdb->statistics.pending_calls--;
+		return;
+	}
+
+	key.dptr = c->data;
+	key.dsize = c->keylen;
+
+	ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
+					   (struct ctdb_req_header *)c, &data,
+					   daemon_incoming_packet, client, True);
+	if (ret == -2) {
+		/* will retry later */
+		ctdb->statistics.pending_calls--;
+		return;
+	}
+
+	if (ret != 0) {
+		DEBUG(0,(__location__ " Unable to fetch record\n"));
+		ctdb->statistics.pending_calls--;
+		return;
+	}
+
+	dstate = talloc(client, struct daemon_call_state);
+	if (dstate == NULL) {
+		ctdb_ltdb_unlock(ctdb_db, key);
+		DEBUG(0,(__location__ " Unable to allocate dstate\n"));
+		ctdb->statistics.pending_calls--;
+		return;
+	}
+	dstate->start_time = timeval_current();
+	dstate->client = client;
+	dstate->reqid  = c->hdr.reqid;
+	talloc_steal(dstate, data.dptr);
+
+	call = dstate->call = talloc_zero(dstate, struct ctdb_call);
+	if (call == NULL) {
+		ctdb_ltdb_unlock(ctdb_db, key);
+		DEBUG(0,(__location__ " Unable to allocate call\n"));
+		ctdb->statistics.pending_calls--;
+		ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
+		return;
+	}
+
+	call->call_id = c->callid;
+	call->key = key;
+	call->call_data.dptr = c->data + c->keylen;
+	call->call_data.dsize = c->calldatalen;
+	call->flags = c->flags;
+
+	if (header.dmaster == ctdb->vnn) {
+		state = ctdb_call_local_send(ctdb_db, call, &header, &data);
+	} else {
+		state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
+	}
+
+	ctdb_ltdb_unlock(ctdb_db, key);
+
+	if (state == NULL) {
+		DEBUG(0,(__location__ " Unable to setup call send\n"));
+		ctdb->statistics.pending_calls--;
+		ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
+		return;
+	}
+	talloc_steal(state, dstate);
+	talloc_steal(client, state);
+
+	state->async.fn = daemon_call_from_client_callback;
+	state->async.private_data = dstate;
+}
+
+
+static void daemon_request_control_from_client(struct ctdb_client *client, 
+					       struct ctdb_req_control *c);
+
+/* data contains a packet from the client */
+static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
+{
+	struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
+	TALLOC_CTX *tmp_ctx;
+	struct ctdb_context *ctdb = client->ctdb;
+
+	/* place the packet as a child of a tmp_ctx. We then use
+	   talloc_free() below to free it. If any of the calls want
+	   to keep it, then they will steal it somewhere else, and the
+	   talloc_free() will be a no-op */
+	tmp_ctx = talloc_new(client);
+	talloc_steal(tmp_ctx, hdr);
+
+	if (hdr->ctdb_magic != CTDB_MAGIC) {
+		ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
+		goto done;
+	}
+
+	if (hdr->ctdb_version != CTDB_VERSION) {
+		ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
+		goto done;
+	}
+
+	switch (hdr->operation) {
+	case CTDB_REQ_CALL:
+		ctdb->statistics.client.req_call++;
+		daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
+		break;
+
+	case CTDB_REQ_MESSAGE:
+		ctdb->statistics.client.req_message++;
+		daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
+		break;
+
+	case CTDB_REQ_CONTROL:
+		ctdb->statistics.client.req_control++;
+		daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
+		break;
+
+	default:
+		DEBUG(0,(__location__ " daemon: unrecognized operation %u\n",
+			 hdr->operation));
+	}
+
+done:
+	talloc_free(tmp_ctx);
+}
+
+/*
+  called when the daemon gets a incoming packet
+ */
+static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
+{
+	struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
+	struct ctdb_req_header *hdr;
+
+	if (cnt == 0) {
+		talloc_free(client);
+		return;
+	}
+
+	client->ctdb->statistics.client_packets_recv++;
+
+	if (cnt < sizeof(*hdr)) {
+		ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
+			       (unsigned)cnt);
+		return;
+	}
+	hdr = (struct ctdb_req_header *)data;
+	if (cnt != hdr->length) {
+		ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
+			       (unsigned)hdr->length, (unsigned)cnt);
+		return;
+	}
+
+	if (hdr->ctdb_magic != CTDB_MAGIC) {
+		ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
+		return;
+	}
+
+	if (hdr->ctdb_version != CTDB_VERSION) {
+		ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
+		return;
+	}
+
+	DEBUG(3,(__location__ " client request %u of type %u length %u from "
+		 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
+		 hdr->srcnode, hdr->destnode));
+
+	/* it is the responsibility of the incoming packet function to free 'data' */
+	daemon_incoming_packet(client, hdr);
+}
+
+static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
+			 uint16_t flags, void *private_data)
+{
+	struct sockaddr_in addr;
+	socklen_t len;
+	int fd;
+	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
+	struct ctdb_client *client;
+
+	memset(&addr, 0, sizeof(addr));
+	len = sizeof(addr);
+	fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
+	if (fd == -1) {
+		return;
+	}
+
+	set_nonblocking(fd);
+	set_close_on_exec(fd);
+
+	client = talloc_zero(ctdb, struct ctdb_client);
+	client->ctdb = ctdb;
+	client->fd = fd;
+	client->client_id = ctdb_reqid_new(ctdb, client);
+	ctdb->statistics.num_clients++;
+
+	client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
+					 ctdb_daemon_read_cb, client);
+
+	talloc_set_destructor(client, ctdb_client_destructor);
+}
+
+
+
+/*
+  create a unix domain socket and bind it
+  return a file descriptor open on the socket 
+*/
+static int ux_socket_bind(struct ctdb_context *ctdb)
+{
+	struct sockaddr_un addr;
+
+	ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (ctdb->daemon.sd == -1) {
+		return -1;
+	}
+
+	set_nonblocking(ctdb->daemon.sd);
+	set_close_on_exec(ctdb->daemon.sd);
+
+#if 0
+	/* AIX doesn't like this :( */
+	if (fchown(ctdb->daemon.sd, geteuid(), getegid()) != 0 ||
+	    fchmod(ctdb->daemon.sd, 0700) != 0) {
+		DEBUG(0,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n"));
+		goto failed;
+	}
+#endif
+
+	set_nonblocking(ctdb->daemon.sd);
+
+	memset(&addr, 0, sizeof(addr));
+	addr.sun_family = AF_UNIX;
+	strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
+
+	if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
+		DEBUG(0,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
+		goto failed;
+	}	
+	if (listen(ctdb->daemon.sd, 10) != 0) {
+		DEBUG(0,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
+		goto failed;
+	}
+
+	return 0;
+
+failed:
+	close(ctdb->daemon.sd);
+	ctdb->daemon.sd = -1;
+	return -1;	
+}
+
+/*
+  delete the socket on exit - called on destruction of autofree context
+ */
+static int unlink_destructor(const char *name)
+{
+	unlink(name);
+	return 0;
+}
+
+
+/*
+  start the protocol going as a daemon
+*/
+int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
+{
+	int res;
+	struct fd_event *fde;
+	const char *domain_socket_name;
+
+	/* get rid of any old sockets */
+	unlink(ctdb->daemon.name);
+
+	/* create a unix domain stream socket to listen to */
+	res = ux_socket_bind(ctdb);
+	if (res!=0) {
+		DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
+		exit(10);
+	}
+
+	if (do_fork && fork()) {
+		return 0;
+	}
+
+	tdb_reopen_all(False);
+
+	if (do_fork) {
+		setsid();
+	}
+	block_signal(SIGPIPE);
+
+	/* try to set us up as realtime */
+	ctdb_set_realtime(true);
+
+	/* ensure the socket is deleted on exit of the daemon */
+	domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
+	talloc_set_destructor(domain_socket_name, unlink_destructor);	
+
+	ctdb->ev = event_context_init(NULL);
+
+	/* start frozen, then let the first election sort things out */
+	if (!ctdb_blocking_freeze(ctdb)) {
+		DEBUG(0,("Failed to get initial freeze\n"));
+		exit(12);
+	}
+
+	/* force initial recovery for election */
+	ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+
+	/* now start accepting clients, only can do this once frozen */
+	fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
+			   EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
+			   ctdb_accept_client, ctdb);
+
+	ctdb_main_loop(ctdb);
+
+	return 0;
+}
+
+/*
+  allocate a packet for use in daemon<->daemon communication
+ */
+struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
+						 TALLOC_CTX *mem_ctx, 
+						 enum ctdb_operation operation, 
+						 size_t length, size_t slength,
+						 const char *type)
+{
+	int size;
+	struct ctdb_req_header *hdr;
+
+	length = MAX(length, slength);
+	size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
+
+	hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
+	if (hdr == NULL) {
+		DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n",
+			 operation, (unsigned)length));
+		return NULL;
+	}
+	talloc_set_name_const(hdr, type);
+	memset(hdr, 0, slength);
+	hdr->length       = length;
+	hdr->operation    = operation;
+	hdr->ctdb_magic   = CTDB_MAGIC;
+	hdr->ctdb_version = CTDB_VERSION;
+	hdr->generation   = ctdb->vnn_map->generation;
+	hdr->srcnode      = ctdb->vnn;
+
+	return hdr;	
+}
+
+struct daemon_control_state {
+	struct daemon_control_state *next, *prev;
+	struct ctdb_client *client;
+	struct ctdb_req_control *c;
+	uint32_t reqid;
+	struct ctdb_node *node;
+};
+
+/*
+  callback when a control reply comes in
+ */
+static void daemon_control_callback(struct ctdb_context *ctdb,
+				    int32_t status, TDB_DATA data, 
+				    const char *errormsg,
+				    void *private_data)
+{
+	struct daemon_control_state *state = talloc_get_type(private_data, 
+							     struct daemon_control_state);
+	struct ctdb_client *client = state->client;
+	struct ctdb_reply_control *r;
+	size_t len;
+
+	/* construct a message to send to the client containing the data */
+	len = offsetof(struct ctdb_reply_control, data) + data.dsize;
+	if (errormsg) {
+		len += strlen(errormsg);
+	}
+	r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
+			       struct ctdb_reply_control);
+	CTDB_NO_MEMORY_VOID(ctdb, r);
+
+	r->hdr.reqid     = state->reqid;
+	r->status        = status;
+	r->datalen       = data.dsize;
+	r->errorlen = 0;
+	memcpy(&r->data[0], data.dptr, data.dsize);
+	if (errormsg) {
+		r->errorlen = strlen(errormsg);
+		memcpy(&r->data[r->datalen], errormsg, r->errorlen);
+	}
+
+	daemon_queue_send(client, &r->hdr);
+
+	talloc_free(state);
+}
+
+/*
+  fail all pending controls to a disconnected node
+ */
+void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
+{
+	struct daemon_control_state *state;
+	while ((state = node->pending_controls)) {
+		DLIST_REMOVE(node->pending_controls, state);
+		daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
+					"node is disconnected", state);
+	}
+}
+
+/*
+  destroy a daemon_control_state
+ */
+static int daemon_control_destructor(struct daemon_control_state *state)
+{
+	if (state->node) {
+		DLIST_REMOVE(state->node->pending_controls, state);
+	}
+	return 0;
+}
+
+/*
+  this is called when the ctdb daemon received a ctdb request control
+  from a local client over the unix domain socket
+ */
+static void daemon_request_control_from_client(struct ctdb_client *client, 
+					       struct ctdb_req_control *c)
+{
+	TDB_DATA data;
+	int res;
+	struct daemon_control_state *state;
+	TALLOC_CTX *tmp_ctx = talloc_new(client);
+
+	if (c->hdr.destnode == CTDB_CURRENT_NODE) {
+		c->hdr.destnode = client->ctdb->vnn;
+	}
+
+	state = talloc(client, struct daemon_control_state);
+	CTDB_NO_MEMORY_VOID(client->ctdb, state);
+
+	state->client = client;
+	state->c = talloc_steal(state, c);
+	state->reqid = c->hdr.reqid;
+	if (ctdb_validate_vnn(client->ctdb, c->hdr.destnode)) {
+		state->node = client->ctdb->nodes[c->hdr.destnode];
+		DLIST_ADD(state->node->pending_controls, state);
+	} else {
+		state->node = NULL;
+	}
+
+	talloc_set_destructor(state, daemon_control_destructor);
+
+	if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
+		talloc_steal(tmp_ctx, state);
+	}
+	
+	data.dptr = &c->data[0];
+	data.dsize = c->datalen;
+	res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
+				       c->srvid, c->opcode, client->client_id,
+				       c->flags,
+				       data, daemon_control_callback,
+				       state);
+	if (res != 0) {
+		DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
+			 c->hdr.destnode));
+	}
+
+	talloc_free(tmp_ctx);
+}
+
+/*
+  register a call function
+*/
+int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
+			 ctdb_fn_t fn, int id)
+{
+	struct ctdb_registered_call *call;
+	struct ctdb_db_context *ctdb_db;
+
+	ctdb_db = find_ctdb_db(ctdb, db_id);
+	if (ctdb_db == NULL) {
+		return -1;
+	}
+
+	call = talloc(ctdb_db, struct ctdb_registered_call);
+	call->fn = fn;
+	call->id = id;
+
+	DLIST_ADD(ctdb_db->calls, call);	
+	return 0;
+}
+
+
+
+/*
+  this local messaging handler is ugly, but is needed to prevent
+  recursion in ctdb_send_message() when the destination node is the
+  same as the source node
+ */
+struct ctdb_local_message {
+	struct ctdb_context *ctdb;
+	uint64_t srvid;
+	TDB_DATA data;
+};
+
+static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
+				       struct timeval t, void *private_data)
+{
+	struct ctdb_local_message *m = talloc_get_type(private_data, 
+						       struct ctdb_local_message);
+	int res;
+
+	res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
+	if (res != 0) {
+		DEBUG(0, (__location__ " Failed to dispatch message for srvid=%llu\n", 
+			  (unsigned long long)m->srvid));
+	}
+	talloc_free(m);
+}
+
+static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
+{
+	struct ctdb_local_message *m;
+	m = talloc(ctdb, struct ctdb_local_message);
+	CTDB_NO_MEMORY(ctdb, m);
+
+	m->ctdb = ctdb;
+	m->srvid = srvid;
+	m->data  = data;
+	m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
+	if (m->data.dptr == NULL) {
+		talloc_free(m);
+		return -1;
+	}
+
+	/* this needs to be done as an event to prevent recursion */
+	event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
+	return 0;
+}
+
+/*
+  send a ctdb message
+*/
+int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
+			     uint64_t srvid, TDB_DATA data)
+{
+	struct ctdb_req_message *r;
+	int len;
+
+	/* see if this is a message to ourselves */
+	if (vnn == ctdb->vnn) {
+		return ctdb_local_message(ctdb, srvid, data);
+	}
+
+	len = offsetof(struct ctdb_req_message, data) + data.dsize;
+	r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
+				    struct ctdb_req_message);
+	CTDB_NO_MEMORY(ctdb, r);
+
+	r->hdr.destnode  = vnn;
+	r->srvid         = srvid;
+	r->datalen       = data.dsize;
+	memcpy(&r->data[0], data.dptr, data.dsize);
+
+	ctdb_queue_packet(ctdb, &r->hdr);
+
+	talloc_free(r);
+	return 0;
+}
+
diff --git a/source4/cluster/ctdb/server/ctdb_freeze.c b/source4/cluster/ctdb/server/ctdb_freeze.c
new file mode 100644
index 0000000000..42ad975b53
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_freeze.c
@@ -0,0 +1,256 @@
+/* 
+   ctdb freeze handling
+
+   Copyright (C) Andrew Tridgell  2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "../include/ctdb_private.h"
+#include "lib/util/dlinklist.h"
+#include "db_wrap.h"
+
+
+/*
+  lock all databases
+ */
+static int ctdb_lock_all_databases(struct ctdb_context *ctdb)
+{
+	struct ctdb_db_context *ctdb_db;
+	for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
+		if (tdb_lockall(ctdb_db->ltdb->tdb) != 0) {
+			return -1;
+		}
+	}
+	return 0;
+}
+
+/*
+  a list of control requests waiting for a freeze lock child to get
+  the database locks
+ */
+struct ctdb_freeze_waiter {
+	struct ctdb_freeze_waiter *next, *prev;
+	struct ctdb_context *ctdb;
+	struct ctdb_req_control *c;
+	int32_t status;
+};
+
+/* a handle to a freeze lock child process */
+struct ctdb_freeze_handle {
+	struct ctdb_context *ctdb;
+	pid_t child;
+	int fd;
+	struct ctdb_freeze_waiter *waiters;
+};
+
+/*
+  destroy a freeze handle
+ */	
+static int ctdb_freeze_handle_destructor(struct ctdb_freeze_handle *h)
+{
+	h->ctdb->freeze_mode = CTDB_FREEZE_NONE;
+	kill(h->child, SIGKILL);
+	waitpid(h->child, NULL, 0);
+	return 0;
+}
+
+/*
+  called when the child writes its status to us
+ */
+static void ctdb_freeze_lock_handler(struct event_context *ev, struct fd_event *fde, 
+				       uint16_t flags, void *private_data)
+{
+	struct ctdb_freeze_handle *h = talloc_get_type(private_data, struct ctdb_freeze_handle);
+	int32_t status;
+	struct ctdb_freeze_waiter *w;
+
+	if (h->ctdb->freeze_mode == CTDB_FREEZE_FROZEN) {
+		DEBUG(0,("freeze child died - unfreezing\n"));
+		talloc_free(h);
+		return;
+	}
+
+	if (read(h->fd, &status, sizeof(status)) != sizeof(status)) {
+		DEBUG(0,("read error from freeze lock child\n"));
+		status = -1;
+	}
+
+	if (status == -1) {
+		DEBUG(0,("Failed to get locks in ctdb_freeze_child\n"));
+		/* we didn't get the locks - destroy the handle */
+		talloc_free(h);
+		return;
+	}
+
+	h->ctdb->freeze_mode = CTDB_FREEZE_FROZEN;
+
+	/* notify the waiters */
+	while ((w = h->ctdb->freeze_handle->waiters)) {
+		w->status = status;
+		DLIST_REMOVE(h->ctdb->freeze_handle->waiters, w);
+		talloc_free(w);
+	}
+}
+
+/*
+  create a child which gets locks on all the open databases, then calls the callback telling the parent
+  that it is done
+ */
+static struct ctdb_freeze_handle *ctdb_freeze_lock(struct ctdb_context *ctdb)
+{
+	struct ctdb_freeze_handle *h;
+	int fd[2];
+	struct fd_event *fde;
+
+	h = talloc_zero(ctdb, struct ctdb_freeze_handle);
+	CTDB_NO_MEMORY_VOID(ctdb, h);
+
+	h->ctdb = ctdb;
+
+	/* use socketpair() instead of pipe() so we have bi-directional fds */
+	if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd) != 0) {
+		DEBUG(0,("Failed to create pipe for ctdb_freeze_lock\n"));
+		talloc_free(h);
+		return NULL;
+	}
+	
+	h->child = fork();
+	if (h->child == -1) {
+		DEBUG(0,("Failed to fork child for ctdb_freeze_lock\n"));
+		talloc_free(h);
+		return NULL;
+	}
+
+	if (h->child == 0) {
+		int ret;
+		/* in the child */
+		close(fd[0]);
+		ret = ctdb_lock_all_databases(ctdb);
+		if (ret != 0) {
+			_exit(0);
+		}
+		write(fd[1], &ret, sizeof(ret));
+		/* the read here means we will die if the parent exits */
+		read(fd[1], &ret, sizeof(ret));
+		_exit(0);
+	}
+
+	talloc_set_destructor(h, ctdb_freeze_handle_destructor);
+
+	close(fd[1]);
+
+	h->fd = fd[0];
+
+	fde = event_add_fd(ctdb->ev, h, h->fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
+			   ctdb_freeze_lock_handler, h);
+	if (fde == NULL) {
+		DEBUG(0,("Failed to setup fd event for ctdb_freeze_lock\n"));
+		close(fd[0]);
+		talloc_free(h);
+		return NULL;
+	}
+
+	return h;
+}
+
+/*
+  destroy a waiter for a freeze mode change
+ */
+static int ctdb_freeze_waiter_destructor(struct ctdb_freeze_waiter *w)
+{
+	DLIST_REMOVE(w->ctdb->freeze_handle->waiters, w);
+	ctdb_request_control_reply(w->ctdb, w->c, NULL, w->status, NULL);
+	return 0;
+}
+
+/*
+  start the freeze process
+ */
+void ctdb_start_freeze(struct ctdb_context *ctdb)
+{
+	if (ctdb->freeze_mode == CTDB_FREEZE_FROZEN) {
+		/* we're already frozen */
+		return;
+	}
+
+	/* if there isn't a freeze lock child then create one */
+	if (!ctdb->freeze_handle) {
+		ctdb->freeze_handle = ctdb_freeze_lock(ctdb);
+		CTDB_NO_MEMORY_VOID(ctdb, ctdb->freeze_handle);
+		ctdb->freeze_mode = CTDB_FREEZE_PENDING;
+	}
+}
+
+/*
+  freeze the databases
+ */
+int32_t ctdb_control_freeze(struct ctdb_context *ctdb, struct ctdb_req_control *c, bool *async_reply)
+{
+	struct ctdb_freeze_waiter *w;
+
+	if (ctdb->freeze_mode == CTDB_FREEZE_FROZEN) {
+		/* we're already frozen */
+		return 0;
+	}
+
+	ctdb_start_freeze(ctdb);
+
+	/* add ourselves to list of waiters */
+	w = talloc(ctdb->freeze_handle, struct ctdb_freeze_waiter);
+	CTDB_NO_MEMORY(ctdb, w);
+	w->ctdb   = ctdb;
+	w->c      = talloc_steal(w, c);
+	w->status = -1;
+	talloc_set_destructor(w, ctdb_freeze_waiter_destructor);
+	DLIST_ADD(ctdb->freeze_handle->waiters, w);
+
+	/* we won't reply till later */
+	*async_reply = True;
+	return 0;
+}
+
+
+/*
+  block until we are frozen, used during daemon startup
+ */
+bool ctdb_blocking_freeze(struct ctdb_context *ctdb)
+{
+	ctdb_start_freeze(ctdb);
+
+	/* block until frozen */
+	while (ctdb->freeze_mode == CTDB_FREEZE_PENDING) {
+		event_loop_once(ctdb->ev);
+	}
+
+	return ctdb->freeze_mode == CTDB_FREEZE_FROZEN;
+}
+
+
+
+/*
+  thaw the databases
+ */
+int32_t ctdb_control_thaw(struct ctdb_context *ctdb)
+{
+	talloc_free(ctdb->freeze_handle);
+	ctdb->freeze_handle = NULL;
+	ctdb_call_resend_all(ctdb);
+	return 0;
+}
diff --git a/source4/cluster/ctdb/server/ctdb_lockwait.c b/source4/cluster/ctdb/server/ctdb_lockwait.c
new file mode 100644
index 0000000000..5b0019836e
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_lockwait.c
@@ -0,0 +1,165 @@
+/* 
+   wait for a tdb chain lock
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "db_wrap.h"
+#include "lib/tdb/include/tdb.h"
+#include "../include/ctdb_private.h"
+
+
+struct lockwait_handle {
+	struct ctdb_context *ctdb;
+	struct ctdb_db_context *ctdb_db;
+	struct fd_event *fde;
+	int fd[2];
+	pid_t child;
+	void *private_data;
+	void (*callback)(void *);
+	TDB_DATA key;
+	struct timeval start_time;
+};
+
+static void lockwait_handler(struct event_context *ev, struct fd_event *fde, 
+			     uint16_t flags, void *private_data)
+{
+	struct lockwait_handle *h = talloc_get_type(private_data, 
+						     struct lockwait_handle);
+	void (*callback)(void *) = h->callback;
+	void *p = h->private_data;
+	pid_t child = h->child;
+	TDB_DATA key = h->key;
+	struct tdb_context *tdb = h->ctdb_db->ltdb->tdb;
+	TALLOC_CTX *tmp_ctx = talloc_new(ev);
+
+	key.dptr = talloc_memdup(tmp_ctx, key.dptr, key.dsize);
+
+	talloc_set_destructor(h, NULL);
+	ctdb_latency(&h->ctdb->statistics.max_lockwait_latency, h->start_time);
+	h->ctdb->statistics.pending_lockwait_calls--;
+
+	/* the handle needs to go away when the context is gone - when
+	   the handle goes away this implicitly closes the pipe, which
+	   kills the child holding the lock */
+	talloc_steal(tmp_ctx, h);
+
+	if (h->ctdb->flags & CTDB_FLAG_TORTURE) {
+		if (tdb_chainlock_nonblock(tdb, key) == 0) {
+			ctdb_fatal(h->ctdb, "got chain lock while lockwait child active");
+		}
+	}
+
+	tdb_chainlock_mark(tdb, key);
+	callback(p);
+	tdb_chainlock_unmark(tdb, key);
+
+	kill(child, SIGKILL);
+	waitpid(child, NULL, 0);
+	talloc_free(tmp_ctx);
+}
+
+static int lockwait_destructor(struct lockwait_handle *h)
+{
+	h->ctdb->statistics.pending_lockwait_calls--;
+	kill(h->child, SIGKILL);
+	waitpid(h->child, NULL, 0);
+	return 0;
+}
+
+/*
+  setup a non-blocking chainlock on a tdb record. If this function
+  returns NULL then it could not get the chainlock. Otherwise it
+  returns a opaque handle, and will call callback() once it has
+  managed to get the chainlock. You can cancel it by using talloc_free
+  on the returned handle.
+
+  It is the callers responsibility to unlock the chainlock once
+  acquired
+ */
+struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
+				      TDB_DATA key,
+				      void (*callback)(void *private_data),
+				      void *private_data)
+{
+	struct lockwait_handle *result;
+	int ret;
+	pid_t parent = getpid();
+
+	ctdb_db->ctdb->statistics.lockwait_calls++;
+	ctdb_db->ctdb->statistics.pending_lockwait_calls++;
+
+	if (!(result = talloc_zero(private_data, struct lockwait_handle))) {
+		ctdb_db->ctdb->statistics.pending_lockwait_calls--;
+		return NULL;
+	}
+
+	ret = pipe(result->fd);
+
+	if (ret != 0) {
+		talloc_free(result);
+		ctdb_db->ctdb->statistics.pending_lockwait_calls--;
+		return NULL;
+	}
+
+	result->child = fork();
+
+	if (result->child == (pid_t)-1) {
+		close(result->fd[0]);
+		close(result->fd[1]);
+		talloc_free(result);
+		ctdb_db->ctdb->statistics.pending_lockwait_calls--;
+		return NULL;
+	}
+
+	result->callback = callback;
+	result->private_data = private_data;
+	result->ctdb = ctdb_db->ctdb;
+	result->ctdb_db = ctdb_db;
+	result->key = key;
+
+	if (result->child == 0) {
+		char c = 0;
+		close(result->fd[0]);
+		tdb_chainlock(ctdb_db->ltdb->tdb, key);
+		write(result->fd[1], &c, 1);
+		/* make sure we die when our parent dies */
+		while (kill(parent, 0) == 0 || errno != ESRCH) {
+			sleep(5);
+		}
+		_exit(0);
+	}
+
+	close(result->fd[1]);
+	talloc_set_destructor(result, lockwait_destructor);
+
+	result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
+				   EVENT_FD_READ|EVENT_FD_AUTOCLOSE, lockwait_handler,
+				   (void *)result);
+	if (result->fde == NULL) {
+		talloc_free(result);
+		ctdb_db->ctdb->statistics.pending_lockwait_calls--;
+		return NULL;
+	}
+
+	result->start_time = timeval_current();
+
+	return result;
+}
diff --git a/source4/cluster/ctdb/server/ctdb_ltdb_server.c b/source4/cluster/ctdb/server/ctdb_ltdb_server.c
new file mode 100644
index 0000000000..bd07f674db
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_ltdb_server.c
@@ -0,0 +1,366 @@
+/* 
+   ctdb ltdb code - server side
+
+   Copyright (C) Andrew Tridgell  2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "../include/ctdb_private.h"
+#include "db_wrap.h"
+#include "lib/util/dlinklist.h"
+
+/*
+  this is the dummy null procedure that all databases support
+*/
+static int ctdb_null_func(struct ctdb_call_info *call)
+{
+	return 0;
+}
+
+/*
+  this is a plain fetch procedure that all databases support
+*/
+static int ctdb_fetch_func(struct ctdb_call_info *call)
+{
+	call->reply_data = &call->record_data;
+	return 0;
+}
+
+
+
+struct lock_fetch_state {
+	struct ctdb_context *ctdb;
+	void (*recv_pkt)(void *, struct ctdb_req_header *);
+	void *recv_context;
+	struct ctdb_req_header *hdr;
+	uint32_t generation;
+	bool ignore_generation;
+};
+
+/*
+  called when we should retry the operation
+ */
+static void lock_fetch_callback(void *p)
+{
+	struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
+	if (!state->ignore_generation &&
+	    state->generation != state->ctdb->vnn_map->generation) {
+		DEBUG(0,("Discarding previous generation lockwait packet\n"));
+		talloc_free(state->hdr);
+		return;
+	}
+	state->recv_pkt(state->recv_context, state->hdr);
+	DEBUG(2,(__location__ " PACKET REQUEUED\n"));
+}
+
+
+/*
+  do a non-blocking ltdb_lock, deferring this ctdb request until we
+  have the chainlock
+
+  It does the following:
+
+   1) tries to get the chainlock. If it succeeds, then it returns 0
+
+   2) if it fails to get a chainlock immediately then it sets up a
+   non-blocking chainlock via ctdb_lockwait, and when it gets the
+   chainlock it re-submits this ctdb request to the main packet
+   receive function
+
+   This effectively queues all ctdb requests that cannot be
+   immediately satisfied until it can get the lock. This means that
+   the main ctdb daemon will not block waiting for a chainlock held by
+   a client
+
+   There are 3 possible return values:
+
+       0:    means that it got the lock immediately.
+      -1:    means that it failed to get the lock, and won't retry
+      -2:    means that it failed to get the lock immediately, but will retry
+ */
+int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
+			   TDB_DATA key, struct ctdb_req_header *hdr,
+			   void (*recv_pkt)(void *, struct ctdb_req_header *),
+			   void *recv_context, bool ignore_generation)
+{
+	int ret;
+	struct tdb_context *tdb = ctdb_db->ltdb->tdb;
+	struct lockwait_handle *h;
+	struct lock_fetch_state *state;
+	
+	ret = tdb_chainlock_nonblock(tdb, key);
+
+	if (ret != 0 &&
+	    !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
+		/* a hard failure - don't try again */
+		return -1;
+	}
+
+	/* when torturing, ensure we test the contended path */
+	if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
+	    random() % 5 == 0) {
+		ret = -1;
+		tdb_chainunlock(tdb, key);
+	}
+
+	/* first the non-contended path */
+	if (ret == 0) {
+		return 0;
+	}
+
+	state = talloc(hdr, struct lock_fetch_state);
+	state->ctdb = ctdb_db->ctdb;
+	state->hdr = hdr;
+	state->recv_pkt = recv_pkt;
+	state->recv_context = recv_context;
+	state->generation = ctdb_db->ctdb->vnn_map->generation;
+	state->ignore_generation = ignore_generation;
+
+	/* now the contended path */
+	h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
+	if (h == NULL) {
+		tdb_chainunlock(tdb, key);
+		return -1;
+	}
+
+	/* we need to move the packet off the temporary context in ctdb_input_pkt(),
+	   so it won't be freed yet */
+	talloc_steal(state, hdr);
+	talloc_steal(state, h);
+
+	/* now tell the caller than we will retry asynchronously */
+	return -2;
+}
+
+/*
+  a varient of ctdb_ltdb_lock_requeue that also fetches the record
+ */
+int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
+				 TDB_DATA key, struct ctdb_ltdb_header *header, 
+				 struct ctdb_req_header *hdr, TDB_DATA *data,
+				 void (*recv_pkt)(void *, struct ctdb_req_header *),
+				 void *recv_context, bool ignore_generation)
+{
+	int ret;
+
+	ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
+				     recv_context, ignore_generation);
+	if (ret == 0) {
+		ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
+		if (ret != 0) {
+			ctdb_ltdb_unlock(ctdb_db, key);
+		}
+	}
+	return ret;
+}
+
+
+/*
+  paraoid check to see if the db is empty
+ */
+static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
+{
+	struct tdb_context *tdb = ctdb_db->ltdb->tdb;
+	int count = tdb_traverse_read(tdb, NULL, NULL);
+	if (count != 0) {
+		DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
+			 ctdb_db->db_path));
+		ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
+	}
+}
+
+/*
+  a client has asked to attach a new database
+ */
+int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
+			       TDB_DATA *outdata)
+{
+	const char *db_name = (const char *)indata.dptr;
+	struct ctdb_db_context *ctdb_db, *tmp_db;
+	int ret;
+
+	/* see if we already have this name */
+	for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
+		if (strcmp(db_name, tmp_db->db_name) == 0) {
+			/* this is not an error */
+			outdata->dptr  = (uint8_t *)&tmp_db->db_id;
+			outdata->dsize = sizeof(tmp_db->db_id);
+			return 0;
+		}
+	}
+
+	ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
+	CTDB_NO_MEMORY(ctdb, ctdb_db);
+
+	ctdb_db->ctdb = ctdb;
+	ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
+	CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
+
+	ctdb_db->db_id = ctdb_hash(&indata);
+
+	outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
+	outdata->dsize = sizeof(ctdb_db->db_id);
+
+	/* check for hash collisions */
+	for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
+		if (tmp_db->db_id == ctdb_db->db_id) {
+			DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
+				 tmp_db->db_id, db_name, tmp_db->db_name));
+			talloc_free(ctdb_db);
+			return -1;
+		}
+	}
+
+	if (ctdb->db_directory == NULL) {
+		ctdb->db_directory = VARDIR "/ctdb";
+	}
+
+	/* make sure the db directory exists */
+	if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
+		DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
+			 ctdb->db_directory));
+		talloc_free(ctdb_db);
+		return -1;
+	}
+
+	/* open the database */
+	ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
+					   ctdb->db_directory, 
+					   db_name, ctdb->vnn);
+
+	ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
+				      ctdb->tunable.database_hash_size, 
+				      TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
+	if (ctdb_db->ltdb == NULL) {
+		DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
+		talloc_free(ctdb_db);
+		return -1;
+	}
+
+	ctdb_check_db_empty(ctdb_db);
+
+	DLIST_ADD(ctdb->db_list, ctdb_db);
+
+	/* 
+	   all databases support the "null" function. we need this in
+	   order to do forced migration of records
+	*/
+	ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
+	if (ret != 0) {
+		DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
+		talloc_free(ctdb_db);
+		return -1;
+	}
+
+	/* 
+	   all databases support the "fetch" function. we need this
+	   for efficient Samba3 ctdb fetch
+	*/
+	ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
+	if (ret != 0) {
+		DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
+		talloc_free(ctdb_db);
+		return -1;
+	}
+	
+	/* tell all the other nodes about this database */
+	ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+				 CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
+				 indata, NULL, NULL);
+
+	DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
+
+	/* success */
+	return 0;
+}
+
+/*
+  called when a broadcast seqnum update comes in
+ */
+int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
+{
+	struct ctdb_db_context *ctdb_db;
+	if (srcnode == ctdb->vnn) {
+		/* don't update ourselves! */
+		return 0;
+	}
+
+	ctdb_db = find_ctdb_db(ctdb, db_id);
+	if (!ctdb_db) {
+		DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
+		return -1;
+	}
+
+	tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
+	ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
+	return 0;
+}
+
+/*
+  timer to check for seqnum changes in a ltdb and propogate them
+ */
+static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
+				   struct timeval t, void *p)
+{
+	struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
+	struct ctdb_context *ctdb = ctdb_db->ctdb;
+	uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
+	if (new_seqnum != ctdb_db->seqnum) {
+		/* something has changed - propogate it */
+		TDB_DATA data;
+		data.dptr = (uint8_t *)&ctdb_db->db_id;
+		data.dsize = sizeof(uint32_t);
+		ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
+					 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
+					 data, NULL, NULL);		
+	}
+	ctdb_db->seqnum = new_seqnum;
+
+	/* setup a new timer */
+	ctdb_db->te = 
+		event_add_timed(ctdb->ev, ctdb_db, 
+				timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
+				ctdb_ltdb_seqnum_check, ctdb_db);
+}
+
+/*
+  enable seqnum handling on this db
+ */
+int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
+{
+	struct ctdb_db_context *ctdb_db;
+	ctdb_db = find_ctdb_db(ctdb, db_id);
+	if (!ctdb_db) {
+		DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
+		return -1;
+	}
+
+	if (ctdb_db->te == NULL) {
+		ctdb_db->te = 
+			event_add_timed(ctdb->ev, ctdb_db, 
+					timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
+					ctdb_ltdb_seqnum_check, ctdb_db);
+	}
+
+	tdb_enable_seqnum(ctdb_db->ltdb->tdb);
+	ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
+	return 0;
+}
+
diff --git a/source4/cluster/ctdb/server/ctdb_monitor.c b/source4/cluster/ctdb/server/ctdb_monitor.c
new file mode 100644
index 0000000000..ec5244703c
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_monitor.c
@@ -0,0 +1,227 @@
+/* 
+   monitoring links to all other nodes to detect dead nodes
+
+
+   Copyright (C) Ronnie Sahlberg 2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "../include/ctdb_private.h"
+
+/*
+  see if any nodes are dead
+ */
+static void ctdb_check_for_dead_nodes(struct event_context *ev, struct timed_event *te, 
+				      struct timeval t, void *private_data)
+{
+	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
+	int i;
+
+	if (ctdb->monitoring_mode == CTDB_MONITORING_DISABLED) {
+		event_add_timed(ctdb->ev, ctdb->monitor_context, 
+			timeval_current_ofs(ctdb->tunable.keepalive_interval, 0), 
+			ctdb_check_for_dead_nodes, ctdb);
+		return;
+	}
+
+	/* send a keepalive to all other nodes, unless */
+	for (i=0;i<ctdb->num_nodes;i++) {
+		struct ctdb_node *node = ctdb->nodes[i];
+		if (node->vnn == ctdb->vnn) {
+			continue;
+		}
+		
+		if (node->flags & NODE_FLAGS_DISCONNECTED) {
+			/* it might have come alive again */
+			if (node->rx_cnt != 0) {
+				ctdb_node_connected(node);
+			}
+			continue;
+		}
+
+
+		if (node->rx_cnt == 0) {
+			node->dead_count++;
+		} else {
+			node->dead_count = 0;
+		}
+
+		node->rx_cnt = 0;
+
+		if (node->dead_count >= ctdb->tunable.keepalive_limit) {
+			DEBUG(0,("dead count reached for node %u\n", node->vnn));
+			ctdb_node_dead(node);
+			ctdb_send_keepalive(ctdb, node->vnn);
+			/* maybe tell the transport layer to kill the
+			   sockets as well?
+			*/
+			continue;
+		}
+		
+		if (node->tx_cnt == 0) {
+			DEBUG(5,("sending keepalive to %u\n", node->vnn));
+			ctdb_send_keepalive(ctdb, node->vnn);
+		}
+
+		node->tx_cnt = 0;
+	}
+	
+	event_add_timed(ctdb->ev, ctdb->monitor_context, 
+			timeval_current_ofs(ctdb->tunable.keepalive_interval, 0), 
+			ctdb_check_for_dead_nodes, ctdb);
+}
+
+static void ctdb_check_health(struct event_context *ev, struct timed_event *te, 
+			      struct timeval t, void *private_data);
+
+/*
+  called when a health monitoring event script finishes
+ */
+static void ctdb_health_callback(struct ctdb_context *ctdb, int status, void *p)
+{
+	struct ctdb_node *node = ctdb->nodes[ctdb->vnn];
+	TDB_DATA data;
+	struct ctdb_node_flag_change c;
+
+	event_add_timed(ctdb->ev, ctdb->monitor_context, 
+			timeval_current_ofs(ctdb->tunable.monitor_interval, 0), 
+			ctdb_check_health, ctdb);
+
+	if (status != 0 && !(node->flags & NODE_FLAGS_UNHEALTHY)) {
+		DEBUG(0,("monitor event failed - disabling node\n"));
+		node->flags |= NODE_FLAGS_UNHEALTHY;
+	} else if (status == 0 && (node->flags & NODE_FLAGS_UNHEALTHY)) {
+		DEBUG(0,("monitor event OK - node re-enabled\n"));
+		ctdb->nodes[ctdb->vnn]->flags &= ~NODE_FLAGS_UNHEALTHY;
+	} else {
+		/* no change */
+		return;
+	}
+
+	c.vnn = ctdb->vnn;
+	c.flags = node->flags;
+
+	data.dptr = (uint8_t *)&c;
+	data.dsize = sizeof(c);
+
+	/* tell the other nodes that something has changed */
+	ctdb_daemon_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
+				 CTDB_SRVID_NODE_FLAGS_CHANGED, data);
+
+}
+
+
+/*
+  see if the event scripts think we are healthy
+ */
+static void ctdb_check_health(struct event_context *ev, struct timed_event *te, 
+			      struct timeval t, void *private_data)
+{
+	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
+	int ret;
+
+	if (ctdb->monitoring_mode == CTDB_MONITORING_DISABLED) {
+		event_add_timed(ctdb->ev, ctdb->monitor_context,
+				timeval_current_ofs(ctdb->tunable.monitor_interval, 0), 
+				ctdb_check_health, ctdb);
+		return;
+	}
+	
+	ret = ctdb_event_script_callback(ctdb, 
+					 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
+					 ctdb->monitor_context, ctdb_health_callback, ctdb, "monitor");
+	if (ret != 0) {
+		DEBUG(0,("Unable to launch monitor event script\n"));
+		event_add_timed(ctdb->ev, ctdb->monitor_context, 
+				timeval_current_ofs(ctdb->tunable.monitor_interval, 0), 
+				ctdb_check_health, ctdb);
+	}	
+}
+
+/* stop any monitoring */
+void ctdb_stop_monitoring(struct ctdb_context *ctdb)
+{
+	talloc_free(ctdb->monitor_context);
+	ctdb->monitor_context = talloc_new(ctdb);
+	CTDB_NO_MEMORY_FATAL(ctdb, ctdb->monitor_context);
+}
+
+/*
+  start watching for nodes that might be dead
+ */
+void ctdb_start_monitoring(struct ctdb_context *ctdb)
+{
+	struct timed_event *te;
+
+	ctdb_stop_monitoring(ctdb);
+
+	te = event_add_timed(ctdb->ev, ctdb->monitor_context,
+			     timeval_current_ofs(ctdb->tunable.keepalive_interval, 0), 
+			     ctdb_check_for_dead_nodes, ctdb);
+	CTDB_NO_MEMORY_FATAL(ctdb, te);
+
+	te = event_add_timed(ctdb->ev, ctdb->monitor_context,
+			     timeval_current_ofs(ctdb->tunable.monitor_interval, 0), 
+			     ctdb_check_health, ctdb);
+	CTDB_NO_MEMORY_FATAL(ctdb, te);
+}
+
+
+/*
+  modify flags on a node
+ */
+int32_t ctdb_control_modflags(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+	struct ctdb_node_modflags *m = (struct ctdb_node_modflags *)indata.dptr;
+	TDB_DATA data;
+	struct ctdb_node_flag_change c;
+	struct ctdb_node *node = ctdb->nodes[ctdb->vnn];
+	uint32_t old_flags = node->flags;
+
+	node->flags |= m->set;
+	node->flags &= ~m->clear;
+
+	if (node->flags == old_flags) {
+		/* no change */
+		return 0;
+	}
+
+	DEBUG(0, ("Control modflags on node %u - flags now 0x%x\n", ctdb->vnn, node->flags));
+
+	/* if we have been banned, go into recovery mode */
+	c.vnn = ctdb->vnn;
+	c.flags = node->flags;
+
+	data.dptr = (uint8_t *)&c;
+	data.dsize = sizeof(c);
+
+	/* tell the other nodes that something has changed */
+	ctdb_daemon_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
+				 CTDB_SRVID_NODE_FLAGS_CHANGED, data);
+
+	if ((node->flags & NODE_FLAGS_BANNED) && !(old_flags & NODE_FLAGS_BANNED)) {
+		/* make sure we are frozen */
+		DEBUG(0,("This node has been banned - forcing freeze and recovery\n"));
+		ctdb_start_freeze(ctdb);
+		ctdb_release_all_ips(ctdb);
+		ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+	}
+	
+	return 0;
+}
diff --git a/source4/cluster/ctdb/server/ctdb_recover.c b/source4/cluster/ctdb/server/ctdb_recover.c
new file mode 100644
index 0000000000..82338d48ce
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_recover.c
@@ -0,0 +1,680 @@
+/* 
+   ctdb recovery code
+
+   Copyright (C) Andrew Tridgell  2007
+   Copyright (C) Ronnie Sahlberg  2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "../include/ctdb_private.h"
+#include "lib/util/dlinklist.h"
+#include "db_wrap.h"
+
+/*
+  lock all databases - mark only
+ */
+static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb)
+{
+	struct ctdb_db_context *ctdb_db;
+	if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+		DEBUG(0,("Attempt to mark all databases locked when not frozen\n"));
+		return -1;
+	}
+	for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
+		if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
+			return -1;
+		}
+	}
+	return 0;
+}
+
+/*
+  lock all databases - unmark only
+ */
+static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb)
+{
+	struct ctdb_db_context *ctdb_db;
+	if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+		DEBUG(0,("Attempt to unmark all databases locked when not frozen\n"));
+		return -1;
+	}
+	for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
+		if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
+			return -1;
+		}
+	}
+	return 0;
+}
+
+
+int 
+ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
+{
+	CHECK_CONTROL_DATA_SIZE(0);
+	struct ctdb_vnn_map_wire *map;
+	size_t len;
+
+	len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
+	map = talloc_size(outdata, len);
+	CTDB_NO_MEMORY_VOID(ctdb, map);
+
+	map->generation = ctdb->vnn_map->generation;
+	map->size = ctdb->vnn_map->size;
+	memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
+
+	outdata->dsize = len;
+	outdata->dptr  = (uint8_t *)map;
+
+	return 0;
+}
+
+int 
+ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
+{
+	struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
+
+	if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+		DEBUG(0,("Attempt to set vnnmap when not frozen\n"));
+		return -1;
+	}
+
+	talloc_free(ctdb->vnn_map);
+
+	ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
+	CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
+
+	ctdb->vnn_map->generation = map->generation;
+	ctdb->vnn_map->size       = map->size;
+	ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
+	CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
+
+	memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
+
+	return 0;
+}
+
+int 
+ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
+{
+	uint32_t i, len;
+	struct ctdb_db_context *ctdb_db;
+	struct ctdb_dbid_map *dbid_map;
+
+	CHECK_CONTROL_DATA_SIZE(0);
+
+	len = 0;
+	for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
+		len++;
+	}
+
+
+	outdata->dsize = offsetof(struct ctdb_dbid_map, dbids) + 4*len;
+	outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
+	if (!outdata->dptr) {
+		DEBUG(0, (__location__ " Failed to allocate dbmap array\n"));
+		exit(1);
+	}
+
+	dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
+	dbid_map->num = len;
+	for(i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
+		dbid_map->dbids[i] = ctdb_db->db_id;
+	}
+
+	return 0;
+}
+
+int 
+ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
+{
+	uint32_t i, num_nodes;
+	struct ctdb_node_map *node_map;
+
+	CHECK_CONTROL_DATA_SIZE(0);
+
+	num_nodes = ctdb->num_nodes;
+
+	outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
+	outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
+	if (!outdata->dptr) {
+		DEBUG(0, (__location__ " Failed to allocate nodemap array\n"));
+		exit(1);
+	}
+
+	node_map = (struct ctdb_node_map *)outdata->dptr;
+	node_map->num = num_nodes;
+	for (i=0; i<num_nodes; i++) {
+		inet_aton(ctdb->nodes[i]->address.address, &node_map->nodes[i].sin.sin_addr);
+		node_map->nodes[i].vnn   = ctdb->nodes[i]->vnn;
+		node_map->nodes[i].flags = ctdb->nodes[i]->flags;
+	}
+
+	return 0;
+}
+
+struct getkeys_params {
+	struct ctdb_context *ctdb;
+	uint32_t lmaster;
+	uint32_t rec_count;
+	struct getkeys_rec {
+		TDB_DATA key;
+		TDB_DATA data;
+	} *recs;
+};
+
+static int traverse_getkeys(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+	struct getkeys_params *params = (struct getkeys_params *)p;
+	uint32_t lmaster;
+
+	lmaster = ctdb_lmaster(params->ctdb, &key);
+
+	/* only include this record if the lmaster matches or if
+	   the wildcard lmaster (-1) was specified.
+	*/
+	if ((params->lmaster != CTDB_LMASTER_ANY) && (params->lmaster != lmaster)) {
+		return 0;
+	}
+
+	params->recs = talloc_realloc(NULL, params->recs, struct getkeys_rec, params->rec_count+1);
+	key.dptr = talloc_memdup(params->recs, key.dptr, key.dsize);
+	data.dptr = talloc_memdup(params->recs, data.dptr, data.dsize);
+	params->recs[params->rec_count].key = key;
+	params->recs[params->rec_count].data = data;
+	params->rec_count++;
+
+	return 0;
+}
+
+/*
+  pul a bunch of records from a ltdb, filtering by lmaster
+ */
+int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
+{
+	struct ctdb_control_pulldb *pull;
+	struct ctdb_db_context *ctdb_db;
+	struct getkeys_params params;
+	struct ctdb_control_pulldb_reply *reply;
+	int i;
+	size_t len = 0;
+
+	if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+		DEBUG(0,("rejecting ctdb_control_pull_db when not frozen\n"));
+		return -1;
+	}
+
+	pull = (struct ctdb_control_pulldb *)indata.dptr;
+	
+	ctdb_db = find_ctdb_db(ctdb, pull->db_id);
+	if (!ctdb_db) {
+		DEBUG(0,(__location__ " Unknown db\n"));
+		return -1;
+	}
+
+	params.ctdb = ctdb;
+	params.lmaster = pull->lmaster;
+
+	params.rec_count = 0;
+	params.recs = talloc_array(outdata, struct getkeys_rec, 0);
+	CTDB_NO_MEMORY(ctdb, params.recs);
+
+	if (ctdb_lock_all_databases_mark(ctdb) != 0) {
+		DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
+		return -1;
+	}
+
+	tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_getkeys, &params);
+
+	ctdb_lock_all_databases_unmark(ctdb);
+
+	reply = talloc(outdata, struct ctdb_control_pulldb_reply);
+	CTDB_NO_MEMORY(ctdb, reply);
+
+	reply->db_id = pull->db_id;
+	reply->count = params.rec_count;
+
+	len = offsetof(struct ctdb_control_pulldb_reply, data);
+
+	for (i=0;i<reply->count;i++) {
+		struct ctdb_rec_data *rec;
+		rec = ctdb_marshall_record(outdata, 0, params.recs[i].key, params.recs[i].data);
+		reply = talloc_realloc_size(outdata, reply, rec->length + len);
+		memcpy(len+(uint8_t *)reply, rec, rec->length);
+		len += rec->length;
+		talloc_free(rec);
+	}
+
+	talloc_free(params.recs);
+
+	outdata->dptr = (uint8_t *)reply;
+	outdata->dsize = len;
+
+	return 0;
+}
+
+/*
+  push a bunch of records into a ltdb, filtering by rsn
+ */
+int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+	struct ctdb_control_pulldb_reply *reply = (struct ctdb_control_pulldb_reply *)indata.dptr;
+	struct ctdb_db_context *ctdb_db;
+	int i, ret;
+	struct ctdb_rec_data *rec;
+
+	if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+		DEBUG(0,("rejecting ctdb_control_push_db when not frozen\n"));
+		return -1;
+	}
+
+	if (indata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
+		DEBUG(0,(__location__ " invalid data in pulldb reply\n"));
+		return -1;
+	}
+
+	ctdb_db = find_ctdb_db(ctdb, reply->db_id);
+	if (!ctdb_db) {
+		DEBUG(0,(__location__ " Unknown db 0x%08x\n", reply->db_id));
+		return -1;
+	}
+
+	if (ctdb_lock_all_databases_mark(ctdb) != 0) {
+		DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
+		return -1;
+	}
+
+	rec = (struct ctdb_rec_data *)&reply->data[0];
+
+	DEBUG(3,("starting push of %u records for dbid 0x%x\n",
+		 reply->count, reply->db_id));
+
+	for (i=0;i<reply->count;i++) {
+		TDB_DATA key, data;
+		struct ctdb_ltdb_header *hdr, header;
+
+		key.dptr = &rec->data[0];
+		key.dsize = rec->keylen;
+		data.dptr = &rec->data[key.dsize];
+		data.dsize = rec->datalen;
+
+		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+			DEBUG(0,(__location__ " bad ltdb record\n"));
+			goto failed;
+		}
+		hdr = (struct ctdb_ltdb_header *)data.dptr;
+		data.dptr += sizeof(*hdr);
+		data.dsize -= sizeof(*hdr);
+
+		ret = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to fetch record\n"));
+			goto failed;
+		}
+		/* The check for dmaster gives priority to the dmaster
+		   if the rsn values are equal */
+		if (header.rsn < hdr->rsn ||
+		    (header.dmaster != ctdb->vnn && header.rsn == hdr->rsn)) {
+			ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
+			if (ret != 0) {
+				DEBUG(0, (__location__ " Unable to store record\n"));
+				goto failed;
+			}
+		}
+
+		rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
+	}	    
+
+	DEBUG(3,("finished push of %u records for dbid 0x%x\n",
+		 reply->count, reply->db_id));
+
+	ctdb_lock_all_databases_unmark(ctdb);
+	return 0;
+
+failed:
+	ctdb_lock_all_databases_unmark(ctdb);
+	return -1;
+}
+
+
+static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+	uint32_t *dmaster = (uint32_t *)p;
+	struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
+	int ret;
+
+	header->dmaster = *dmaster;
+
+	ret = tdb_store(tdb, key, data, TDB_REPLACE);
+	if (ret) {
+		DEBUG(0,(__location__ " failed to write tdb data back  ret:%d\n",ret));
+		return ret;
+	}
+	return 0;
+}
+
+int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+	struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
+	struct ctdb_db_context *ctdb_db;
+
+	if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+		DEBUG(0,("rejecting ctdb_control_set_dmaster when not frozen\n"));
+		return -1;
+	}
+
+	ctdb_db = find_ctdb_db(ctdb, p->db_id);
+	if (!ctdb_db) {
+		DEBUG(0,(__location__ " Unknown db 0x%08x\n", p->db_id));
+		return -1;
+	}
+
+	if (ctdb_lock_all_databases_mark(ctdb) != 0) {
+		DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
+		return -1;
+	}
+
+	tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
+
+	ctdb_lock_all_databases_unmark(ctdb);
+	
+	return 0;
+}
+
+struct ctdb_set_recmode_state {
+	struct ctdb_req_control *c;
+	uint32_t recmode;
+};
+
+/*
+  called when the 'recovered' event script has finished
+ */
+static void ctdb_recovered_callback(struct ctdb_context *ctdb, int status, void *p)
+{
+	struct ctdb_set_recmode_state *state = talloc_get_type(p, struct ctdb_set_recmode_state);
+
+	ctdb_start_monitoring(ctdb);
+
+	if (status == 0) {
+		ctdb->recovery_mode = state->recmode;
+	} else {
+		DEBUG(0,(__location__ " recovered event script failed (status %d)\n", status));
+	}
+
+	ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
+	talloc_free(state);
+}
+
+/*
+  set the recovery mode
+ */
+int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, 
+				 struct ctdb_req_control *c,
+				 TDB_DATA indata, bool *async_reply,
+				 const char **errormsg)
+{
+	uint32_t recmode = *(uint32_t *)indata.dptr;
+	int ret;
+	struct ctdb_set_recmode_state *state;
+
+	if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+		DEBUG(0,("Attempt to change recovery mode to %u when not frozen\n", 
+			 recmode));
+		(*errormsg) = "Cannot change recovery mode while not frozen";
+		return -1;
+	}
+
+	if (recmode != CTDB_RECOVERY_NORMAL ||
+	    ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
+		ctdb->recovery_mode = recmode;
+		return 0;
+	}
+
+	/* some special handling when ending recovery mode */
+	state = talloc(ctdb, struct ctdb_set_recmode_state);
+	CTDB_NO_MEMORY(ctdb, state);
+
+	/* we should not be able to get the lock on the nodes list, as it should be
+	   held by the recovery master */
+	if (ctdb_recovery_lock(ctdb, false)) {
+		DEBUG(0,("ERROR: recovery lock file %s not locked when recovering!\n",
+			 ctdb->recovery_lock_file));
+		return -1;
+	}	
+
+	state->c = talloc_steal(state, c);
+	state->recmode = recmode;
+	
+	ctdb_stop_monitoring(ctdb);
+
+	/* call the events script to tell all subsystems that we have recovered */
+	ret = ctdb_event_script_callback(ctdb, 
+					 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
+					 state, 
+					 ctdb_recovered_callback, 
+					 state, "recovered");
+	if (ret != 0) {
+		return ret;
+	}
+	*async_reply = true;
+
+	return 0;
+}
+
+/*
+  callback for ctdb_control_max_rsn
+ */
+static int traverse_max_rsn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+	struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
+	uint64_t *max_rsn = (uint64_t *)p;
+
+	if (data.dsize >= sizeof(*h)) {
+		(*max_rsn) = MAX(*max_rsn, h->rsn);
+	}
+	return 0;
+}
+
+/*
+  get max rsn across an entire db
+ */
+int32_t ctdb_control_max_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
+{
+	struct ctdb_db_context *ctdb_db;
+	uint32_t db_id = *(uint32_t *)indata.dptr;
+	uint64_t max_rsn = 0;
+	int ret;
+
+	if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+		DEBUG(0,("rejecting ctdb_control_max_rsn when not frozen\n"));
+		return -1;
+	}
+
+	ctdb_db = find_ctdb_db(ctdb, db_id);
+	if (!ctdb_db) {
+		DEBUG(0,(__location__ " Unknown db\n"));
+		return -1;
+	}
+
+	if (ctdb_lock_all_databases_mark(ctdb) != 0) {
+		DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
+		return -1;
+	}
+
+	ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_max_rsn, &max_rsn);
+	if (ret < 0) {
+		DEBUG(0,(__location__ " traverse failed in ctdb_control_max_rsn\n"));
+		return -1;
+	}
+
+	ctdb_lock_all_databases_unmark(ctdb);
+
+	outdata->dptr = (uint8_t *)talloc(outdata, uint64_t);
+	if (!outdata->dptr) {
+		return -1;
+	}
+	(*(uint64_t *)outdata->dptr) = max_rsn;
+	outdata->dsize = sizeof(uint64_t);
+
+	return 0;
+}
+
+
+/*
+  callback for ctdb_control_set_rsn_nonempty
+ */
+static int traverse_set_rsn_nonempty(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+	struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
+	uint64_t *rsn = (uint64_t *)p;
+
+	if (data.dsize > sizeof(*h)) {
+		h->rsn = *rsn;
+		if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
+			return -1;
+		}
+	}
+	return 0;
+}
+
+/*
+  set rsn for all non-empty records in a database to a given rsn
+ */
+int32_t ctdb_control_set_rsn_nonempty(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
+{
+	struct ctdb_control_set_rsn_nonempty *p = (struct ctdb_control_set_rsn_nonempty *)indata.dptr;
+	struct ctdb_db_context *ctdb_db;
+	int ret;
+
+	if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+		DEBUG(0,("rejecting ctdb_control_set_rsn_nonempty when not frozen\n"));
+		return -1;
+	}
+
+	ctdb_db = find_ctdb_db(ctdb, p->db_id);
+	if (!ctdb_db) {
+		DEBUG(0,(__location__ " Unknown db\n"));
+		return -1;
+	}
+
+	if (ctdb_lock_all_databases_mark(ctdb) != 0) {
+		DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
+		return -1;
+	}
+
+	ret = tdb_traverse(ctdb_db->ltdb->tdb, traverse_set_rsn_nonempty, &p->rsn);
+	if (ret < 0) {
+		DEBUG(0,(__location__ " traverse failed in ctdb_control_set_rsn_nonempty\n"));
+		return -1;
+	}
+
+	ctdb_lock_all_databases_unmark(ctdb);
+
+	return 0;
+}
+
+
+/*
+  callback for ctdb_control_delete_low_rsn
+ */
+static int traverse_delete_low_rsn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+	struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
+	uint64_t *rsn = (uint64_t *)p;
+
+	if (data.dsize < sizeof(*h) || h->rsn < *rsn) {
+		if (tdb_delete(tdb, key) != 0) {
+			return -1;
+		}
+	}
+	return 0;
+}
+
+/*
+  delete any records with a rsn < the given rsn
+ */
+int32_t ctdb_control_delete_low_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
+{
+	struct ctdb_control_delete_low_rsn *p = (struct ctdb_control_delete_low_rsn *)indata.dptr;
+	struct ctdb_db_context *ctdb_db;
+	int ret;
+
+	if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+		DEBUG(0,("rejecting ctdb_control_delete_low_rsn when not frozen\n"));
+		return -1;
+	}
+
+	ctdb_db = find_ctdb_db(ctdb, p->db_id);
+	if (!ctdb_db) {
+		DEBUG(0,(__location__ " Unknown db\n"));
+		return -1;
+	}
+
+	if (ctdb_lock_all_databases_mark(ctdb) != 0) {
+		DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
+		return -1;
+	}
+
+	ret = tdb_traverse(ctdb_db->ltdb->tdb, traverse_delete_low_rsn, &p->rsn);
+	if (ret < 0) {
+		DEBUG(0,(__location__ " traverse failed in ctdb_control_delete_low_rsn\n"));
+		return -1;
+	}
+
+	ctdb_lock_all_databases_unmark(ctdb);
+
+	return 0;
+}
+
+
+/*
+  try and get the recovery lock in shared storage - should only work
+  on the recovery master recovery daemon. Anywhere else is a bug
+ */
+bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
+{
+	struct flock lock;
+
+	if (ctdb->recovery_lock_fd != -1) {
+		close(ctdb->recovery_lock_fd);
+	}
+	ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
+	if (ctdb->recovery_lock_fd == -1) {
+		DEBUG(0,("Unable to open %s - (%s)\n", 
+			 ctdb->recovery_lock_file, strerror(errno)));
+		return false;
+	}
+
+	lock.l_type = F_WRLCK;
+	lock.l_whence = SEEK_SET;
+	lock.l_start = 0;
+	lock.l_len = 1;
+	lock.l_pid = 0;
+
+	if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
+		return false;
+	}
+
+	if (!keep) {
+		close(ctdb->recovery_lock_fd);
+		ctdb->recovery_lock_fd = -1;
+	}
+
+	return true;
+}
diff --git a/source4/cluster/ctdb/server/ctdb_recoverd.c b/source4/cluster/ctdb/server/ctdb_recoverd.c
new file mode 100644
index 0000000000..5cb985521d
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_recoverd.c
@@ -0,0 +1,1511 @@
+/* 
+   ctdb recovery daemon
+
+   Copyright (C) Ronnie Sahlberg  2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "system/time.h"
+#include "popt.h"
+#include "cmdline.h"
+#include "../include/ctdb.h"
+#include "../include/ctdb_private.h"
+
+
+struct ban_state {
+	struct ctdb_recoverd *rec;
+	uint32_t banned_node;
+};
+
+/*
+  private state of recovery daemon
+ */
+struct ctdb_recoverd {
+	struct ctdb_context *ctdb;
+	uint32_t last_culprit;
+	uint32_t culprit_counter;
+	struct timeval first_recover_time;
+	struct ban_state **banned_nodes;
+	struct timeval priority_time;
+};
+
+#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
+#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
+
+/*
+  unban a node
+ */
+static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t vnn)
+{
+	struct ctdb_context *ctdb = rec->ctdb;
+
+	if (!ctdb_validate_vnn(ctdb, vnn)) {
+		DEBUG(0,("Bad vnn %u in ctdb_ban_node\n", vnn));
+		return;
+	}
+
+	if (rec->banned_nodes[vnn] == NULL) {
+		return;
+	}
+
+	ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), vnn, 0, NODE_FLAGS_BANNED);
+
+	talloc_free(rec->banned_nodes[vnn]);
+	rec->banned_nodes[vnn] = NULL;
+}
+
+
+/*
+  called when a ban has timed out
+ */
+static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
+{
+	struct ban_state *state = talloc_get_type(p, struct ban_state);
+	struct ctdb_recoverd *rec = state->rec;
+	uint32_t vnn = state->banned_node;
+
+	DEBUG(0,("Node %u is now unbanned\n", vnn));
+	ctdb_unban_node(rec, vnn);
+}
+
+/*
+  ban a node for a period of time
+ */
+static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t vnn, uint32_t ban_time)
+{
+	struct ctdb_context *ctdb = rec->ctdb;
+
+	if (!ctdb_validate_vnn(ctdb, vnn)) {
+		DEBUG(0,("Bad vnn %u in ctdb_ban_node\n", vnn));
+		return;
+	}
+
+	if (vnn == ctdb->vnn) {
+		DEBUG(0,("self ban - lowering our election priority\n"));
+		/* banning ourselves - lower our election priority */
+		rec->priority_time = timeval_current();
+	}
+
+	ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), vnn, NODE_FLAGS_BANNED, 0);
+
+	rec->banned_nodes[vnn] = talloc(rec, struct ban_state);
+	CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[vnn]);
+
+	rec->banned_nodes[vnn]->rec = rec;
+	rec->banned_nodes[vnn]->banned_node = vnn;
+
+	if (ban_time != 0) {
+		event_add_timed(ctdb->ev, rec->banned_nodes[vnn], 
+				timeval_current_ofs(ban_time, 0),
+				ctdb_ban_timeout, rec->banned_nodes[vnn]);
+	}
+}
+
+
+/*
+  change recovery mode on all nodes
+ */
+static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
+{
+	int j, ret;
+
+	/* start the freeze process immediately on all nodes */
+	ctdb_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, 
+		     CTDB_CONTROL_FREEZE, CTDB_CTRL_FLAG_NOREPLY, tdb_null, 
+		     NULL, NULL, NULL, NULL, NULL);
+
+	/* set recovery mode to active on all nodes */
+	for (j=0; j<nodemap->num; j++) {
+		/* dont change it for nodes that are unavailable */
+		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+
+		if (rec_mode == CTDB_RECOVERY_ACTIVE) {
+			ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn);
+			if (ret != 0) {
+				DEBUG(0, (__location__ " Unable to freeze node %u\n", nodemap->nodes[j].vnn));
+				return -1;
+			}
+		}
+
+		ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, rec_mode);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].vnn));
+			return -1;
+		}
+
+		if (rec_mode == CTDB_RECOVERY_NORMAL) {
+			ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn);
+			if (ret != 0) {
+				DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].vnn));
+				return -1;
+			}
+		}
+	}
+
+	return 0;
+}
+
+/*
+  change recovery master on all node
+ */
+static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t vnn)
+{
+	int j, ret;
+
+	/* set recovery master to vnn on all nodes */
+	for (j=0; j<nodemap->num; j++) {
+		/* dont change it for nodes that are unavailable */
+		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+
+		ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, vnn);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].vnn));
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+  ensure all other nodes have attached to any databases that we have
+ */
+static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+					   uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+{
+	int i, j, db, ret;
+	struct ctdb_dbid_map *remote_dbmap;
+
+	/* verify that all other nodes have all our databases */
+	for (j=0; j<nodemap->num; j++) {
+		/* we dont need to ourself ourselves */
+		if (nodemap->nodes[j].vnn == vnn) {
+			continue;
+		}
+		/* dont check nodes that are unavailable */
+		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+
+		ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
+					 mem_ctx, &remote_dbmap);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to get dbids from node %u\n", vnn));
+			return -1;
+		}
+
+		/* step through all local databases */
+		for (db=0; db<dbmap->num;db++) {
+			const char *name;
+
+
+			for (i=0;i<remote_dbmap->num;i++) {
+				if (dbmap->dbids[db] == remote_dbmap->dbids[i]) {
+					break;
+				}
+			}
+			/* the remote node already have this database */
+			if (i!=remote_dbmap->num) {
+				continue;
+			}
+			/* ok so we need to create this database */
+			ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), vnn, dbmap->dbids[db], mem_ctx, &name);
+			if (ret != 0) {
+				DEBUG(0, (__location__ " Unable to get dbname from node %u\n", vnn));
+				return -1;
+			}
+			ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, mem_ctx, name);
+			if (ret != 0) {
+				DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
+				return -1;
+			}
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+  ensure we are attached to any databases that anyone else is attached to
+ */
+static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+					  uint32_t vnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
+{
+	int i, j, db, ret;
+	struct ctdb_dbid_map *remote_dbmap;
+
+	/* verify that we have all database any other node has */
+	for (j=0; j<nodemap->num; j++) {
+		/* we dont need to ourself ourselves */
+		if (nodemap->nodes[j].vnn == vnn) {
+			continue;
+		}
+		/* dont check nodes that are unavailable */
+		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+
+		ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
+					 mem_ctx, &remote_dbmap);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to get dbids from node %u\n", vnn));
+			return -1;
+		}
+
+		/* step through all databases on the remote node */
+		for (db=0; db<remote_dbmap->num;db++) {
+			const char *name;
+
+			for (i=0;i<(*dbmap)->num;i++) {
+				if (remote_dbmap->dbids[db] == (*dbmap)->dbids[i]) {
+					break;
+				}
+			}
+			/* we already have this db locally */
+			if (i!=(*dbmap)->num) {
+				continue;
+			}
+			/* ok so we need to create this database and
+			   rebuild dbmap
+			 */
+			ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
+					    remote_dbmap->dbids[db], mem_ctx, &name);
+			if (ret != 0) {
+				DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
+					  nodemap->nodes[j].vnn));
+				return -1;
+			}
+			ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, name);
+			if (ret != 0) {
+				DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
+				return -1;
+			}
+			ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, dbmap);
+			if (ret != 0) {
+				DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", vnn));
+				return -1;
+			}
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+  pull all the remote database contents into ours
+ */
+static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+				     uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+{
+	int i, j, ret;
+
+	/* pull all records from all other nodes across onto this node
+	   (this merges based on rsn)
+	*/
+	for (i=0;i<dbmap->num;i++) {
+		for (j=0; j<nodemap->num; j++) {
+			/* we dont need to merge with ourselves */
+			if (nodemap->nodes[j].vnn == vnn) {
+				continue;
+			}
+			/* dont merge from nodes that are unavailable */
+			if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+				continue;
+			}
+			ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
+					       vnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
+			if (ret != 0) {
+				DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
+					  nodemap->nodes[j].vnn, vnn));
+				return -1;
+			}
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+  change the dmaster on all databases to point to us
+ */
+static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+					   uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+{
+	int i, j, ret;
+
+	/* update dmaster to point to this node for all databases/nodes */
+	for (i=0;i<dbmap->num;i++) {
+		for (j=0; j<nodemap->num; j++) {
+			/* dont repoint nodes that are unavailable */
+			if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+				continue;
+			}
+			ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, ctdb, dbmap->dbids[i], vnn);
+			if (ret != 0) {
+				DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].vnn, dbmap->dbids[i]));
+				return -1;
+			}
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+  update flags on all active nodes
+ */
+static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+{
+	int i;
+	for (i=0;i<nodemap->num;i++) {
+		struct ctdb_node_flag_change c;
+		TDB_DATA data;
+
+		c.vnn = nodemap->nodes[i].vnn;
+		c.flags = nodemap->nodes[i].flags;
+
+		data.dptr = (uint8_t *)&c;
+		data.dsize = sizeof(c);
+
+		ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
+				  CTDB_SRVID_NODE_FLAGS_CHANGED, data);
+
+	}
+	return 0;
+}
+
+/*
+  vacuum one database
+ */
+static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
+{
+	uint64_t max_rsn;
+	int ret, i;
+
+	/* find max rsn on our local node for this db */
+	ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
+	if (ret != 0) {
+		return -1;
+	}
+
+	/* set rsn on non-empty records to max_rsn+1 */
+	for (i=0;i<nodemap->num;i++) {
+		if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+		ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn,
+						 db_id, max_rsn+1);
+		if (ret != 0) {
+			DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
+				 nodemap->nodes[i].vnn, (unsigned long long)max_rsn+1));
+			return -1;
+		}
+	}
+
+	/* delete records with rsn < max_rsn+1 on all nodes */
+	for (i=0;i<nodemap->num;i++) {
+		if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+		ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn,
+						 db_id, max_rsn+1);
+		if (ret != 0) {
+			DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
+				 nodemap->nodes[i].vnn, (unsigned long long)max_rsn+1));
+			return -1;
+		}
+	}
+
+
+	return 0;
+}
+
+
+/*
+  vacuum all attached databases
+ */
+static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+				struct ctdb_dbid_map *dbmap)
+{
+	int i;
+
+	/* update dmaster to point to this node for all databases/nodes */
+	for (i=0;i<dbmap->num;i++) {
+		if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) {
+			return -1;
+		}
+	}
+	return 0;
+}
+
+
+/*
+  push out all our database contents to all other nodes
+ */
+static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+				    uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+{
+	int i, j, ret;
+
+	/* push all records out to the nodes again */
+	for (i=0;i<dbmap->num;i++) {
+		for (j=0; j<nodemap->num; j++) {
+			/* we dont need to push to ourselves */
+			if (nodemap->nodes[j].vnn == vnn) {
+				continue;
+			}
+			/* dont push to nodes that are unavailable */
+			if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+				continue;
+			}
+			ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), vnn, nodemap->nodes[j].vnn, 
+					       dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
+			if (ret != 0) {
+				DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
+					  vnn, nodemap->nodes[j].vnn));
+				return -1;
+			}
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+  ensure all nodes have the same vnnmap we do
+ */
+static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+				      uint32_t vnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
+{
+	int j, ret;
+
+	/* push the new vnn map out to all the nodes */
+	for (j=0; j<nodemap->num; j++) {
+		/* dont push to nodes that are unavailable */
+		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+
+		ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, mem_ctx, vnnmap);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", vnn));
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+  handler for when the admin bans a node
+*/
+static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+			TDB_DATA data, void *private_data)
+{
+	struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+	struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
+	uint32_t recmaster;
+	int ret;
+
+	if (data.dsize != sizeof(*b)) {
+		DEBUG(0,("Bad data in ban_handler\n"));
+		return;
+	}
+
+	ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
+	if (ret != 0) {
+		DEBUG(0,(__location__ " Failed to find the recmaster\n"));
+		return;
+	}
+
+	if (recmaster != ctdb->vnn) {
+		DEBUG(0,("We are not the recmaster - ignoring ban request\n"));
+		return;
+	}
+
+	DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", 
+		 b->vnn, b->ban_time));
+	ctdb_ban_node(rec, b->vnn, b->ban_time);
+}
+
+/*
+  handler for when the admin unbans a node
+*/
+static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+			  TDB_DATA data, void *private_data)
+{
+	struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+	uint32_t vnn;
+	int ret;
+	uint32_t recmaster;
+
+	if (data.dsize != sizeof(uint32_t)) {
+		DEBUG(0,("Bad data in unban_handler\n"));
+		return;
+	}
+	vnn = *(uint32_t *)data.dptr;
+
+	ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
+	if (ret != 0) {
+		DEBUG(0,(__location__ " Failed to find the recmaster\n"));
+		return;
+	}
+
+	if (recmaster != ctdb->vnn) {
+		DEBUG(0,("We are not the recmaster - ignoring unban request\n"));
+		return;
+	}
+
+	DEBUG(0,("Node %u has been unbanned by the administrator\n", vnn));
+	ctdb_unban_node(rec, vnn);
+}
+
+
+
+/*
+  called when ctdb_wait_timeout should finish
+ */
+static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
+			      struct timeval yt, void *p)
+{
+	uint32_t *timed_out = (uint32_t *)p;
+	(*timed_out) = 1;
+}
+
+/*
+  wait for a given number of seconds
+ */
+static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
+{
+	uint32_t timed_out = 0;
+	event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
+	while (!timed_out) {
+		event_loop_once(ctdb->ev);
+	}
+}
+
+/*
+  we are the recmaster, and recovery is needed - start a recovery run
+ */
+static int do_recovery(struct ctdb_recoverd *rec, 
+		       TALLOC_CTX *mem_ctx, uint32_t vnn, uint32_t num_active,
+		       struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
+		       uint32_t culprit)
+{
+	struct ctdb_context *ctdb = rec->ctdb;
+	int i, j, ret;
+	uint32_t generation;
+	struct ctdb_dbid_map *dbmap;
+
+	if (rec->last_culprit != culprit ||
+	    timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
+		/* either a new node is the culprit, or we've decide to forgive them */
+		rec->last_culprit = culprit;
+		rec->first_recover_time = timeval_current();
+		rec->culprit_counter = 0;
+	}
+	rec->culprit_counter++;
+
+	if (rec->culprit_counter > 2*nodemap->num) {
+		DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
+			 culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
+			 ctdb->tunable.recovery_ban_period));
+		ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
+	}
+
+	if (!ctdb_recovery_lock(ctdb, true)) {
+		DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
+		return -1;
+	}
+
+	/* set recovery mode to active on all nodes */
+	ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
+	if (ret!=0) {
+		DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
+		return -1;
+	}
+
+	DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
+
+	/* pick a new generation number */
+	generation = random();
+
+	/* change the vnnmap on this node to use the new generation 
+	   number but not on any other nodes.
+	   this guarantees that if we abort the recovery prematurely
+	   for some reason (a node stops responding?)
+	   that we can just return immediately and we will reenter
+	   recovery shortly again.
+	   I.e. we deliberately leave the cluster with an inconsistent
+	   generation id to allow us to abort recovery at any stage and
+	   just restart it from scratch.
+	 */
+	vnnmap->generation = generation;
+	ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, vnnmap);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", vnn));
+		return -1;
+	}
+
+	/* get a list of all databases */
+	ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &dbmap);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", vnn));
+		return -1;
+	}
+
+
+
+	/* verify that all other nodes have all our databases */
+	ret = create_missing_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
+		return -1;
+	}
+
+	/* verify that we have all the databases any other node has */
+	ret = create_missing_local_databases(ctdb, nodemap, vnn, &dbmap, mem_ctx);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to create missing local databases\n"));
+		return -1;
+	}
+
+
+
+	/* verify that all other nodes have all our databases */
+	ret = create_missing_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
+		return -1;
+	}
+
+
+	DEBUG(1, (__location__ " Recovery - created remote databases\n"));
+
+	/* pull all remote databases onto the local node */
+	ret = pull_all_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to pull remote databases\n"));
+		return -1;
+	}
+
+	DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
+
+	/* push all local databases to the remote nodes */
+	ret = push_all_local_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to push local databases\n"));
+		return -1;
+	}
+
+	DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
+
+	/* build a new vnn map with all the currently active and
+	   unbanned nodes */
+	generation = random();
+	vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
+	CTDB_NO_MEMORY(ctdb, vnnmap);
+	vnnmap->generation = generation;
+	vnnmap->size = num_active;
+	vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
+	for (i=j=0;i<nodemap->num;i++) {
+		if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
+			vnnmap->map[j++] = nodemap->nodes[i].vnn;
+		}
+	}
+
+
+
+	/* update to the new vnnmap on all nodes */
+	ret = update_vnnmap_on_all_nodes(ctdb, nodemap, vnn, vnnmap, mem_ctx);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
+		return -1;
+	}
+
+	DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
+
+	/* update recmaster to point to us for all nodes */
+	ret = set_recovery_master(ctdb, nodemap, vnn);
+	if (ret!=0) {
+		DEBUG(0, (__location__ " Unable to set recovery master\n"));
+		return -1;
+	}
+
+	DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
+
+	/* repoint all local and remote database records to the local
+	   node as being dmaster
+	 */
+	ret = update_dmaster_on_all_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
+		return -1;
+	}
+
+	DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
+
+	/*
+	  update all nodes to have the same flags that we have
+	 */
+	ret = update_flags_on_all_nodes(ctdb, nodemap);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
+		return -1;
+	}
+	
+	DEBUG(1, (__location__ " Recovery - updated flags\n"));
+
+	/*
+	  run a vacuum operation on empty records
+	 */
+	ret = vacuum_all_databases(ctdb, nodemap, dbmap);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
+		return -1;
+	}
+
+	DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
+
+	/*
+	  if enabled, tell nodes to takeover their public IPs
+	 */
+	if (ctdb->takeover.enabled) {
+		ret = ctdb_takeover_run(ctdb, nodemap);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
+			return -1;
+		}
+		DEBUG(1, (__location__ " Recovery - done takeover\n"));
+	}
+
+
+	/* disable recovery mode */
+	ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
+	if (ret!=0) {
+		DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
+		return -1;
+	}
+
+	/* send a message to all clients telling them that the cluster 
+	   has been reconfigured */
+	ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, CTDB_SRVID_RECONFIGURE, tdb_null);
+
+	DEBUG(0, (__location__ " Recovery complete\n"));
+
+	/* We just finished a recovery successfully. 
+	   We now wait for rerecovery_timeout before we allow 
+	   another recovery to take place.
+	*/
+	DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
+	ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
+	DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
+
+	return 0;
+}
+
+
+/*
+  elections are won by first checking the number of connected nodes, then
+  the priority time, then the vnn
+ */
+struct election_message {
+	uint32_t num_connected;
+	struct timeval priority_time;
+	uint32_t vnn;
+};
+
+/*
+  form this nodes election data
+ */
+static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
+{
+	int ret, i;
+	struct ctdb_node_map *nodemap;
+	struct ctdb_context *ctdb = rec->ctdb;
+
+	ZERO_STRUCTP(em);
+
+	em->vnn = rec->ctdb->vnn;
+	em->priority_time = rec->priority_time;
+
+	ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
+	if (ret != 0) {
+		return;
+	}
+
+	for (i=0;i<nodemap->num;i++) {
+		if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
+			em->num_connected++;
+		}
+	}
+	talloc_free(nodemap);
+}
+
+/*
+  see if the given election data wins
+ */
+static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
+{
+	struct election_message myem;
+	int cmp;
+
+	ctdb_election_data(rec, &myem);
+
+	/* try to use the most connected node */
+	cmp = (int)myem.num_connected - (int)em->num_connected;
+
+	/* then the longest running node */
+	if (cmp == 0) {
+		cmp = timeval_compare(&em->priority_time, &myem.priority_time);
+	}
+
+	if (cmp == 0) {
+		cmp = (int)myem.vnn - (int)em->vnn;
+	}
+
+	return cmp > 0;
+}
+
+/*
+  send out an election request
+ */
+static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t vnn)
+{
+	int ret;
+	TDB_DATA election_data;
+	struct election_message emsg;
+	uint64_t srvid;
+	struct ctdb_context *ctdb = rec->ctdb;
+	
+	srvid = CTDB_SRVID_RECOVERY;
+
+	ctdb_election_data(rec, &emsg);
+
+	election_data.dsize = sizeof(struct election_message);
+	election_data.dptr  = (unsigned char *)&emsg;
+
+
+	/* first we assume we will win the election and set 
+	   recoverymaster to be ourself on the current node
+	 */
+	ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), vnn, vnn);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " failed to send recmaster election request\n"));
+		return -1;
+	}
+
+
+	/* send an election message to all active nodes */
+	ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
+
+	return 0;
+}
+
+/*
+  this function will unban all nodes in the cluster
+*/
+static void unban_all_nodes(struct ctdb_context *ctdb)
+{
+	int ret, i;
+	struct ctdb_node_map *nodemap;
+	TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+	
+	ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
+	if (ret != 0) {
+		DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
+		return;
+	}
+
+	for (i=0;i<nodemap->num;i++) {
+		if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
+		  && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
+			ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn, 0, NODE_FLAGS_BANNED);
+		}
+	}
+
+	talloc_free(tmp_ctx);
+}
+
+/*
+  handler for recovery master elections
+*/
+static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+			     TDB_DATA data, void *private_data)
+{
+	struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+	int ret;
+	struct election_message *em = (struct election_message *)data.dptr;
+	TALLOC_CTX *mem_ctx;
+
+	mem_ctx = talloc_new(ctdb);
+
+	/* someone called an election. check their election data
+	   and if we disagree and we would rather be the elected node, 
+	   send a new election message to all other nodes
+	 */
+	if (ctdb_election_win(rec, em)) {
+		ret = send_election_request(rec, mem_ctx, ctdb_get_vnn(ctdb));
+		if (ret!=0) {
+			DEBUG(0, (__location__ " failed to initiate recmaster election"));
+		}
+		talloc_free(mem_ctx);
+		/*unban_all_nodes(ctdb);*/
+		return;
+	}
+
+	/* release the recmaster lock */
+	if (em->vnn != ctdb->vnn &&
+	    ctdb->recovery_lock_fd != -1) {
+		close(ctdb->recovery_lock_fd);
+		ctdb->recovery_lock_fd = -1;
+		unban_all_nodes(ctdb);
+	}
+
+	/* ok, let that guy become recmaster then */
+	ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_vnn(ctdb), em->vnn);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " failed to send recmaster election request"));
+		talloc_free(mem_ctx);
+		return;
+	}
+
+	/* release any bans */
+	rec->last_culprit = (uint32_t)-1;
+	talloc_free(rec->banned_nodes);
+	rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
+	CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
+
+	talloc_free(mem_ctx);
+	return;
+}
+
+
+/*
+  force the start of the election process
+ */
+static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t vnn, 
+			   struct ctdb_node_map *nodemap)
+{
+	int ret;
+	struct ctdb_context *ctdb = rec->ctdb;
+
+	/* set all nodes to recovery mode to stop all internode traffic */
+	ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
+	if (ret!=0) {
+		DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
+		return;
+	}
+	
+	ret = send_election_request(rec, mem_ctx, vnn);
+	if (ret!=0) {
+		DEBUG(0, (__location__ " failed to initiate recmaster election"));
+		return;
+	}
+
+	/* wait for a few seconds to collect all responses */
+	ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
+}
+
+
+
+/*
+  handler for when a node changes its flags
+*/
+static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+			    TDB_DATA data, void *private_data)
+{
+	int ret;
+	struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
+	struct ctdb_node_map *nodemap=NULL;
+	TALLOC_CTX *tmp_ctx;
+	int i;
+
+	if (data.dsize != sizeof(*c)) {
+		DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
+		return;
+	}
+
+	tmp_ctx = talloc_new(ctdb);
+	CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
+
+	ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
+
+	for (i=0;i<nodemap->num;i++) {
+		if (nodemap->nodes[i].vnn == c->vnn) break;
+	}
+
+	if (i == nodemap->num) {
+		DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->vnn));
+		talloc_free(tmp_ctx);
+		return;
+	}
+
+	/* Dont let messages from remote nodes change the DISCONNECTED flag. 
+	   This flag is handled locally based on whether the local node
+	   can communicate with the node or not.
+	*/
+	c->flags &= ~NODE_FLAGS_DISCONNECTED;
+	if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
+		c->flags |= NODE_FLAGS_DISCONNECTED;
+	}
+
+	if (nodemap->nodes[i].flags != c->flags) {
+		DEBUG(0,("Node %u has changed flags - now 0x%x\n", c->vnn, c->flags));
+	}
+
+	nodemap->nodes[i].flags = c->flags;
+
+	ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), 
+				     CTDB_CURRENT_NODE, &ctdb->recovery_master);
+
+	if (ret == 0) {
+		ret = ctdb_ctrl_getrecmode(ctdb, CONTROL_TIMEOUT(), 
+					   CTDB_CURRENT_NODE, &ctdb->recovery_mode);
+	}
+	
+	if (ret == 0 &&
+	    ctdb->recovery_master == ctdb->vnn &&
+	    ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
+	    ctdb->takeover.enabled) {
+		ret = ctdb_takeover_run(ctdb, nodemap);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
+		}
+	}
+
+	talloc_free(tmp_ctx);
+}
+
+
+
+/*
+  the main monitoring loop
+ */
+static void monitor_cluster(struct ctdb_context *ctdb)
+{
+	uint32_t vnn, num_active, recmode, recmaster;
+	TALLOC_CTX *mem_ctx=NULL;
+	struct ctdb_node_map *nodemap=NULL;
+	struct ctdb_node_map *remote_nodemap=NULL;
+	struct ctdb_vnn_map *vnnmap=NULL;
+	struct ctdb_vnn_map *remote_vnnmap=NULL;
+	int i, j, ret;
+	bool need_takeover_run;
+	struct ctdb_recoverd *rec;
+
+	rec = talloc_zero(ctdb, struct ctdb_recoverd);
+	CTDB_NO_MEMORY_FATAL(ctdb, rec);
+
+	rec->ctdb = ctdb;
+	rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
+	CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
+
+	rec->priority_time = timeval_current();
+
+	/* register a message port for recovery elections */
+	ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
+
+	/* and one for when nodes are disabled/enabled */
+	ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
+
+	/* and one for when nodes are banned */
+	ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
+
+	/* and one for when nodes are unbanned */
+	ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
+	
+again:
+	need_takeover_run = false;
+
+	if (mem_ctx) {
+		talloc_free(mem_ctx);
+		mem_ctx = NULL;
+	}
+	mem_ctx = talloc_new(ctdb);
+	if (!mem_ctx) {
+		DEBUG(0,("Failed to create temporary context\n"));
+		exit(-1);
+	}
+
+	/* we only check for recovery once every second */
+	ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
+
+	/* get relevant tunables */
+	ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
+	if (ret != 0) {
+		DEBUG(0,("Failed to get tunables - retrying\n"));
+		goto again;
+	}
+
+	vnn = ctdb_ctrl_getvnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
+	if (vnn == (uint32_t)-1) {
+		DEBUG(0,("Failed to get local vnn - retrying\n"));
+		goto again;
+	}
+
+	/* get the vnnmap */
+	ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &vnnmap);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", vnn));
+		goto again;
+	}
+
+
+	/* get number of nodes */
+	ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &nodemap);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", vnn));
+		goto again;
+	}
+
+
+	/* count how many active nodes there are */
+	num_active = 0;
+	for (i=0; i<nodemap->num; i++) {
+		if (rec->banned_nodes[nodemap->nodes[i].vnn] != NULL) {
+			nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
+		} else {
+			nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
+		}
+		if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
+			num_active++;
+		}
+	}
+
+
+	/* check which node is the recovery master */
+	ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), vnn, &recmaster);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", vnn));
+		goto again;
+	}
+
+	if (recmaster == (uint32_t)-1) {
+		DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
+		force_election(rec, mem_ctx, vnn, nodemap);
+		goto again;
+	}
+	
+	/* verify that the recmaster node is still active */
+	for (j=0; j<nodemap->num; j++) {
+		if (nodemap->nodes[j].vnn==recmaster) {
+			break;
+		}
+	}
+
+	if (j == nodemap->num) {
+		DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
+		force_election(rec, mem_ctx, vnn, nodemap);
+		goto again;
+	}
+
+	if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+		DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].vnn));
+		force_election(rec, mem_ctx, vnn, nodemap);
+		goto again;
+	}
+	
+
+	/* if we are not the recmaster then we do not need to check
+	   if recovery is needed
+	 */
+	if (vnn!=recmaster) {
+		goto again;
+	}
+
+
+	/* verify that all active nodes agree that we are the recmaster */
+	for (j=0; j<nodemap->num; j++) {
+		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+		if (nodemap->nodes[j].vnn == vnn) {
+			continue;
+		}
+
+		ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, &recmaster);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", vnn));
+			goto again;
+		}
+
+		if (recmaster!=vnn) {
+			DEBUG(0, ("Node %u does not agree we are the recmaster. Force reelection\n", 
+				  nodemap->nodes[j].vnn));
+			force_election(rec, mem_ctx, vnn, nodemap);
+			goto again;
+		}
+	}
+
+
+	/* verify that all active nodes are in normal mode 
+	   and not in recovery mode 
+	 */
+	for (j=0; j<nodemap->num; j++) {
+		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+
+		ret = ctdb_ctrl_getrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, &recmode);
+		if (ret != 0) {
+			DEBUG(0, ("Unable to get recmode from node %u\n", vnn));
+			goto again;
+		}
+		if (recmode != CTDB_RECOVERY_NORMAL) {
+			DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", 
+				  nodemap->nodes[j].vnn));
+			do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
+			goto again;
+		}
+	}
+
+
+	/* get the nodemap for all active remote nodes and verify
+	   they are the same as for this node
+	 */
+	for (j=0; j<nodemap->num; j++) {
+		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+		if (nodemap->nodes[j].vnn == vnn) {
+			continue;
+		}
+
+		ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
+					   mem_ctx, &remote_nodemap);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
+				  nodemap->nodes[j].vnn));
+			goto again;
+		}
+
+		/* if the nodes disagree on how many nodes there are
+		   then this is a good reason to try recovery
+		 */
+		if (remote_nodemap->num != nodemap->num) {
+			DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
+				  nodemap->nodes[j].vnn, remote_nodemap->num, nodemap->num));
+			do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
+			goto again;
+		}
+
+		/* if the nodes disagree on which nodes exist and are
+		   active, then that is also a good reason to do recovery
+		 */
+		for (i=0;i<nodemap->num;i++) {
+			if (remote_nodemap->nodes[i].vnn != nodemap->nodes[i].vnn) {
+				DEBUG(0, (__location__ " Remote node:%u has different nodemap vnn for %d (%u vs %u).\n", 
+					  nodemap->nodes[j].vnn, i, 
+					  remote_nodemap->nodes[i].vnn, nodemap->nodes[i].vnn));
+				do_recovery(rec, mem_ctx, vnn, num_active, nodemap, 
+					    vnnmap, nodemap->nodes[j].vnn);
+				goto again;
+			}
+			if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
+			    (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
+				DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
+					  nodemap->nodes[j].vnn, i,
+					  remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
+				do_recovery(rec, mem_ctx, vnn, num_active, nodemap, 
+					    vnnmap, nodemap->nodes[j].vnn);
+				goto again;
+			}
+		}
+
+		/* update our nodemap flags according to the other
+		   server - this gets the NODE_FLAGS_DISABLED
+		   flag. Note that the remote node is authoritative
+		   for its flags (except CONNECTED, which we know
+		   matches in this code) */
+		if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
+			nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
+			need_takeover_run = true;
+		}
+	}
+
+
+	/* there better be the same number of lmasters in the vnn map
+	   as there are active nodes or we will have to do a recovery
+	 */
+	if (vnnmap->size != num_active) {
+		DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
+			  vnnmap->size, num_active));
+		do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, ctdb->vnn);
+		goto again;
+	}
+
+	/* verify that all active nodes in the nodemap also exist in 
+	   the vnnmap.
+	 */
+	for (j=0; j<nodemap->num; j++) {
+		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+		if (nodemap->nodes[j].vnn == vnn) {
+			continue;
+		}
+
+		for (i=0; i<vnnmap->size; i++) {
+			if (vnnmap->map[i] == nodemap->nodes[j].vnn) {
+				break;
+			}
+		}
+		if (i == vnnmap->size) {
+			DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
+				  nodemap->nodes[j].vnn));
+			do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
+			goto again;
+		}
+	}
+
+	
+	/* verify that all other nodes have the same vnnmap
+	   and are from the same generation
+	 */
+	for (j=0; j<nodemap->num; j++) {
+		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+			continue;
+		}
+		if (nodemap->nodes[j].vnn == vnn) {
+			continue;
+		}
+
+		ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
+					  mem_ctx, &remote_vnnmap);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
+				  nodemap->nodes[j].vnn));
+			goto again;
+		}
+
+		/* verify the vnnmap generation is the same */
+		if (vnnmap->generation != remote_vnnmap->generation) {
+			DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
+				  nodemap->nodes[j].vnn, remote_vnnmap->generation, vnnmap->generation));
+			do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
+			goto again;
+		}
+
+		/* verify the vnnmap size is the same */
+		if (vnnmap->size != remote_vnnmap->size) {
+			DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
+				  nodemap->nodes[j].vnn, remote_vnnmap->size, vnnmap->size));
+			do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
+			goto again;
+		}
+
+		/* verify the vnnmap is the same */
+		for (i=0;i<vnnmap->size;i++) {
+			if (remote_vnnmap->map[i] != vnnmap->map[i]) {
+				DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
+					  nodemap->nodes[j].vnn));
+				do_recovery(rec, mem_ctx, vnn, num_active, nodemap, 
+					    vnnmap, nodemap->nodes[j].vnn);
+				goto again;
+			}
+		}
+	}
+
+	/* we might need to change who has what IP assigned */
+	if (need_takeover_run && ctdb->takeover.enabled) {
+		ret = ctdb_takeover_run(ctdb, nodemap);
+		if (ret != 0) {
+			DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
+		}
+	}
+
+	goto again;
+
+}
+
+/*
+  event handler for when the main ctdbd dies
+ */
+static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
+				 uint16_t flags, void *private_data)
+{
+	DEBUG(0,("recovery daemon parent died - exiting\n"));
+	_exit(1);
+}
+
+
+
+/*
+  startup the recovery daemon as a child of the main ctdb daemon
+ */
+int ctdb_start_recoverd(struct ctdb_context *ctdb)
+{
+	int ret;
+	int fd[2];
+	pid_t child;
+
+	if (pipe(fd) != 0) {
+		return -1;
+	}
+
+	child = fork();
+	if (child == -1) {
+		return -1;
+	}
+	
+	if (child != 0) {
+		close(fd[0]);
+		return 0;
+	}
+
+	close(fd[1]);
+
+	/* shutdown the transport */
+	ctdb->methods->shutdown(ctdb);
+
+	/* get a new event context */
+	talloc_free(ctdb->ev);
+	ctdb->ev = event_context_init(ctdb);
+
+	event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
+		     ctdb_recoverd_parent, &fd[0]);	
+
+	close(ctdb->daemon.sd);
+	ctdb->daemon.sd = -1;
+
+	srandom(getpid() ^ time(NULL));
+
+	/* initialise ctdb */
+	ret = ctdb_socket_connect(ctdb);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Failed to init ctdb\n"));
+		exit(1);
+	}
+
+	monitor_cluster(ctdb);
+
+	DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
+	return -1;
+}
diff --git a/source4/cluster/ctdb/server/ctdb_server.c b/source4/cluster/ctdb/server/ctdb_server.c
new file mode 100644
index 0000000000..1480127327
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_server.c
@@ -0,0 +1,469 @@
+/* 
+   ctdb main protocol code
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/events/events.h"
+#include "lib/util/dlinklist.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "../include/ctdb_private.h"
+
+/*
+  choose the transport we will use
+*/
+int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
+{
+	ctdb->transport = talloc_strdup(ctdb, transport);
+	return 0;
+}
+
+/*
+  choose the recovery lock file
+*/
+int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
+{
+	ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
+	return 0;
+}
+
+/*
+  choose the logfile location
+*/
+int ctdb_set_logfile(struct ctdb_context *ctdb, const char *logfile)
+{
+	ctdb->logfile = talloc_strdup(ctdb, logfile);
+	if (ctdb->logfile != NULL && strcmp(logfile, "-") != 0) {
+		int fd;
+		fd = open(ctdb->logfile, O_WRONLY|O_APPEND|O_CREAT, 0666);
+		if (fd == -1) {
+			printf("Failed to open logfile %s\n", ctdb->logfile);
+			abort();
+		}
+		close(1);
+		close(2);
+		if (fd != 1) {
+			dup2(fd, 1);
+			close(fd);
+		}
+		/* also catch stderr of subcommands to the log file */
+		dup2(1, 2);
+	}
+	return 0;
+}
+
+
+/*
+  set the directory for the local databases
+*/
+int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
+{
+	ctdb->db_directory = talloc_strdup(ctdb, dir);
+	if (ctdb->db_directory == NULL) {
+		return -1;
+	}
+	return 0;
+}
+
+/*
+  add a node to the list of active nodes
+*/
+static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
+{
+	struct ctdb_node *node, **nodep;
+
+	nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
+	CTDB_NO_MEMORY(ctdb, nodep);
+
+	ctdb->nodes = nodep;
+	nodep = &ctdb->nodes[ctdb->num_nodes];
+	(*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
+	CTDB_NO_MEMORY(ctdb, *nodep);
+	node = *nodep;
+
+	if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
+		return -1;
+	}
+	node->ctdb = ctdb;
+	node->name = talloc_asprintf(node, "%s:%u", 
+				     node->address.address, 
+				     node->address.port);
+	/* this assumes that the nodes are kept in sorted order, and no gaps */
+	node->vnn = ctdb->num_nodes;
+
+	/* nodes start out disconnected */
+	node->flags |= NODE_FLAGS_DISCONNECTED;
+
+	if (ctdb->address.address &&
+	    ctdb_same_address(&ctdb->address, &node->address)) {
+		ctdb->vnn = node->vnn;
+		node->flags &= ~NODE_FLAGS_DISCONNECTED;
+	}
+
+	ctdb->num_nodes++;
+	node->dead_count = 0;
+
+	return 0;
+}
+
+/*
+  setup the node list from a file
+*/
+int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
+{
+	char **lines;
+	int nlines;
+	int i;
+
+	talloc_free(ctdb->node_list_file);
+	ctdb->node_list_file = talloc_strdup(ctdb, nlist);
+
+	lines = file_lines_load(nlist, &nlines, ctdb);
+	if (lines == NULL) {
+		ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
+		return -1;
+	}
+	while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
+		nlines--;
+	}
+
+	for (i=0;i<nlines;i++) {
+		if (ctdb_add_node(ctdb, lines[i]) != 0) {
+			talloc_free(lines);
+			return -1;
+		}
+	}
+
+	/* initialize the vnn mapping table now that we have num_nodes setup */
+/*
+XXX we currently initialize it to the maximum number of nodes to 
+XXX make it behave the same way as previously.  
+XXX Once we have recovery working we should initialize this always to 
+XXX generation==0 (==invalid) and let the recovery tool populate this 
+XXX table for the daemons. 
+*/
+	ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
+	CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
+
+	ctdb->vnn_map->generation = 1;
+	ctdb->vnn_map->size = ctdb->num_nodes;
+	ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
+	CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
+
+	for(i=0;i<ctdb->vnn_map->size;i++) {
+		ctdb->vnn_map->map[i] = i;
+	}
+	
+	talloc_free(lines);
+	return 0;
+}
+
+
+/*
+  setup the local node address
+*/
+int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
+{
+	if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
+		return -1;
+	}
+	
+	ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
+				     ctdb->address.address, 
+				     ctdb->address.port);
+	return 0;
+}
+
+
+/*
+  return the number of active nodes
+*/
+uint32_t ctdb_get_num_active_nodes(struct ctdb_context *ctdb)
+{
+	int i;
+	uint32_t count=0;
+	for (i=0;i<ctdb->vnn_map->size;i++) {
+		struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
+		if (!(node->flags & NODE_FLAGS_INACTIVE)) {
+			count++;
+		}
+	}
+	return count;
+}
+
+
+/*
+  called when we need to process a packet. This can be a requeued packet
+  after a lockwait, or a real packet from another node
+*/
+void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+	TALLOC_CTX *tmp_ctx;
+
+	/* place the packet as a child of the tmp_ctx. We then use
+	   talloc_free() below to free it. If any of the calls want
+	   to keep it, then they will steal it somewhere else, and the
+	   talloc_free() will only free the tmp_ctx */
+	tmp_ctx = talloc_new(ctdb);
+	talloc_steal(tmp_ctx, hdr);
+
+	DEBUG(3,(__location__ " ctdb request %u of type %u length %u from "
+		 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
+		 hdr->srcnode, hdr->destnode));
+
+	switch (hdr->operation) {
+	case CTDB_REQ_CALL:
+	case CTDB_REPLY_CALL:
+	case CTDB_REQ_DMASTER:
+	case CTDB_REPLY_DMASTER:
+		/* for ctdb_call inter-node operations verify that the
+		   remote node that sent us the call is running in the
+		   same generation instance as this node
+		*/
+		if (ctdb->vnn_map->generation != hdr->generation) {
+			DEBUG(0,(__location__ " ctdb request %u"
+				" length %u from node %u to %u had an"
+				" invalid generation id:%u while our"
+				" generation id is:%u\n", 
+				 hdr->reqid, hdr->length, 
+				 hdr->srcnode, hdr->destnode, 
+				 hdr->generation, ctdb->vnn_map->generation));
+			goto done;
+		}
+	}
+
+	switch (hdr->operation) {
+	case CTDB_REQ_CALL:
+		ctdb->statistics.node.req_call++;
+		ctdb_request_call(ctdb, hdr);
+		break;
+
+	case CTDB_REPLY_CALL:
+		ctdb->statistics.node.reply_call++;
+		ctdb_reply_call(ctdb, hdr);
+		break;
+
+	case CTDB_REPLY_ERROR:
+		ctdb->statistics.node.reply_error++;
+		ctdb_reply_error(ctdb, hdr);
+		break;
+
+	case CTDB_REQ_DMASTER:
+		ctdb->statistics.node.req_dmaster++;
+		ctdb_request_dmaster(ctdb, hdr);
+		break;
+
+	case CTDB_REPLY_DMASTER:
+		ctdb->statistics.node.reply_dmaster++;
+		ctdb_reply_dmaster(ctdb, hdr);
+		break;
+
+	case CTDB_REQ_MESSAGE:
+		ctdb->statistics.node.req_message++;
+		ctdb_request_message(ctdb, hdr);
+		break;
+
+	case CTDB_REQ_CONTROL:
+		ctdb->statistics.node.req_control++;
+		ctdb_request_control(ctdb, hdr);
+		break;
+
+	case CTDB_REPLY_CONTROL:
+		ctdb->statistics.node.reply_control++;
+		ctdb_reply_control(ctdb, hdr);
+		break;
+
+	case CTDB_REQ_KEEPALIVE:
+		ctdb->statistics.keepalive_packets_recv++;
+		break;
+
+	default:
+		DEBUG(0,("%s: Packet with unknown operation %u\n", 
+			 __location__, hdr->operation));
+		break;
+	}
+
+done:
+	talloc_free(tmp_ctx);
+}
+
+
+/*
+  called by the transport layer when a node is dead
+*/
+void ctdb_node_dead(struct ctdb_node *node)
+{
+	if (node->flags & NODE_FLAGS_DISCONNECTED) {
+		DEBUG(1,("%s: node %s is already marked disconnected: %u connected\n", 
+			 node->ctdb->name, node->name, 
+			 node->ctdb->num_connected));
+		return;
+	}
+	node->ctdb->num_connected--;
+	node->flags |= NODE_FLAGS_DISCONNECTED;
+	node->rx_cnt = 0;
+	node->dead_count = 0;
+	DEBUG(1,("%s: node %s is dead: %u connected\n", 
+		 node->ctdb->name, node->name, node->ctdb->num_connected));
+	ctdb_daemon_cancel_controls(node->ctdb, node);
+}
+
+/*
+  called by the transport layer when a node is connected
+*/
+void ctdb_node_connected(struct ctdb_node *node)
+{
+	if (!(node->flags & NODE_FLAGS_DISCONNECTED)) {
+		DEBUG(1,("%s: node %s is already marked connected: %u connected\n", 
+			 node->ctdb->name, node->name, 
+			 node->ctdb->num_connected));
+		return;
+	}
+	node->ctdb->num_connected++;
+	node->dead_count = 0;
+	node->flags &= ~NODE_FLAGS_DISCONNECTED;
+	DEBUG(1,("%s: connected to %s - %u connected\n", 
+		 node->ctdb->name, node->name, node->ctdb->num_connected));
+}
+
+struct queue_next {
+	struct ctdb_context *ctdb;
+	struct ctdb_req_header *hdr;
+};
+
+
+/*
+  trigered when a deferred packet is due
+ */
+static void queue_next_trigger(struct event_context *ev, struct timed_event *te, 
+			       struct timeval t, void *private_data)
+{
+	struct queue_next *q = talloc_get_type(private_data, struct queue_next);
+	ctdb_input_pkt(q->ctdb, q->hdr);
+	talloc_free(q);
+}	
+
+/*
+  defer a packet, so it is processed on the next event loop
+  this is used for sending packets to ourselves
+ */
+static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+	struct queue_next *q;
+	q = talloc(ctdb, struct queue_next);
+	if (q == NULL) {
+		DEBUG(0,(__location__ " Failed to allocate deferred packet\n"));
+		return;
+	}
+	q->ctdb = ctdb;
+	q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
+	if (q->hdr == NULL) {
+		DEBUG(0,("Error copying deferred packet to self\n"));
+		return;
+	}
+#if 0
+	/* use this to put packets directly into our recv function */
+	ctdb_input_pkt(q->ctdb, q->hdr);
+#else
+	event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
+#endif
+}
+
+
+/*
+  broadcast a packet to all nodes
+*/
+static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, 
+				      struct ctdb_req_header *hdr)
+{
+	int i;
+	for (i=0;i<ctdb->num_nodes;i++) {
+		hdr->destnode = ctdb->nodes[i]->vnn;
+		ctdb_queue_packet(ctdb, hdr);
+	}
+}
+
+/*
+  broadcast a packet to all nodes in the current vnnmap
+*/
+static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, 
+					 struct ctdb_req_header *hdr)
+{
+	int i;
+	for (i=0;i<ctdb->vnn_map->size;i++) {
+		hdr->destnode = ctdb->vnn_map->map[i];
+		ctdb_queue_packet(ctdb, hdr);
+	}
+}
+
+/*
+  broadcast a packet to all connected nodes
+*/
+static void ctdb_broadcast_packet_connected(struct ctdb_context *ctdb, 
+					    struct ctdb_req_header *hdr)
+{
+	int i;
+	for (i=0;i<ctdb->num_nodes;i++) {
+		if (!(ctdb->nodes[i]->flags & NODE_FLAGS_DISCONNECTED)) {
+			hdr->destnode = ctdb->nodes[i]->vnn;
+			ctdb_queue_packet(ctdb, hdr);
+		}
+	}
+}
+
+/*
+  queue a packet or die
+*/
+void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+	struct ctdb_node *node;
+
+	switch (hdr->destnode) {
+	case CTDB_BROADCAST_ALL:
+		ctdb_broadcast_packet_all(ctdb, hdr);
+		return;
+	case CTDB_BROADCAST_VNNMAP:
+		ctdb_broadcast_packet_vnnmap(ctdb, hdr);
+		return;
+	case CTDB_BROADCAST_CONNECTED:
+		ctdb_broadcast_packet_connected(ctdb, hdr);
+		return;
+	}
+
+	ctdb->statistics.node_packets_sent++;
+
+	if (!ctdb_validate_vnn(ctdb, hdr->destnode)) {
+	  	DEBUG(0,(__location__ " cant send to node %u that does not exist\n", 
+			 hdr->destnode));
+		return;
+	}
+
+	node = ctdb->nodes[hdr->destnode];
+
+	if (hdr->destnode == ctdb->vnn) {
+		ctdb_defer_packet(ctdb, hdr);
+	} else {
+		node->tx_cnt++;
+		if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
+			ctdb_fatal(ctdb, "Unable to queue packet\n");
+		}
+	}
+}
+
+
diff --git a/source4/cluster/ctdb/server/ctdb_takeover.c b/source4/cluster/ctdb/server/ctdb_takeover.c
new file mode 100644
index 0000000000..42a23808dd
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_takeover.c
@@ -0,0 +1,822 @@
+/* 
+   ctdb recovery code
+
+   Copyright (C) Ronnie Sahlberg  2007
+   Copyright (C) Andrew Tridgell  2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/util/dlinklist.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "../include/ctdb_private.h"
+
+
+#define TAKEOVER_TIMEOUT() timeval_current_ofs(ctdb->tunable.takeover_timeout,0)
+
+#define CTDB_ARP_INTERVAL 1
+#define CTDB_ARP_REPEAT   3
+
+struct ctdb_takeover_arp {
+	struct ctdb_context *ctdb;
+	uint32_t count;
+	struct sockaddr_in sin;
+	struct ctdb_tcp_list *tcp_list;
+};
+
+/*
+  lists of tcp endpoints
+ */
+struct ctdb_tcp_list {
+	struct ctdb_tcp_list *prev, *next;
+	uint32_t vnn;
+	struct sockaddr_in saddr;
+	struct sockaddr_in daddr;
+};
+
+
+/*
+  list of clients to kill on IP release
+ */
+struct ctdb_client_ip {
+	struct ctdb_client_ip *prev, *next;
+	struct ctdb_context *ctdb;
+	struct sockaddr_in ip;
+	uint32_t client_id;
+};
+
+
+/*
+  send a gratuitous arp
+ */
+static void ctdb_control_send_arp(struct event_context *ev, struct timed_event *te, 
+				  struct timeval t, void *private_data)
+{
+	struct ctdb_takeover_arp *arp = talloc_get_type(private_data, 
+							struct ctdb_takeover_arp);
+	int ret;
+	struct ctdb_tcp_list *tcp;
+
+	ret = ctdb_sys_send_arp(&arp->sin, arp->ctdb->takeover.interface);
+	if (ret != 0) {
+		DEBUG(0,(__location__ " sending of arp failed (%s)\n", strerror(errno)));
+	}
+
+	for (tcp=arp->tcp_list;tcp;tcp=tcp->next) {
+		DEBUG(2,("sending tcp tickle ack for %u->%s:%u\n",
+			 (unsigned)ntohs(tcp->daddr.sin_port), 
+			 inet_ntoa(tcp->saddr.sin_addr),
+			 (unsigned)ntohs(tcp->saddr.sin_port)));
+		ret = ctdb_sys_send_tcp(&tcp->saddr, &tcp->daddr, 0, 0, 0);
+		if (ret != 0) {
+			DEBUG(0,(__location__ " Failed to send tcp tickle ack for %s\n",
+				 inet_ntoa(tcp->saddr.sin_addr)));
+		}
+	}
+
+	arp->count++;
+
+	if (arp->count == CTDB_ARP_REPEAT) {
+		talloc_free(arp);
+		return;
+	}
+	
+	event_add_timed(arp->ctdb->ev, arp->ctdb->takeover.last_ctx, 
+			timeval_current_ofs(CTDB_ARP_INTERVAL, 0), 
+			ctdb_control_send_arp, arp);
+}
+
+struct takeover_callback_state {
+	struct ctdb_req_control *c;
+	struct sockaddr_in *sin;
+};
+
+/*
+  called when takeip event finishes
+ */
+static void takeover_ip_callback(struct ctdb_context *ctdb, int status, 
+				 void *private_data)
+{
+	struct takeover_callback_state *state = 
+		talloc_get_type(private_data, struct takeover_callback_state);
+	struct ctdb_takeover_arp *arp;
+	char *ip = inet_ntoa(state->sin->sin_addr);
+	struct ctdb_tcp_list *tcp;
+
+	ctdb_start_monitoring(ctdb);
+
+	if (status != 0) {
+		DEBUG(0,(__location__ " Failed to takeover IP %s on interface %s\n",
+			 ip, ctdb->takeover.interface));
+		ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
+		talloc_free(state);
+		return;
+	}
+
+	if (!ctdb->takeover.last_ctx) {
+		ctdb->takeover.last_ctx = talloc_new(ctdb);
+		if (!ctdb->takeover.last_ctx) goto failed;
+	}
+
+	arp = talloc_zero(ctdb->takeover.last_ctx, struct ctdb_takeover_arp);
+	if (!arp) goto failed;
+	
+	arp->ctdb = ctdb;
+	arp->sin = *state->sin;
+
+	/* add all of the known tcp connections for this IP to the
+	   list of tcp connections to send tickle acks for */
+	for (tcp=ctdb->tcp_list;tcp;tcp=tcp->next) {
+		if (state->sin->sin_addr.s_addr == tcp->daddr.sin_addr.s_addr) {
+			struct ctdb_tcp_list *t2 = talloc(arp, struct ctdb_tcp_list);
+			if (t2 == NULL) goto failed;
+			*t2 = *tcp;
+			DLIST_ADD(arp->tcp_list, t2);
+		}
+	}
+
+	event_add_timed(arp->ctdb->ev, arp->ctdb->takeover.last_ctx, 
+			timeval_zero(), ctdb_control_send_arp, arp);
+
+	/* the control succeeded */
+	ctdb_request_control_reply(ctdb, state->c, NULL, 0, NULL);
+	talloc_free(state);
+	return;
+
+failed:
+	ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
+	talloc_free(state);
+	return;
+}
+
+/*
+  take over an ip address
+ */
+int32_t ctdb_control_takeover_ip(struct ctdb_context *ctdb, 
+				 struct ctdb_req_control *c,
+				 TDB_DATA indata, 
+				 bool *async_reply)
+{
+	int ret;
+	struct takeover_callback_state *state;
+	struct ctdb_public_ip *pip = (struct ctdb_public_ip *)indata.dptr;
+	char *ip = inet_ntoa(pip->sin.sin_addr);
+
+
+	/* update out node table */
+	ctdb->nodes[pip->vnn]->takeover_vnn = pip->takeover_vnn;
+
+	/* if our kernel already has this IP, do nothing */
+	if (ctdb_sys_have_ip(ip)) {
+		return 0;
+	}
+
+	state = talloc(ctdb, struct takeover_callback_state);
+	CTDB_NO_MEMORY(ctdb, state);
+
+	state->c = talloc_steal(ctdb, c);
+	state->sin = talloc(ctdb, struct sockaddr_in);       
+	CTDB_NO_MEMORY(ctdb, state->sin);
+	*state->sin = pip->sin;
+
+	DEBUG(0,("Takover of IP %s/%u on interface %s\n", 
+		 ip, ctdb->nodes[ctdb->vnn]->public_netmask_bits, 
+		 ctdb->takeover.interface));
+
+	ctdb_stop_monitoring(ctdb);
+
+	ret = ctdb_event_script_callback(ctdb, 
+					 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
+					 state, takeover_ip_callback, state,
+					 "takeip %s %s %u",
+					 ctdb->takeover.interface, 
+					 ip,
+					 ctdb->nodes[ctdb->vnn]->public_netmask_bits);
+	if (ret != 0) {
+		DEBUG(0,(__location__ " Failed to takeover IP %s on interface %s\n",
+			 ip, ctdb->takeover.interface));
+		talloc_free(state);
+		return -1;
+	}
+
+	/* tell ctdb_control.c that we will be replying asynchronously */
+	*async_reply = true;
+
+	return 0;
+}
+
+/*
+  kill any clients that are registered with a IP that is being released
+ */
+static void release_kill_clients(struct ctdb_context *ctdb, struct in_addr in)
+{
+	struct ctdb_client_ip *ip;
+
+	for (ip=ctdb->client_ip_list; ip; ip=ip->next) {
+		if (ip->ip.sin_addr.s_addr == in.s_addr) {
+			struct ctdb_client *client = ctdb_reqid_find(ctdb, 
+								     ip->client_id, 
+								     struct ctdb_client);
+			if (client->pid != 0) {
+				DEBUG(0,(__location__ " Killing client pid %u for IP %s on client_id %u\n",
+					 (unsigned)client->pid, inet_ntoa(in),
+					 ip->client_id));
+				kill(client->pid, SIGKILL);
+			}
+		}
+	}
+}
+
+/*
+  called when releaseip event finishes
+ */
+static void release_ip_callback(struct ctdb_context *ctdb, int status, 
+				void *private_data)
+{
+	struct takeover_callback_state *state = 
+		talloc_get_type(private_data, struct takeover_callback_state);
+	char *ip = inet_ntoa(state->sin->sin_addr);
+	TDB_DATA data;
+	struct ctdb_tcp_list *tcp;
+
+	ctdb_start_monitoring(ctdb);
+
+	/* send a message to all clients of this node telling them
+	   that the cluster has been reconfigured and they should
+	   release any sockets on this IP */
+	data.dptr = (uint8_t *)ip;
+	data.dsize = strlen(ip)+1;
+
+	ctdb_daemon_send_message(ctdb, ctdb->vnn, CTDB_SRVID_RELEASE_IP, data);
+
+	/* kill clients that have registered with this IP */
+	release_kill_clients(ctdb, state->sin->sin_addr);
+	
+
+	/* tell other nodes about any tcp connections we were holding with this IP */
+	for (tcp=ctdb->tcp_list;tcp;tcp=tcp->next) {
+		if (tcp->vnn == ctdb->vnn && 
+		    state->sin->sin_addr.s_addr == tcp->daddr.sin_addr.s_addr) {
+			struct ctdb_control_tcp_vnn t;
+
+			t.vnn  = ctdb->vnn;
+			t.src  = tcp->saddr;
+			t.dest = tcp->daddr;
+
+			data.dptr = (uint8_t *)&t;
+			data.dsize = sizeof(t);
+
+			ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, 
+						 CTDB_CONTROL_TCP_ADD,
+						 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+		}
+	}
+
+	/* the control succeeded */
+	ctdb_request_control_reply(ctdb, state->c, NULL, 0, NULL);
+	talloc_free(state);
+}
+
+
+/*
+  release an ip address
+ */
+int32_t ctdb_control_release_ip(struct ctdb_context *ctdb, 
+				struct ctdb_req_control *c,
+				TDB_DATA indata, 
+				bool *async_reply)
+{
+	int ret;
+	struct takeover_callback_state *state;
+	struct ctdb_public_ip *pip = (struct ctdb_public_ip *)indata.dptr;
+	char *ip = inet_ntoa(pip->sin.sin_addr);
+
+	/* update out node table */
+	ctdb->nodes[pip->vnn]->takeover_vnn = pip->takeover_vnn;
+
+	if (!ctdb_sys_have_ip(ip)) {
+		return 0;
+	}
+
+	DEBUG(0,("Release of IP %s/%u on interface %s\n", 
+		 ip, ctdb->nodes[ctdb->vnn]->public_netmask_bits, 
+		 ctdb->takeover.interface));
+
+	/* stop any previous arps */
+	talloc_free(ctdb->takeover.last_ctx);
+	ctdb->takeover.last_ctx = NULL;
+
+	state = talloc(ctdb, struct takeover_callback_state);
+	CTDB_NO_MEMORY(ctdb, state);
+
+	state->c = talloc_steal(state, c);
+	state->sin = talloc(state, struct sockaddr_in);       
+	CTDB_NO_MEMORY(ctdb, state->sin);
+	*state->sin = pip->sin;
+
+	ctdb_stop_monitoring(ctdb);
+
+	ret = ctdb_event_script_callback(ctdb, 
+					 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
+					 state, release_ip_callback, state,
+					 "releaseip %s %s %u",
+					 ctdb->takeover.interface, 
+					 ip,
+					 ctdb->nodes[ctdb->vnn]->public_netmask_bits);
+	if (ret != 0) {
+		DEBUG(0,(__location__ " Failed to release IP %s on interface %s\n",
+			 ip, ctdb->takeover.interface));
+		talloc_free(state);
+		return -1;
+	}
+
+	/* tell the control that we will be reply asynchronously */
+	*async_reply = true;
+
+	return 0;
+}
+
+
+/*
+  setup the event script
+*/
+int ctdb_set_event_script(struct ctdb_context *ctdb, const char *script)
+{
+	ctdb->takeover.event_script = talloc_strdup(ctdb, script);
+	CTDB_NO_MEMORY(ctdb, ctdb->takeover.event_script);
+	return 0;
+}
+
+/*
+  setup the public address list from a file
+*/
+int ctdb_set_public_addresses(struct ctdb_context *ctdb, const char *alist)
+{
+	char **lines;
+	int nlines;
+	int i;
+
+	lines = file_lines_load(alist, &nlines, ctdb);
+	if (lines == NULL) {
+		ctdb_set_error(ctdb, "Failed to load public address list '%s'\n", alist);
+		return -1;
+	}
+	while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
+		nlines--;
+	}
+
+	if (nlines != ctdb->num_nodes) {
+		DEBUG(0,("Number of lines in %s does not match number of nodes!\n", alist));
+		talloc_free(lines);
+		return -1;
+	}
+
+	for (i=0;i<nlines;i++) {
+		char *p;
+		struct in_addr in;
+
+		ctdb->nodes[i]->public_address = talloc_strdup(ctdb->nodes[i], lines[i]);
+		CTDB_NO_MEMORY(ctdb, ctdb->nodes[i]->public_address);
+		ctdb->nodes[i]->takeover_vnn = -1;
+
+		/* see if they supplied a netmask length */
+		p = strchr(ctdb->nodes[i]->public_address, '/');
+		if (!p) {
+			DEBUG(0,("You must supply a netmask for public address %s\n",
+				 ctdb->nodes[i]->public_address));
+			return -1;
+		}
+		*p = 0;
+		ctdb->nodes[i]->public_netmask_bits = atoi(p+1);
+
+		if (ctdb->nodes[i]->public_netmask_bits > 32) {
+			DEBUG(0, ("Illegal netmask for IP %s\n", ctdb->nodes[i]->public_address));
+			return -1;
+		}
+
+		if (inet_aton(ctdb->nodes[i]->public_address, &in) == 0) {
+			DEBUG(0,("Badly formed IP '%s' in public address list\n", ctdb->nodes[i]->public_address));
+			return -1;
+		}
+	}
+
+	talloc_free(lines);
+	return 0;
+}
+
+/*
+  see if two IPs are on the same subnet
+ */
+static bool ctdb_same_subnet(const char *ip1, const char *ip2, uint8_t netmask_bits)
+{
+	struct in_addr in1, in2;
+	uint32_t mask;
+
+	inet_aton(ip1, &in1);
+	inet_aton(ip2, &in2);
+
+	mask = ~((1LL<<(32-netmask_bits))-1);
+
+	if ((ntohl(in1.s_addr) & mask) != (ntohl(in2.s_addr) & mask)) {
+		return false;
+	}
+
+	return true;
+}
+
+
+/*
+  try to find an available node to take a given nodes IP that meets the
+  criterion given by the flags
+ */
+static void ctdb_takeover_find_node(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
+				    int start_node, uint32_t mask_flags)
+{
+	int j;
+	for (j=(start_node+1)%nodemap->num;
+	     j != start_node;
+	     j=(j+1)%nodemap->num) {
+		if (!(nodemap->nodes[j].flags & mask_flags) &&
+		    ctdb_same_subnet(ctdb->nodes[j]->public_address, 
+				     ctdb->nodes[start_node]->public_address, 
+				     ctdb->nodes[j]->public_netmask_bits)) {
+			ctdb->nodes[start_node]->takeover_vnn = nodemap->nodes[j].vnn;
+			break;
+		}
+	}
+}
+
+
+/*
+  make any IP alias changes for public addresses that are necessary 
+ */
+int ctdb_takeover_run(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+{
+	int i, j;
+	int ret;
+	struct ctdb_public_ip ip;
+
+	ZERO_STRUCT(ip);
+
+	/* Work out which node will look after each public IP.
+	 * takeover_node cycles over the nodes and is incremented each time a 
+	 * node has been assigned to take over for another node.
+	 * This spreads the failed nodes out across the remaining
+	 * nodes more evenly
+	 */
+	for (i=0;i<nodemap->num;i++) {
+		if (!(nodemap->nodes[i].flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED))) {
+			ctdb->nodes[i]->takeover_vnn = nodemap->nodes[i].vnn;
+		} else {
+			ctdb->nodes[i]->takeover_vnn = (uint32_t)-1;	
+
+			ctdb_takeover_find_node(ctdb, nodemap, i, NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED);
+			
+			/* if no enabled node can take it, then we
+			   might as well use any active node. It
+			   probably means that some subsystem (such as
+			   NFS) is sick on all nodes. Best we can do
+			   is to keep the other services up. */
+			if (ctdb->nodes[i]->takeover_vnn == (uint32_t)-1) {
+				ctdb_takeover_find_node(ctdb, nodemap, i, NODE_FLAGS_INACTIVE);
+			}
+
+			if (ctdb->nodes[i]->takeover_vnn == (uint32_t)-1) {
+				DEBUG(0,(__location__ " No node available on same network to take %s\n",
+					 ctdb->nodes[i]->public_address));
+			}
+		}
+	}	
+
+	/* at this point ctdb->nodes[i]->takeover_vnn is the vnn which will own each IP */
+
+	/* now tell all nodes to delete any alias that they should not
+	   have.  This will be a NOOP on nodes that don't currently
+	   hold the given alias */
+	for (i=0;i<nodemap->num;i++) {
+		/* don't talk to unconnected nodes, but do talk to banned nodes */
+		if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+			continue;
+		}
+
+		/* tell this node to delete all of the aliases that it should not have */
+		for (j=0;j<nodemap->num;j++) {
+			if (ctdb->nodes[j]->takeover_vnn != nodemap->nodes[i].vnn) {
+				ip.vnn = j;
+				ip.takeover_vnn = ctdb->nodes[j]->takeover_vnn;
+				ip.sin.sin_family = AF_INET;
+				inet_aton(ctdb->nodes[j]->public_address, &ip.sin.sin_addr);
+
+				ret = ctdb_ctrl_release_ip(ctdb, TAKEOVER_TIMEOUT(),
+							   nodemap->nodes[i].vnn, 
+							   &ip);
+				if (ret != 0) {
+					DEBUG(0,("Failed to tell vnn %u to release IP %s\n",
+						 nodemap->nodes[i].vnn,
+						 ctdb->nodes[j]->public_address));
+					return -1;
+				}
+			}
+		}
+	}
+
+	/* tell all nodes to get their own IPs */
+	for (i=0;i<nodemap->num;i++) {
+		if (ctdb->nodes[i]->takeover_vnn == -1) {
+			/* this IP won't be taken over */
+			continue;
+		}
+		ip.vnn = i;
+		ip.takeover_vnn = ctdb->nodes[i]->takeover_vnn;
+		ip.sin.sin_family = AF_INET;
+		inet_aton(ctdb->nodes[i]->public_address, &ip.sin.sin_addr);
+
+		ret = ctdb_ctrl_takeover_ip(ctdb, TAKEOVER_TIMEOUT(), 
+					    ctdb->nodes[i]->takeover_vnn, 
+					    &ip);
+		if (ret != 0) {
+			DEBUG(0,("Failed asking vnn %u to take over IP %s\n",
+				 ctdb->nodes[i]->takeover_vnn, 
+				 ctdb->nodes[i]->public_address));
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+  destroy a ctdb_client_ip structure
+ */
+static int ctdb_client_ip_destructor(struct ctdb_client_ip *ip)
+{
+	DLIST_REMOVE(ip->ctdb->client_ip_list, ip);
+	return 0;
+}
+
+/*
+  called by a client to inform us of a TCP connection that it is managing
+  that should tickled with an ACK when IP takeover is done
+ */
+int32_t ctdb_control_tcp_client(struct ctdb_context *ctdb, uint32_t client_id, uint32_t vnn,
+				TDB_DATA indata)
+{
+	struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+	struct ctdb_control_tcp *p = (struct ctdb_control_tcp *)indata.dptr;
+	struct ctdb_tcp_list *tcp;
+	struct ctdb_control_tcp_vnn t;
+	int ret;
+	TDB_DATA data;
+	struct ctdb_client_ip *ip;
+
+	ip = talloc(client, struct ctdb_client_ip);
+	CTDB_NO_MEMORY(ctdb, ip);
+
+	ip->ctdb = ctdb;
+	ip->ip = p->dest;
+	ip->client_id = client_id;
+	talloc_set_destructor(ip, ctdb_client_ip_destructor);
+	DLIST_ADD(ctdb->client_ip_list, ip);
+
+	tcp = talloc(client, struct ctdb_tcp_list);
+	CTDB_NO_MEMORY(ctdb, tcp);
+
+	tcp->vnn   = vnn;
+	tcp->saddr = p->src;
+	tcp->daddr = p->dest;
+
+	DLIST_ADD(client->tcp_list, tcp);
+
+	t.vnn  = vnn;
+	t.src  = p->src;
+	t.dest = p->dest;
+
+	data.dptr = (uint8_t *)&t;
+	data.dsize = sizeof(t);
+
+	/* tell all nodes about this tcp connection */
+	ret = ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, 
+				       CTDB_CONTROL_TCP_ADD,
+				       0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+	if (ret != 0) {
+		DEBUG(0,(__location__ " Failed to send CTDB_CONTROL_TCP_ADD\n"));
+		return -1;
+	}
+
+	return 0;
+}
+
+/*
+  see if two sockaddr_in are the same
+ */
+static bool same_sockaddr_in(struct sockaddr_in *in1, struct sockaddr_in *in2)
+{
+	return in1->sin_family == in2->sin_family &&
+		in1->sin_port == in2->sin_port &&
+		in1->sin_addr.s_addr == in2->sin_addr.s_addr;
+}
+
+/*
+  find a tcp address on a list
+ */
+static struct ctdb_tcp_list *ctdb_tcp_find(struct ctdb_tcp_list *list, 
+					   struct ctdb_tcp_list *tcp)
+{
+	while (list) {
+		if (same_sockaddr_in(&list->saddr, &tcp->saddr) &&
+		    same_sockaddr_in(&list->daddr, &tcp->daddr)) {
+			return list;
+		}
+		list = list->next;
+	}
+	return NULL;
+}
+
+/*
+  called by a daemon to inform us of a TCP connection that one of its
+  clients managing that should tickled with an ACK when IP takeover is
+  done
+ */
+int32_t ctdb_control_tcp_add(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+	struct ctdb_control_tcp_vnn *p = (struct ctdb_control_tcp_vnn *)indata.dptr;
+	struct ctdb_tcp_list *tcp;
+
+	tcp = talloc(ctdb, struct ctdb_tcp_list);
+	CTDB_NO_MEMORY(ctdb, tcp);
+
+	tcp->vnn   = p->vnn;
+	tcp->saddr = p->src;
+	tcp->daddr = p->dest;
+
+	if (NULL == ctdb_tcp_find(ctdb->tcp_list, tcp)) {
+		DLIST_ADD(ctdb->tcp_list, tcp);
+		DEBUG(2,("Added tickle info for %s:%u from vnn %u\n",
+			 inet_ntoa(tcp->daddr.sin_addr), ntohs(tcp->daddr.sin_port),
+			 tcp->vnn));
+	} else {
+		DEBUG(4,("Already had tickle info for %s:%u from vnn %u\n",
+			 inet_ntoa(tcp->daddr.sin_addr), ntohs(tcp->daddr.sin_port),
+			 tcp->vnn));
+	}
+
+	return 0;
+}
+
+/*
+  called by a daemon to inform us of a TCP connection that one of its
+  clients managing that should tickled with an ACK when IP takeover is
+  done
+ */
+int32_t ctdb_control_tcp_remove(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+	struct ctdb_control_tcp_vnn *p = (struct ctdb_control_tcp_vnn *)indata.dptr;
+	struct ctdb_tcp_list t, *tcp;
+
+	t.vnn   = p->vnn;
+	t.saddr = p->src;
+	t.daddr = p->dest;
+
+	tcp = ctdb_tcp_find(ctdb->tcp_list, &t);
+	if (tcp) {
+		DEBUG(2,("Removed tickle info for %s:%u from vnn %u\n",
+			 inet_ntoa(tcp->daddr.sin_addr), ntohs(tcp->daddr.sin_port),
+			 tcp->vnn));
+		DLIST_REMOVE(ctdb->tcp_list, tcp);
+		talloc_free(tcp);
+	}
+
+	return 0;
+}
+
+
+/*
+  called when a daemon restarts - wipes all tcp entries from that vnn
+ */
+int32_t ctdb_control_startup(struct ctdb_context *ctdb, uint32_t vnn)
+{
+	struct ctdb_tcp_list *tcp, *next;	
+	for (tcp=ctdb->tcp_list;tcp;tcp=next) {
+		next = tcp->next;
+		if (tcp->vnn == vnn) {
+			DLIST_REMOVE(ctdb->tcp_list, tcp);
+			talloc_free(tcp);
+		}
+
+		/* and tell the new guy about any that he should have
+		   from us */
+		if (tcp->vnn == ctdb->vnn) {
+			struct ctdb_control_tcp_vnn t;
+			TDB_DATA data;
+
+			t.vnn  = tcp->vnn;
+			t.src  = tcp->saddr;
+			t.dest = tcp->daddr;
+
+			data.dptr = (uint8_t *)&t;
+			data.dsize = sizeof(t);
+
+			ctdb_daemon_send_control(ctdb, vnn, 0, 
+						 CTDB_CONTROL_TCP_ADD,
+						 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+		}
+	}
+	return 0;
+}
+
+
+/*
+  called when a client structure goes away - hook to remove
+  elements from the tcp_list in all daemons
+ */
+void ctdb_takeover_client_destructor_hook(struct ctdb_client *client)
+{
+	while (client->tcp_list) {
+		TDB_DATA data;
+		struct ctdb_control_tcp_vnn p;
+		struct ctdb_tcp_list *tcp = client->tcp_list;
+		DLIST_REMOVE(client->tcp_list, tcp);
+		p.vnn = tcp->vnn;
+		p.src = tcp->saddr;
+		p.dest = tcp->daddr;
+		data.dptr = (uint8_t *)&p;
+		data.dsize = sizeof(p);
+		ctdb_daemon_send_control(client->ctdb, CTDB_BROADCAST_CONNECTED, 0, 
+					 CTDB_CONTROL_TCP_REMOVE,
+					 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+		talloc_free(tcp);
+	}
+}
+
+
+/*
+  release all IPs on shutdown
+ */
+void ctdb_release_all_ips(struct ctdb_context *ctdb)
+{
+	int i;
+
+	if (!ctdb->takeover.enabled) {
+		return;
+	}
+
+	for (i=0;i<ctdb->num_nodes;i++) {
+		struct ctdb_node *node = ctdb->nodes[i];
+		if (ctdb_sys_have_ip(node->public_address)) {
+			struct in_addr in;
+			ctdb_event_script(ctdb, "releaseip %s %s %u",
+					  ctdb->takeover.interface, 
+					  node->public_address,
+					  node->public_netmask_bits);
+			if (inet_aton(node->public_address, &in) != 0) {
+				release_kill_clients(ctdb, in);
+			}
+		}
+	}
+}
+
+
+/*
+  get list of public IPs
+ */
+int32_t ctdb_control_get_public_ips(struct ctdb_context *ctdb, struct ctdb_req_control *c, TDB_DATA *outdata)
+{
+	int i, len;
+	struct ctdb_all_public_ips *ips;
+
+	len = offsetof(struct ctdb_all_public_ips, ips) + ctdb->num_nodes*sizeof(struct ctdb_public_ip);
+
+	ips = talloc_zero_size(outdata, len);
+	CTDB_NO_MEMORY(ctdb, ips);
+
+	outdata->dsize = len;
+	outdata->dptr  = (uint8_t *)ips;
+
+	ips->num = ctdb->num_nodes;
+	for(i=0;i<ctdb->num_nodes;i++){
+		ips->ips[i].vnn = i;
+		ips->ips[i].takeover_vnn = ctdb->nodes[i]->takeover_vnn;
+		ips->ips[i].sin.sin_family = AF_INET;
+		if (ctdb->nodes[i]->public_address) {
+			inet_aton(ctdb->nodes[i]->public_address, &ips->ips[i].sin.sin_addr);
+		}
+	}
+
+	return 0;
+}
diff --git a/source4/cluster/ctdb/server/ctdb_traverse.c b/source4/cluster/ctdb/server/ctdb_traverse.c
new file mode 100644
index 0000000000..d44c27401a
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_traverse.c
@@ -0,0 +1,463 @@
+/* 
+   efficient async ctdb traverse
+
+   Copyright (C) Andrew Tridgell  2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "db_wrap.h"
+#include "lib/tdb/include/tdb.h"
+#include "../include/ctdb_private.h"
+
+typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
+
+/*
+  handle returned to caller - freeing this handler will kill the child and 
+  terminate the traverse
+ */
+struct ctdb_traverse_local_handle {
+	struct ctdb_db_context *ctdb_db;
+	int fd[2];
+	pid_t child;
+	void *private_data;
+	ctdb_traverse_fn_t callback;
+	struct timeval start_time;
+	struct ctdb_queue *queue;
+};
+
+/*
+  called when data is available from the child
+ */
+static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
+{
+	struct ctdb_traverse_local_handle *h = talloc_get_type(private_data, 
+							       struct ctdb_traverse_local_handle);
+	TDB_DATA key, data;
+	ctdb_traverse_fn_t callback = h->callback;
+	void *p = h->private_data;
+	struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
+
+	if (rawdata == NULL || length < 4 || length != tdata->length) {
+		/* end of traverse */
+		talloc_free(h);
+		callback(p, tdb_null, tdb_null);
+		return;
+	}
+
+	key.dsize = tdata->keylen;
+	key.dptr  = &tdata->data[0];
+	data.dsize = tdata->datalen;
+	data.dptr = &tdata->data[tdata->keylen];
+
+	callback(p, key, data);	
+}
+
+/*
+  destroy a in-flight traverse operation
+ */
+static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
+{
+	kill(h->child, SIGKILL);
+	waitpid(h->child, NULL, 0);
+	return 0;
+}
+
+/*
+  callback from tdb_traverse_read()
+ */
+static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+	struct ctdb_traverse_local_handle *h = talloc_get_type(p, 
+							       struct ctdb_traverse_local_handle);
+	struct ctdb_rec_data *d;
+	struct ctdb_ltdb_header *hdr;
+
+	/* filter out non-authoritative and zero-length records */
+	hdr = (struct ctdb_ltdb_header *)data.dptr;
+	if (data.dsize <= sizeof(struct ctdb_ltdb_header) ||
+	    hdr->dmaster != h->ctdb_db->ctdb->vnn) {
+		return 0;
+	}
+
+	d = ctdb_marshall_record(h, 0, key, data);
+	if (d == NULL) {
+		/* error handling is tricky in this child code .... */
+		return -1;
+	}
+
+	if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
+		return -1;
+	}
+	return 0;
+}
+
+
+/*
+  setup a non-blocking traverse of a local ltdb. The callback function
+  will be called on every record in the local ltdb. To stop the
+  travserse, talloc_free() the travserse_handle.
+
+  The traverse is finished when the callback is called with tdb_null for key and data
+ */
+static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
+							      ctdb_traverse_fn_t callback,
+							      void *private_data)
+{
+	struct ctdb_traverse_local_handle *h;
+	int ret;
+
+	h = talloc_zero(ctdb_db, struct ctdb_traverse_local_handle);
+	if (h == NULL) {
+		return NULL;
+	}
+
+	ret = pipe(h->fd);
+
+	if (ret != 0) {
+		talloc_free(h);
+		return NULL;
+	}
+
+	h->child = fork();
+
+	if (h->child == (pid_t)-1) {
+		close(h->fd[0]);
+		close(h->fd[1]);
+		talloc_free(h);
+		return NULL;
+	}
+
+	h->callback = callback;
+	h->private_data = private_data;
+	h->ctdb_db = ctdb_db;
+
+	if (h->child == 0) {
+		/* start the traverse in the child */
+		close(h->fd[0]);
+		tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
+		_exit(0);
+	}
+
+	close(h->fd[1]);
+	talloc_set_destructor(h, traverse_local_destructor);
+
+	/*
+	  setup a packet queue between the child and the parent. This
+	  copes with all the async and packet boundary issues
+	 */
+	h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
+	if (h->queue == NULL) {
+		talloc_free(h);
+		return NULL;
+	}
+
+	h->start_time = timeval_current();
+
+	return h;
+}
+
+
+struct ctdb_traverse_all_handle {
+	struct ctdb_context *ctdb;
+	uint32_t reqid;
+	ctdb_traverse_fn_t callback;
+	void *private_data;
+	uint32_t null_count;
+};
+
+/*
+  destroy a traverse_all op
+ */
+static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
+{
+	ctdb_reqid_remove(state->ctdb, state->reqid);
+	return 0;
+}
+
+struct ctdb_traverse_all {
+	uint32_t db_id;
+	uint32_t reqid;
+	uint32_t vnn;
+};
+
+/* called when a traverse times out */
+static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te, 
+				      struct timeval t, void *private_data)
+{
+	struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
+
+	state->ctdb->statistics.timeouts.traverse++;
+
+	state->callback(state->private_data, tdb_null, tdb_null);
+	talloc_free(state);
+}
+
+/*
+  setup a cluster-wide non-blocking traverse of a ctdb. The
+  callback function will be called on every record in the local
+  ltdb. To stop the travserse, talloc_free() the traverse_handle.
+
+  The traverse is finished when the callback is called with tdb_null
+  for key and data
+ */
+static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
+								 ctdb_traverse_fn_t callback,
+								 void *private_data)
+{
+	struct ctdb_traverse_all_handle *state;
+	struct ctdb_context *ctdb = ctdb_db->ctdb;
+	int ret;
+	TDB_DATA data;
+	struct ctdb_traverse_all r;
+
+	state = talloc(ctdb_db, struct ctdb_traverse_all_handle);
+	if (state == NULL) {
+		return NULL;
+	}
+
+	state->ctdb = ctdb;
+	state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
+	state->callback = callback;
+	state->private_data = private_data;
+	state->null_count = 0;
+	
+	talloc_set_destructor(state, ctdb_traverse_all_destructor);
+
+	r.db_id = ctdb_db->db_id;
+	r.reqid = state->reqid;
+	r.vnn   = ctdb->vnn;
+
+	data.dptr = (uint8_t *)&r;
+	data.dsize = sizeof(r);
+
+	/* tell all the nodes in the cluster to start sending records to this node */
+	ret = ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0, 
+				       CTDB_CONTROL_TRAVERSE_ALL,
+				       0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+	if (ret != 0) {
+		talloc_free(state);
+		return NULL;
+	}
+
+	/* timeout the traverse */
+	event_add_timed(ctdb->ev, state, 
+			timeval_current_ofs(ctdb->tunable.traverse_timeout, 0), 
+			ctdb_traverse_all_timeout, state);
+
+	return state;
+}
+
+struct traverse_all_state {
+	struct ctdb_context *ctdb;
+	struct ctdb_traverse_local_handle *h;
+	uint32_t reqid;
+	uint32_t srcnode;
+};
+
+/*
+  called for each record during a traverse all 
+ */
+static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
+{
+	struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
+	int ret;
+	struct ctdb_rec_data *d;
+	TDB_DATA cdata;
+
+	d = ctdb_marshall_record(state, state->reqid, key, data);
+	if (d == NULL) {
+		/* darn .... */
+		DEBUG(0,("Out of memory in traverse_all_callback\n"));
+		return;
+	}
+
+	cdata.dptr = (uint8_t *)d;
+	cdata.dsize = d->length;
+
+	ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
+				       0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
+	if (ret != 0) {
+		DEBUG(0,("Failed to send traverse data\n"));
+	}
+
+	if (key.dsize == 0 && data.dsize == 0) {
+		/* we're done */
+		talloc_free(state);
+	}
+}
+
+/*
+  called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
+  setup a traverse of our local ltdb, sending the records as
+  CTDB_CONTROL_TRAVERSE_DATA records back to the originator
+ */
+int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
+{
+	struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
+	struct traverse_all_state *state;
+	struct ctdb_db_context *ctdb_db;
+
+	if (data.dsize != sizeof(struct ctdb_traverse_all)) {
+		DEBUG(0,("Invalid size in ctdb_control_traverse_all\n"));
+		return -1;
+	}
+
+	ctdb_db = find_ctdb_db(ctdb, c->db_id);
+	if (ctdb_db == NULL) {
+		return -1;
+	}
+
+	state = talloc(ctdb_db, struct traverse_all_state);
+	if (state == NULL) {
+		return -1;
+	}
+
+	state->reqid = c->reqid;
+	state->srcnode = c->vnn;
+	state->ctdb = ctdb;
+
+	state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
+	if (state->h == NULL) {
+		talloc_free(state);
+		return -1;
+	}
+
+	return 0;
+}
+
+
+/*
+  called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
+  call the traverse_all callback with the record
+ */
+int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
+{
+	struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
+	struct ctdb_traverse_all_handle *state;
+	TDB_DATA key;
+	ctdb_traverse_fn_t callback;
+	void *private_data;
+
+	if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
+		DEBUG(0,("Bad record size in ctdb_control_traverse_data\n"));
+		return -1;
+	}
+
+	state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
+	if (state == NULL || d->reqid != state->reqid) {
+		/* traverse might have been terminated already */
+		return -1;
+	}
+
+	key.dsize = d->keylen;
+	key.dptr  = &d->data[0];
+	data.dsize = d->datalen;
+	data.dptr = &d->data[d->keylen];
+
+	if (key.dsize == 0 && data.dsize == 0) {
+		state->null_count++;
+		if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
+			return 0;
+		}
+	}
+
+	callback = state->callback;
+	private_data = state->private_data;
+
+	callback(private_data, key, data);
+	if (key.dsize == 0 && data.dsize == 0) {
+		/* we've received all of the null replies, so all
+		   nodes are finished */
+		talloc_free(state);
+	}
+	return 0;
+}	
+
+struct traverse_start_state {
+	struct ctdb_context *ctdb;
+	struct ctdb_traverse_all_handle *h;
+	uint32_t srcnode;
+	uint32_t reqid;
+	uint64_t srvid;
+};
+
+/*
+  callback which sends records as messages to the client
+ */
+static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
+{
+	struct traverse_start_state *state;
+	struct ctdb_rec_data *d;
+	TDB_DATA cdata;
+
+	state = talloc_get_type(p, struct traverse_start_state);
+
+	d = ctdb_marshall_record(state, state->reqid, key, data);
+	if (d == NULL) {
+		return;
+	}
+
+	cdata.dptr = (uint8_t *)d;
+	cdata.dsize = d->length;
+
+	ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
+	if (key.dsize == 0 && data.dsize == 0) {
+		/* end of traverse */
+		talloc_free(state);
+	}
+}
+
+/*
+  start a traverse_all - called as a control from a client
+ */
+int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data, 
+				    TDB_DATA *outdata, uint32_t srcnode)
+{
+	struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
+	struct traverse_start_state *state;
+	struct ctdb_db_context *ctdb_db;
+
+	if (data.dsize != sizeof(*d)) {
+		DEBUG(0,("Bad record size in ctdb_control_traverse_start\n"));
+		return -1;
+	}
+
+	ctdb_db = find_ctdb_db(ctdb, d->db_id);
+	if (ctdb_db == NULL) {
+		return -1;
+	}
+
+	state = talloc(ctdb_db, struct traverse_start_state);
+	if (state == NULL) {
+		return -1;
+	}
+	
+	state->srcnode = srcnode;
+	state->reqid = d->reqid;
+	state->srvid = d->srvid;
+	state->ctdb = ctdb;
+
+	state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
+	if (state->h == NULL) {
+		talloc_free(state);
+		return -1;
+	}
+
+	return 0;
+}
diff --git a/source4/cluster/ctdb/server/ctdb_tunables.c b/source4/cluster/ctdb/server/ctdb_tunables.c
new file mode 100644
index 0000000000..491c965656
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdb_tunables.c
@@ -0,0 +1,163 @@
+/* 
+   ctdb tunables code
+
+   Copyright (C) Andrew Tridgell  2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include "includes.h"
+#include "../include/ctdb_private.h"
+
+static const struct {
+	const char *name;
+	uint32_t default_v;
+	size_t offset;	
+} tunable_map[] = {
+	{ "MaxRedirectCount",     3,  offsetof(struct ctdb_tunable, max_redirect_count) },
+	{ "SeqnumFrequency",      1,  offsetof(struct ctdb_tunable, seqnum_frequency) },
+	{ "ControlTimeout",      60, offsetof(struct ctdb_tunable, control_timeout) },
+	{ "TraverseTimeout",     20, offsetof(struct ctdb_tunable, traverse_timeout) },
+	{ "KeepaliveInterval",    2,  offsetof(struct ctdb_tunable, keepalive_interval) },
+	{ "KeepaliveLimit",       5,  offsetof(struct ctdb_tunable, keepalive_limit) },
+	{ "MaxLACount",           7,  offsetof(struct ctdb_tunable, max_lacount) },
+	{ "RecoverTimeout",       5,  offsetof(struct ctdb_tunable, recover_timeout) },
+	{ "RecoverInterval",      1,  offsetof(struct ctdb_tunable, recover_interval) },
+	{ "ElectionTimeout",      3,  offsetof(struct ctdb_tunable, election_timeout) },
+	{ "TakeoverTimeout",      5,  offsetof(struct ctdb_tunable, takeover_timeout) },
+	{ "MonitorInterval",     15,  offsetof(struct ctdb_tunable, monitor_interval) },
+	{ "EventScriptTimeout",  20,  offsetof(struct ctdb_tunable, script_timeout) },
+	{ "RecoveryGracePeriod", 60,  offsetof(struct ctdb_tunable, recovery_grace_period) },
+	{ "RecoveryBanPeriod",  300,  offsetof(struct ctdb_tunable, recovery_ban_period) },
+	{ "DatabaseHashSize", 10000,  offsetof(struct ctdb_tunable, database_hash_size) },
+	{ "RerecoveryTimeout",   10,  offsetof(struct ctdb_tunable, rerecovery_timeout) },
+};
+
+/*
+  set all tunables to defaults
+ */
+void ctdb_tunables_set_defaults(struct ctdb_context *ctdb)
+{
+	int i;
+	for (i=0;i<ARRAY_SIZE(tunable_map);i++) {
+		*(uint32_t *)(tunable_map[i].offset + (uint8_t*)&ctdb->tunable) = tunable_map[i].default_v;
+	}
+}
+
+
+/*
+  get a tunable
+ */
+int32_t ctdb_control_get_tunable(struct ctdb_context *ctdb, TDB_DATA indata, 
+				 TDB_DATA *outdata)
+{
+	struct ctdb_control_get_tunable *t = 
+		(struct ctdb_control_get_tunable *)indata.dptr;
+	char *name;
+	uint32_t val;
+	int i;
+
+	if (indata.dsize < sizeof(*t) ||
+	    t->length > indata.dsize - offsetof(struct ctdb_control_get_tunable, name)) {
+		DEBUG(0,("Bad indata in ctdb_control_get_tunable\n"));
+		return -1;
+	}
+
+	name = talloc_strndup(ctdb, (char*)t->name, t->length);
+	CTDB_NO_MEMORY(ctdb, name);
+
+	for (i=0;i<ARRAY_SIZE(tunable_map);i++) {
+		if (strcasecmp(name, tunable_map[i].name) == 0) break;
+	}
+	talloc_free(name);
+	
+	if (i == ARRAY_SIZE(tunable_map)) {
+		return -1;
+	}
+
+	val = *(uint32_t *)(tunable_map[i].offset + (uint8_t*)&ctdb->tunable);
+
+	outdata->dptr = (uint8_t *)talloc(outdata, uint32_t);
+	CTDB_NO_MEMORY(ctdb, outdata->dptr);
+
+	*(uint32_t *)outdata->dptr = val;
+	outdata->dsize = sizeof(uint32_t);
+
+	return 0;
+}
+
+
+/*
+  set a tunable
+ */
+int32_t ctdb_control_set_tunable(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+	struct ctdb_control_set_tunable *t = 
+		(struct ctdb_control_set_tunable *)indata.dptr;
+	char *name;
+	int i;
+
+	if (indata.dsize < sizeof(*t) ||
+	    t->length > indata.dsize - offsetof(struct ctdb_control_set_tunable, name)) {
+		DEBUG(0,("Bad indata in ctdb_control_set_tunable\n"));
+		return -1;
+	}
+
+	name = talloc_strndup(ctdb, (char *)t->name, t->length);
+	CTDB_NO_MEMORY(ctdb, name);
+
+	for (i=0;i<ARRAY_SIZE(tunable_map);i++) {
+		if (strcasecmp(name, tunable_map[i].name) == 0) break;
+	}
+
+	talloc_free(name);
+	
+	if (i == ARRAY_SIZE(tunable_map)) {
+		return -1;
+	}
+	
+	*(uint32_t *)(tunable_map[i].offset + (uint8_t*)&ctdb->tunable) = t->value;
+
+	return 0;
+}
+
+/*
+  list tunables
+ */
+int32_t ctdb_control_list_tunables(struct ctdb_context *ctdb, TDB_DATA *outdata)
+{
+	char *list = NULL;
+	int i;
+	struct ctdb_control_list_tunable *t;
+
+	list = talloc_strdup(outdata, tunable_map[0].name);
+	CTDB_NO_MEMORY(ctdb, list);
+
+	for (i=1;i<ARRAY_SIZE(tunable_map);i++) {
+		list = talloc_asprintf_append(list, ":%s", tunable_map[i].name);
+		CTDB_NO_MEMORY(ctdb, list);		
+	}
+
+	outdata->dsize = offsetof(struct ctdb_control_list_tunable, data) + 
+		strlen(list) + 1;
+	outdata->dptr = talloc_size(outdata, outdata->dsize);
+	CTDB_NO_MEMORY(ctdb, outdata->dptr);
+
+	t = (struct ctdb_control_list_tunable *)outdata->dptr;
+	t->length = strlen(list)+1;
+
+	memcpy(t->data, list, t->length);
+	talloc_free(list);
+
+	return 0;	
+}
diff --git a/source4/cluster/ctdb/server/ctdbd.c b/source4/cluster/ctdb/server/ctdbd.c
new file mode 100644
index 0000000000..2c4a23b673
--- /dev/null
+++ b/source4/cluster/ctdb/server/ctdbd.c
@@ -0,0 +1,229 @@
+/* 
+   standalone ctdb daemon
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "popt.h"
+#include "system/wait.h"
+#include "cmdline.h"
+#include "../include/ctdb_private.h"
+
+static void block_signal(int signum)
+{
+	struct sigaction act;
+
+	memset(&act, 0, sizeof(act));
+
+	act.sa_handler = SIG_IGN;
+	sigemptyset(&act.sa_mask);
+	sigaddset(&act.sa_mask, signum);
+	sigaction(signum, &act, NULL);
+}
+
+static struct {
+	const char *nlist;
+	const char *transport;
+	const char *myaddress;
+	const char *public_address_list;
+	const char *public_interface;
+	const char *event_script;
+	const char *logfile;
+	const char *recovery_lock_file;
+	const char *db_dir;
+} options = {
+	.nlist = ETCDIR "/ctdb/nodes",
+	.transport = "tcp",
+	.event_script = ETCDIR "/ctdb/events",
+	.logfile = VARDIR "/log/log.ctdb",
+	.db_dir = VARDIR "/ctdb",
+};
+
+
+/*
+  called by the transport layer when a packet comes in
+*/
+static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
+{
+	struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
+
+	ctdb->statistics.node_packets_recv++;
+
+	/* up the counter for this source node, so we know its alive */
+	if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
+		/* as a special case, redirected calls don't increment the rx_cnt */
+		if (hdr->operation != CTDB_REQ_CALL ||
+		    ((struct ctdb_req_call *)hdr)->hopcount == 0) {
+			ctdb->nodes[hdr->srcnode]->rx_cnt++;
+		}
+	}
+
+	ctdb_input_pkt(ctdb, hdr);
+}
+
+
+
+static const struct ctdb_upcalls ctdb_upcalls = {
+	.recv_pkt       = ctdb_recv_pkt,
+	.node_dead      = ctdb_node_dead,
+	.node_connected = ctdb_node_connected
+};
+
+
+
+/*
+  main program
+*/
+int main(int argc, const char *argv[])
+{
+	struct ctdb_context *ctdb;
+	int interactive = 0;
+
+	struct poptOption popt_options[] = {
+		POPT_AUTOHELP
+		POPT_CTDB_CMDLINE
+		{ "interactive", 'i', POPT_ARG_NONE, &interactive, 0, "don't fork", NULL },
+		{ "public-addresses", 0, POPT_ARG_STRING, &options.public_address_list, 0, "public address list file", "filename" },
+		{ "public-interface", 0, POPT_ARG_STRING, &options.public_interface, 0, "public interface", "interface"},
+		{ "event-script", 0, POPT_ARG_STRING, &options.event_script, 0, "event script", "filename" },
+		{ "logfile", 0, POPT_ARG_STRING, &options.logfile, 0, "log file location", "filename" },
+		{ "nlist", 0, POPT_ARG_STRING, &options.nlist, 0, "node list file", "filename" },
+		{ "listen", 0, POPT_ARG_STRING, &options.myaddress, 0, "address to listen on", "address" },
+		{ "transport", 0, POPT_ARG_STRING, &options.transport, 0, "protocol transport", NULL },
+		{ "dbdir", 0, POPT_ARG_STRING, &options.db_dir, 0, "directory for the tdb files", NULL },
+		{ "reclock", 0, POPT_ARG_STRING, &options.recovery_lock_file, 0, "location of recovery lock file", "filename" },
+		POPT_TABLEEND
+	};
+	int opt, ret;
+	const char **extra_argv;
+	int extra_argc = 0;
+	poptContext pc;
+	struct event_context *ev;
+
+	pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
+
+	while ((opt = poptGetNextOpt(pc)) != -1) {
+		switch (opt) {
+		default:
+			fprintf(stderr, "Invalid option %s: %s\n", 
+				poptBadOption(pc, 0), poptStrerror(opt));
+			exit(1);
+		}
+	}
+
+	/* setup the remaining options for the main program to use */
+	extra_argv = poptGetArgs(pc);
+	if (extra_argv) {
+		extra_argv++;
+		while (extra_argv[extra_argc]) extra_argc++;
+	}
+
+	if (!options.recovery_lock_file) {
+		DEBUG(0,("You must specifiy the location of a recovery lock file with --reclock\n"));
+		exit(1);
+	}
+
+	block_signal(SIGPIPE);
+
+	ev = event_context_init(NULL);
+
+	ctdb = ctdb_cmdline_init(ev);
+
+	ctdb->recovery_mode    = CTDB_RECOVERY_NORMAL;
+	ctdb->recovery_master  = (uint32_t)-1;
+	ctdb->upcalls          = &ctdb_upcalls;
+	ctdb->idr              = idr_init(ctdb);
+	ctdb->recovery_lock_fd = -1;
+	ctdb->monitoring_mode  = CTDB_MONITORING_ACTIVE;
+
+	ctdb_tunables_set_defaults(ctdb);
+
+	ret = ctdb_set_recovery_lock_file(ctdb, options.recovery_lock_file);
+	if (ret == -1) {
+		printf("ctdb_set_recovery_lock_file failed - %s\n", ctdb_errstr(ctdb));
+		exit(1);
+	}
+
+	ret = ctdb_set_transport(ctdb, options.transport);
+	if (ret == -1) {
+		printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
+		exit(1);
+	}
+
+	/* tell ctdb what address to listen on */
+	if (options.myaddress) {
+		ret = ctdb_set_address(ctdb, options.myaddress);
+		if (ret == -1) {
+			printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb));
+			exit(1);
+		}
+	}
+
+	/* tell ctdb what nodes are available */
+	ret = ctdb_set_nlist(ctdb, options.nlist);
+	if (ret == -1) {
+		printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb));
+		exit(1);
+	}
+
+	if (options.db_dir) {
+		ret = ctdb_set_tdb_dir(ctdb, options.db_dir);
+		if (ret == -1) {
+			printf("ctdb_set_tdb_dir failed - %s\n", ctdb_errstr(ctdb));
+			exit(1);
+		}
+	}
+
+	ret = ctdb_set_logfile(ctdb, options.logfile);
+	if (ret == -1) {
+		printf("ctdb_set_logfile to %s failed - %s\n", options.logfile, ctdb_errstr(ctdb));
+		exit(1);
+	}
+
+	if (options.public_interface) {
+		ctdb->takeover.interface = talloc_strdup(ctdb, options.public_interface);
+		CTDB_NO_MEMORY(ctdb, ctdb->takeover.interface);
+	}
+
+	if (options.public_address_list) {
+		ret = ctdb_set_public_addresses(ctdb, options.public_address_list);
+		if (ret == -1) {
+			printf("Unable to setup public address list\n");
+			exit(1);
+		}
+		ctdb->takeover.enabled = true;
+	}
+
+	ret = ctdb_set_event_script(ctdb, options.event_script);
+	if (ret == -1) {
+		printf("Unable to setup event script\n");
+		exit(1);
+	}
+
+	/* useful default logfile */
+	if (ctdb->logfile == NULL) {
+		char *name = talloc_asprintf(ctdb, "%s/log.ctdb.vnn%u", 
+					     VARDIR, ctdb->vnn);
+		ctdb_set_logfile(ctdb, name);
+		talloc_free(name);
+	}
+
+	/* start the protocol running (as a child) */
+	return ctdb_start_daemon(ctdb, interactive?False:True);
+}
diff --git a/source4/cluster/ctdb/server/eventscript.c b/source4/cluster/ctdb/server/eventscript.c
new file mode 100644
index 0000000000..e23157056c
--- /dev/null
+++ b/source4/cluster/ctdb/server/eventscript.c
@@ -0,0 +1,191 @@
+/* 
+   event script handling
+
+   Copyright (C) Andrew Tridgell  2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "../include/ctdb_private.h"
+#include "lib/events/events.h"
+
+/*
+  run the event script - varargs version
+ */
+static int ctdb_event_script_v(struct ctdb_context *ctdb, const char *fmt, va_list ap)
+{
+	char *options, *cmdstr;
+	int ret;
+	va_list ap2;
+	struct stat st;
+
+	if (stat(ctdb->takeover.event_script, &st) != 0 && 
+	    errno == ENOENT) {
+		DEBUG(0,("No event script found at '%s'\n", ctdb->takeover.event_script));
+		return 0;
+	}
+
+	va_copy(ap2, ap);
+	options  = talloc_vasprintf(ctdb, fmt, ap2);
+	va_end(ap2);
+	CTDB_NO_MEMORY(ctdb, options);
+
+	cmdstr = talloc_asprintf(ctdb, "%s %s", ctdb->takeover.event_script, options);
+	CTDB_NO_MEMORY(ctdb, cmdstr);
+
+	ret = system(cmdstr);
+	if (ret != -1) {
+		ret = WEXITSTATUS(ret);
+	}
+
+	talloc_free(cmdstr);
+	talloc_free(options);
+
+	return ret;
+}
+
+/*
+  run the event script
+ */
+int ctdb_event_script(struct ctdb_context *ctdb, const char *fmt, ...)
+{
+	va_list ap;
+	int ret;
+
+	va_start(ap, fmt);
+	ret = ctdb_event_script_v(ctdb, fmt, ap);
+	va_end(ap);
+
+	return ret;
+}
+
+
+struct ctdb_event_script_state {
+	struct ctdb_context *ctdb;
+	pid_t child;
+	void (*callback)(struct ctdb_context *, int, void *);
+	int fd[2];
+	void *private_data;
+};
+
+/* called when child is finished */
+static void ctdb_event_script_handler(struct event_context *ev, struct fd_event *fde, 
+				      uint16_t flags, void *p)
+{
+	struct ctdb_event_script_state *state = 
+		talloc_get_type(p, struct ctdb_event_script_state);
+	int status = -1;
+	void (*callback)(struct ctdb_context *, int, void *) = state->callback;
+	void *private_data = state->private_data;
+	struct ctdb_context *ctdb = state->ctdb;
+
+	waitpid(state->child, &status, 0);
+	if (status != -1) {
+		status = WEXITSTATUS(status);
+	}
+	talloc_set_destructor(state, NULL);
+	talloc_free(state);
+	callback(ctdb, status, private_data);
+}
+
+
+/* called when child times out */
+static void ctdb_event_script_timeout(struct event_context *ev, struct timed_event *te, 
+				      struct timeval t, void *p)
+{
+	struct ctdb_event_script_state *state = talloc_get_type(p, struct ctdb_event_script_state);
+	void (*callback)(struct ctdb_context *, int, void *) = state->callback;
+	void *private_data = state->private_data;
+	struct ctdb_context *ctdb = state->ctdb;
+
+	DEBUG(0,("event script timed out\n"));
+	talloc_free(state);
+	callback(ctdb, -1, private_data);
+}
+
+/*
+  destroy a running event script
+ */
+static int event_script_destructor(struct ctdb_event_script_state *state)
+{
+	kill(state->child, SIGKILL);
+	waitpid(state->child, NULL, 0);
+	return 0;
+}
+
+/*
+  run the event script in the background, calling the callback when 
+  finished
+ */
+int ctdb_event_script_callback(struct ctdb_context *ctdb, 
+			       struct timeval timeout,
+			       TALLOC_CTX *mem_ctx,
+			       void (*callback)(struct ctdb_context *, int, void *),
+			       void *private_data,
+			       const char *fmt, ...)
+{
+	struct ctdb_event_script_state *state;
+	va_list ap;
+	int ret;
+
+	state = talloc(mem_ctx, struct ctdb_event_script_state);
+	CTDB_NO_MEMORY(ctdb, state);
+
+	state->ctdb = ctdb;
+	state->callback = callback;
+	state->private_data = private_data;
+	
+	ret = pipe(state->fd);
+	if (ret != 0) {
+		talloc_free(state);
+		return -1;
+	}
+
+	state->child = fork();
+
+	if (state->child == (pid_t)-1) {
+		close(state->fd[0]);
+		close(state->fd[1]);
+		talloc_free(state);
+		return -1;
+	}
+
+	if (state->child == 0) {
+		close(state->fd[0]);
+		ctdb_set_realtime(false);
+		set_close_on_exec(state->fd[1]);
+		va_start(ap, fmt);
+		ret = ctdb_event_script_v(ctdb, fmt, ap);
+		va_end(ap);
+		_exit(ret);
+	}
+
+	talloc_set_destructor(state, event_script_destructor);
+
+	close(state->fd[1]);
+
+	event_add_fd(ctdb->ev, state, state->fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
+		     ctdb_event_script_handler, state);
+
+	if (!timeval_is_zero(&timeout)) {
+		event_add_timed(ctdb->ev, state, timeout, ctdb_event_script_timeout, state);
+	}
+
+	return 0;
+}
+
+
-- 
cgit