summaryrefslogtreecommitdiff
path: root/source4/cluster
diff options
context:
space:
mode:
authorAndrew Tridgell <tridge@samba.org>2007-01-19 03:54:48 +0000
committerGerald (Jerry) Carter <jerry@samba.org>2007-10-10 14:43:46 -0500
commit5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263 (patch)
tree050e2f47faf234685cd7f20ab7e4f37e6521f7a2 /source4/cluster
parent8c3d15f6caa3f1ffda86755fa9b7ff9602cbb022 (diff)
downloadsamba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.tar.gz
samba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.tar.bz2
samba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.zip
r20889: import ctdb cluster backend from bzr
it will be interesting to see how the build farm handles this (This used to be commit 53be449630bd67d649a9e70cc7e25a9799c0616b)
Diffstat (limited to 'source4/cluster')
-rw-r--r--source4/cluster/config.mk5
-rw-r--r--source4/cluster/ctdb/brlock_ctdb.c971
-rw-r--r--source4/cluster/ctdb/common/ctdb.c287
-rw-r--r--source4/cluster/ctdb/common/ctdb_call.c653
-rw-r--r--source4/cluster/ctdb/common/ctdb_ltdb.c139
-rw-r--r--source4/cluster/ctdb/common/ctdb_util.c103
-rw-r--r--source4/cluster/ctdb/config.mk24
-rw-r--r--source4/cluster/ctdb/ctdb_cluster.c138
-rw-r--r--source4/cluster/ctdb/ctdb_cluster.h23
-rw-r--r--source4/cluster/ctdb/include/ctdb.h117
-rw-r--r--source4/cluster/ctdb/include/ctdb_private.h216
-rw-r--r--source4/cluster/ctdb/tcp/ctdb_tcp.h76
-rw-r--r--source4/cluster/ctdb/tcp/tcp_connect.c191
-rw-r--r--source4/cluster/ctdb/tcp/tcp_init.c102
-rw-r--r--source4/cluster/ctdb/tcp/tcp_io.c254
-rw-r--r--source4/cluster/ctdb/tests/ctdb_bench.c228
-rw-r--r--source4/cluster/ctdb/tests/ctdb_test.c207
17 files changed, 3733 insertions, 1 deletions
diff --git a/source4/cluster/config.mk b/source4/cluster/config.mk
index 934bc55252..c5c2ea970a 100644
--- a/source4/cluster/config.mk
+++ b/source4/cluster/config.mk
@@ -1,4 +1,7 @@
+include ctdb/config.mk
####################
[SUBSYSTEM::CLUSTER]
-OBJ_FILES = cluster.o
+OBJ_FILES = cluster.o \
+ local.o
+PRIVATE_DEPENDENCIES = ctdb
diff --git a/source4/cluster/ctdb/brlock_ctdb.c b/source4/cluster/ctdb/brlock_ctdb.c
new file mode 100644
index 0000000000..bcfa566b76
--- /dev/null
+++ b/source4/cluster/ctdb/brlock_ctdb.c
@@ -0,0 +1,971 @@
+/*
+ Unix SMB/CIFS implementation.
+
+ generic byte range locking code - ctdb backend
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+#include "system/filesys.h"
+#include "lib/tdb/include/tdb.h"
+#include "messaging/messaging.h"
+#include "db_wrap.h"
+#include "lib/messaging/irpc.h"
+#include "libcli/libcli.h"
+#include "cluster/cluster.h"
+#include "ntvfs/common/brlock.h"
+#include "cluster/ctdb/include/ctdb.h"
+
+enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2,
+ FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4,
+ FUNC_BRL_CLOSE=5};
+
+/*
+ in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
+ a file. For a local posix filesystem this will usually be a combination
+ of the device and inode numbers of the file, but it can be anything
+ that uniquely idetifies a file for locking purposes, as long
+ as it is applied consistently.
+*/
+
+/* this struct is typically attached to tcon */
+struct brl_context {
+ struct ctdb_context *ctdb;
+ struct server_id server;
+ struct messaging_context *messaging_ctx;
+};
+
+/*
+ the lock context contains the elements that define whether one
+ lock is the same as another lock
+*/
+struct lock_context {
+ struct server_id server;
+ uint16_t smbpid;
+ struct brl_context *ctx;
+};
+
+/* The data in brlock records is an unsorted linear array of these
+ records. It is unnecessary to store the count as tdb provides the
+ size of the record */
+struct lock_struct {
+ struct lock_context context;
+ struct ntvfs_handle *ntvfs;
+ uint64_t start;
+ uint64_t size;
+ enum brl_type lock_type;
+ void *notify_ptr;
+};
+
+/* this struct is attached to on open file handle */
+struct brl_handle {
+ DATA_BLOB key;
+ struct ntvfs_handle *ntvfs;
+ struct lock_struct last_lock;
+};
+
+/*
+ Open up the brlock.tdb database. Close it down using
+ talloc_free(). We need the messaging_ctx to allow for
+ pending lock notifications.
+*/
+static struct brl_context *brl_ctdb_init(TALLOC_CTX *mem_ctx, struct server_id server,
+ struct messaging_context *messaging_ctx)
+{
+ struct ctdb_context *ctdb = talloc_get_type(cluster_private(), struct ctdb_context);
+ struct brl_context *brl;
+
+ brl = talloc(mem_ctx, struct brl_context);
+ if (brl == NULL) {
+ return NULL;
+ }
+
+ brl->ctdb = ctdb;
+ brl->server = server;
+ brl->messaging_ctx = messaging_ctx;
+
+ DEBUG(0,("brl_ctdb_init: brl=%p\n", brl));
+
+ return brl;
+}
+
+static struct brl_handle *brl_ctdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs,
+ DATA_BLOB *file_key)
+{
+ struct brl_handle *brlh;
+
+ brlh = talloc(mem_ctx, struct brl_handle);
+ if (brlh == NULL) {
+ return NULL;
+ }
+
+ brlh->key = *file_key;
+ brlh->ntvfs = ntvfs;
+ ZERO_STRUCT(brlh->last_lock);
+
+ return brlh;
+}
+
+/*
+ see if two locking contexts are equal
+*/
+static BOOL brl_ctdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
+{
+ return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
+ ctx1->smbpid == ctx2->smbpid &&
+ ctx1->ctx == ctx2->ctx);
+}
+
+/*
+ see if lck1 and lck2 overlap
+*/
+static BOOL brl_ctdb_overlap(struct lock_struct *lck1,
+ struct lock_struct *lck2)
+{
+ /* this extra check is not redundent - it copes with locks
+ that go beyond the end of 64 bit file space */
+ if (lck1->size != 0 &&
+ lck1->start == lck2->start &&
+ lck1->size == lck2->size) {
+ return True;
+ }
+
+ if (lck1->start >= (lck2->start+lck2->size) ||
+ lck2->start >= (lck1->start+lck1->size)) {
+ return False;
+ }
+ return True;
+}
+
+/*
+ See if lock2 can be added when lock1 is in place.
+*/
+static BOOL brl_ctdb_conflict(struct lock_struct *lck1,
+ struct lock_struct *lck2)
+{
+ /* pending locks don't conflict with anything */
+ if (lck1->lock_type >= PENDING_READ_LOCK ||
+ lck2->lock_type >= PENDING_READ_LOCK) {
+ return False;
+ }
+
+ if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
+ return False;
+ }
+
+ if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
+ lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
+ return False;
+ }
+
+ return brl_ctdb_overlap(lck1, lck2);
+}
+
+
+/*
+ Check to see if this lock conflicts, but ignore our own locks on the
+ same fnum only.
+*/
+static BOOL brl_ctdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
+{
+ /* pending locks don't conflict with anything */
+ if (lck1->lock_type >= PENDING_READ_LOCK ||
+ lck2->lock_type >= PENDING_READ_LOCK) {
+ return False;
+ }
+
+ if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK)
+ return False;
+
+ /*
+ * note that incoming write calls conflict with existing READ
+ * locks even if the context is the same. JRA. See LOCKTEST7
+ * in smbtorture.
+ */
+ if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
+ lck1->ntvfs == lck2->ntvfs &&
+ (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
+ return False;
+ }
+
+ return brl_ctdb_overlap(lck1, lck2);
+}
+
+
+/*
+ amazingly enough, w2k3 "remembers" whether the last lock failure
+ is the same as this one and changes its error code. I wonder if any
+ app depends on this?
+*/
+static NTSTATUS brl_ctdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
+{
+ /*
+ * this function is only called for non pending lock!
+ */
+
+ /*
+ * if the notify_ptr is non NULL,
+ * it means that we're at the end of a pending lock
+ * and the real lock is requested after the timeout went by
+ * In this case we need to remember the last_lock and always
+ * give FILE_LOCK_CONFLICT
+ */
+ if (lock->notify_ptr) {
+ brlh->last_lock = *lock;
+ return NT_STATUS_FILE_LOCK_CONFLICT;
+ }
+
+ /*
+ * amazing the little things you learn with a test
+ * suite. Locks beyond this offset (as a 64 bit
+ * number!) always generate the conflict error code,
+ * unless the top bit is set
+ */
+ if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
+ brlh->last_lock = *lock;
+ return NT_STATUS_FILE_LOCK_CONFLICT;
+ }
+
+ /*
+ * if the current lock matches the last failed lock on the file handle
+ * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
+ */
+ if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
+ lock->context.ctx == brlh->last_lock.context.ctx &&
+ lock->ntvfs == brlh->last_lock.ntvfs &&
+ lock->start == brlh->last_lock.start) {
+ return NT_STATUS_FILE_LOCK_CONFLICT;
+ }
+
+ brlh->last_lock = *lock;
+ return NT_STATUS_LOCK_NOT_GRANTED;
+}
+
+
+static void show_locks(const char *op, struct lock_struct *locks, int count)
+{
+ int i;
+ DEBUG(0,("OP: %s\n", op));
+ for (i=0;i<count;i++) {
+ DEBUG(0,("%2d: %4d %4d %d.%d.%d %p %p\n",
+ i, (int)locks[i].start, (int)locks[i].size,
+ locks[i].context.server.node,
+ locks[i].context.server.id,
+ locks[i].context.smbpid,
+ locks[i].context.ctx,
+ locks[i].ntvfs));
+ }
+}
+
+
+struct ctdb_lock_req {
+ uint16_t smbpid;
+ uint64_t start;
+ uint64_t size;
+ enum brl_type lock_type;
+ void *notify_ptr;
+ struct server_id server;
+ struct brl_context *brl;
+ struct ntvfs_handle *ntvfs;
+};
+
+/*
+ ctdb call handling brl_lock()
+*/
+static int brl_ctdb_lock_func(struct ctdb_call *call)
+{
+ struct ctdb_lock_req *req = (struct ctdb_lock_req *)call->call_data->dptr;
+ TDB_DATA dbuf;
+ int count=0, i;
+ struct lock_struct lock, *locks=NULL;
+ NTSTATUS status = NT_STATUS_OK;
+
+#if 0
+ /* if this is a pending lock, then with the chainlock held we
+ try to get the real lock. If we succeed then we don't need
+ to make it pending. This prevents a possible race condition
+ where the pending lock gets created after the lock that is
+ preventing the real lock gets removed */
+ if (lock_type >= PENDING_READ_LOCK) {
+ enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
+
+ /* here we need to force that the last_lock isn't overwritten */
+ lock = brlh->last_lock;
+ status = brl_ctdb_lock(brl, brlh, smbpid, start, size, rw, NULL);
+ brlh->last_lock = lock;
+
+ if (NT_STATUS_IS_OK(status)) {
+ tdb_chainunlock(brl->w->tdb, kbuf);
+ return NT_STATUS_OK;
+ }
+ }
+#endif
+
+ dbuf = call->record_data;
+
+ ZERO_STRUCT(lock);
+ lock.context.smbpid = req->smbpid;
+ lock.context.server = req->server;
+ lock.context.ctx = req->brl;
+ lock.ntvfs = req->ntvfs;
+ lock.start = req->start;
+ lock.size = req->size;
+ lock.lock_type = req->lock_type;
+ lock.notify_ptr = req->notify_ptr;
+
+ {
+ int xlen = sizeof(lock);
+ uint8_t *xx = &lock;
+ int ii, fd = open("/dev/null", O_WRONLY);
+ for (ii=0;ii<xlen;ii++) {
+ write(fd, &xx[ii], 1);
+ }
+ close(fd);
+ }
+
+ if (dbuf.dptr) {
+ /* there are existing locks - make sure they don't conflict */
+ locks = (struct lock_struct *)dbuf.dptr;
+ count = dbuf.dsize / sizeof(*locks);
+
+ show_locks("lock", locks, count);
+
+ for (i=0; i<count; i++) {
+ if (brl_ctdb_conflict(&locks[i], &lock)) {
+ status = NT_STATUS_LOCK_NOT_GRANTED;
+ goto reply;
+ }
+ }
+ }
+
+ call->new_data = talloc(call, TDB_DATA);
+ if (call->new_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+
+ call->new_data->dptr = talloc_size(call, dbuf.dsize + sizeof(lock));
+ if (call->new_data->dptr == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ memcpy(call->new_data->dptr, locks, dbuf.dsize);
+ memcpy(call->new_data->dptr+dbuf.dsize, &lock, sizeof(lock));
+ call->new_data->dsize = dbuf.dsize + sizeof(lock);
+
+ if (req->lock_type >= PENDING_READ_LOCK) {
+ status = NT_STATUS_LOCK_NOT_GRANTED;
+ }
+
+ DEBUG(0,("lock: size now %d\n", call->new_data->dsize));
+
+reply:
+ call->reply_data = talloc(call, TDB_DATA);
+ if (call->reply_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+
+ call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS));
+ call->reply_data->dsize = sizeof(NTSTATUS);
+ if (call->reply_data->dptr == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ *(NTSTATUS *)call->reply_data->dptr = status;
+
+ return 0;
+}
+
+
+/*
+ Lock a range of bytes. The lock_type can be a PENDING_*_LOCK, in
+ which case a real lock is first tried, and if that fails then a
+ pending lock is created. When the pending lock is triggered (by
+ someone else closing an overlapping lock range) a messaging
+ notification is sent, identified by the notify_ptr
+*/
+static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
+ struct brl_handle *brlh,
+ uint16_t smbpid,
+ uint64_t start, uint64_t size,
+ enum brl_type lock_type,
+ void *notify_ptr)
+{
+ TDB_DATA kbuf, rbuf, sbuf;
+ struct ctdb_lock_req req;
+ NTSTATUS status;
+ int ret;
+
+ kbuf.dptr = brlh->key.data;
+ kbuf.dsize = brlh->key.length;
+
+ rbuf.dptr = (uint8_t *)&req;
+ rbuf.dsize = sizeof(req);
+
+ ZERO_STRUCT(req);
+ req.smbpid = smbpid;
+ req.start = start;
+ req.size = size;
+ req.lock_type = lock_type;
+ req.notify_ptr = notify_ptr;
+ req.server = brl->server;
+ req.brl = brl;
+ req.ntvfs = brlh->ntvfs;
+
+ ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_LOCK, &rbuf, &sbuf);
+ if (ret == -1) {
+ DEBUG(0,("ctdb_call failed - %s\n", __location__));
+ return NT_STATUS_INTERNAL_DB_CORRUPTION;
+ }
+
+ status = *(NTSTATUS *)sbuf.dptr;
+ talloc_free(sbuf.dptr);
+
+ return status;
+}
+
+#if 0
+/*
+ we are removing a lock that might be holding up a pending lock. Scan for pending
+ locks that cover this range and if we find any then notify the server that it should
+ retry the lock
+*/
+static void brl_ctdb_notify_unlock(struct brl_context *brl,
+ struct lock_struct *locks, int count,
+ struct lock_struct *removed_lock)
+{
+ int i, last_notice;
+
+ /* the last_notice logic is to prevent stampeding on a lock
+ range. It prevents us sending hundreds of notifies on the
+ same range of bytes. It doesn't prevent all possible
+ stampedes, but it does prevent the most common problem */
+ last_notice = -1;
+
+ for (i=0;i<count;i++) {
+ if (locks[i].lock_type >= PENDING_READ_LOCK &&
+ brl_ctdb_overlap(&locks[i], removed_lock)) {
+ if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) {
+ continue;
+ }
+ if (locks[i].lock_type == PENDING_WRITE_LOCK) {
+ last_notice = i;
+ }
+ messaging_send_ptr(brl->messaging_ctx, locks[i].context.server,
+ MSG_BRL_RETRY, locks[i].notify_ptr);
+ }
+ }
+}
+#endif
+
+/*
+ send notifications for all pending locks - the file is being closed by this
+ user
+*/
+static void brl_ctdb_notify_all(struct brl_context *brl,
+ struct lock_struct *locks, int count)
+{
+ int i;
+ for (i=0;i<count;i++) {
+ if (locks->lock_type >= PENDING_READ_LOCK) {
+// brl_ctdb_notify_unlock(brl, locks, count, &locks[i]);
+ }
+ }
+}
+
+struct ctdb_unlock_req {
+ uint16_t smbpid;
+ uint64_t start;
+ uint64_t size;
+ struct server_id server;
+ struct brl_context *brl;
+ struct ntvfs_handle *ntvfs;
+};
+
+/*
+ Unlock a range of bytes.
+*/
+static int brl_ctdb_unlock_func(struct ctdb_call *call)
+{
+ struct ctdb_unlock_req *req = (struct ctdb_unlock_req *)call->call_data->dptr;
+ TDB_DATA dbuf;
+ int count, i;
+ struct lock_struct *locks;
+ struct lock_context context;
+ NTSTATUS status = NT_STATUS_OK;
+
+ dbuf = call->record_data;
+
+ context.smbpid = req->smbpid;
+ context.server = req->server;
+ context.ctx = req->brl;
+
+ /* there are existing locks - find a match */
+ locks = (struct lock_struct *)dbuf.dptr;
+ count = dbuf.dsize / sizeof(*locks);
+
+ show_locks("unlock", locks, count);
+
+ for (i=0; i<count; i++) {
+ struct lock_struct *lock = &locks[i];
+
+ if (brl_ctdb_same_context(&lock->context, &context) &&
+ lock->ntvfs == req->ntvfs &&
+ lock->start == req->start &&
+ lock->size == req->size &&
+ lock->lock_type < PENDING_READ_LOCK) {
+// struct lock_struct removed_lock = *lock;
+
+ call->new_data = talloc(call, TDB_DATA);
+ if (call->new_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+
+ call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(lock));
+ if (call->new_data->dptr == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ call->new_data->dsize = dbuf.dsize - sizeof(lock);
+
+ memcpy(call->new_data->dptr, locks, i*sizeof(lock));
+ memcpy(call->new_data->dptr+i*sizeof(lock), locks+i+1,
+ (count-(i+1))*sizeof(lock));
+
+ if (count > 1) {
+ /* send notifications for any relevant pending locks */
+// brl_ctdb_notify_unlock(req->brl, locks, count, &removed_lock);
+ }
+ break;
+ }
+ }
+
+ if (call->new_data) {
+ DEBUG(0,("unlock: size now %d\n", call->new_data->dsize));
+ }
+
+ if (i == count) {
+ /* we didn't find it */
+ status = NT_STATUS_RANGE_NOT_LOCKED;
+ }
+
+ call->reply_data = talloc(call, TDB_DATA);
+ if (call->reply_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+
+ call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS));
+ call->reply_data->dsize = sizeof(NTSTATUS);
+ if (call->reply_data->dptr == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ *(NTSTATUS *)call->reply_data->dptr = status;
+
+ return 0;
+}
+
+
+/*
+ Unlock a range of bytes.
+*/
+static NTSTATUS brl_ctdb_unlock(struct brl_context *brl,
+ struct brl_handle *brlh,
+ uint16_t smbpid,
+ uint64_t start, uint64_t size)
+{
+ TDB_DATA kbuf, rbuf, sbuf;
+ struct ctdb_unlock_req req;
+ NTSTATUS status;
+ int ret;
+
+ kbuf.dptr = brlh->key.data;
+ kbuf.dsize = brlh->key.length;
+
+ rbuf.dptr = (uint8_t *)&req;
+ rbuf.dsize = sizeof(req);
+
+ ZERO_STRUCT(req);
+ req.smbpid = smbpid;
+ req.start = start;
+ req.size = size;
+ req.server = brl->server;
+ req.brl = brl;
+ req.ntvfs = brlh->ntvfs;
+
+ ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_UNLOCK, &rbuf, &sbuf);
+ if (ret == -1) {
+ DEBUG(0,("ctdb_call failed - %s\n", __location__));
+ return NT_STATUS_INTERNAL_DB_CORRUPTION;
+ }
+
+ status = *(NTSTATUS *)sbuf.dptr;
+ talloc_free(sbuf.dptr);
+
+ return status;
+}
+
+
+struct ctdb_remove_pending_req {
+ struct server_id server;
+ void *notify_ptr;
+};
+
+/*
+ remove a pending lock. This is called when the caller has either
+ given up trying to establish a lock or when they have succeeded in
+ getting it. In either case they no longer need to be notified.
+*/
+static int brl_ctdb_remove_pending_func(struct ctdb_call *call)
+{
+ struct ctdb_remove_pending_req *req = (struct ctdb_remove_pending_req *)call->call_data->dptr;
+ TDB_DATA dbuf;
+ int count, i;
+ struct lock_struct *locks;
+ NTSTATUS status = NT_STATUS_OK;
+
+ dbuf = call->record_data;
+
+ /* there are existing locks - find a match */
+ locks = (struct lock_struct *)dbuf.dptr;
+ count = dbuf.dsize / sizeof(*locks);
+
+ show_locks("remove_pending", locks, count);
+
+ for (i=0; i<count; i++) {
+ struct lock_struct *lock = &locks[i];
+
+ if (lock->lock_type >= PENDING_READ_LOCK &&
+ lock->notify_ptr == req->notify_ptr &&
+ cluster_id_equal(&lock->context.server, &req->server)) {
+ call->new_data = talloc(call, TDB_DATA);
+ if (call->new_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+
+ call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(lock));
+ if (call->new_data->dptr == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ call->new_data->dsize = dbuf.dsize - sizeof(lock);
+
+ memcpy(call->new_data->dptr, locks, i*sizeof(lock));
+ memcpy(call->new_data->dptr+i*sizeof(lock), locks+i+1,
+ (count-(i+1))*sizeof(lock));
+ break;
+ }
+ }
+
+ if (call->new_data) {
+ DEBUG(0,("remove_pending: size now %d\n", call->new_data->dsize));
+ }
+
+ if (i == count) {
+ /* we didn't find it */
+ status = NT_STATUS_RANGE_NOT_LOCKED;
+ }
+
+ call->reply_data = talloc(call, TDB_DATA);
+ if (call->reply_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+
+ call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS));
+ call->reply_data->dsize = sizeof(NTSTATUS);
+ if (call->reply_data->dptr == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ *(NTSTATUS *)call->reply_data->dptr = status;
+
+ return 0;
+}
+
+static NTSTATUS brl_ctdb_remove_pending(struct brl_context *brl,
+ struct brl_handle *brlh,
+ void *notify_ptr)
+{
+ TDB_DATA kbuf, rbuf, sbuf;
+ struct ctdb_remove_pending_req req;
+ NTSTATUS status;
+ int ret;
+
+ kbuf.dptr = brlh->key.data;
+ kbuf.dsize = brlh->key.length;
+
+ rbuf.dptr = (uint8_t *)&req;
+ rbuf.dsize = sizeof(req);
+
+ ZERO_STRUCT(req);
+ req.notify_ptr = notify_ptr;
+ req.server = brl->server;
+
+ ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_REMOVE_PENDING, &rbuf, &sbuf);
+ if (ret == -1) {
+ DEBUG(0,("ctdb_call failed - %s\n", __location__));
+ return NT_STATUS_INTERNAL_DB_CORRUPTION;
+ }
+
+ status = *(NTSTATUS *)sbuf.dptr;
+ talloc_free(sbuf.dptr);
+
+ return status;
+}
+
+
+struct ctdb_locktest_req {
+ uint16_t smbpid;
+ uint64_t start;
+ uint64_t size;
+ enum brl_type lock_type;
+ struct brl_context *brl;
+ struct server_id server;
+ struct ntvfs_handle *ntvfs;
+};
+
+/*
+ remove a pending lock. This is called when the caller has either
+ given up trying to establish a lock or when they have succeeded in
+ getting it. In either case they no longer need to be notified.
+*/
+static int brl_ctdb_locktest_func(struct ctdb_call *call)
+{
+ struct ctdb_locktest_req *req = (struct ctdb_locktest_req *)call->call_data->dptr;
+ TDB_DATA dbuf;
+ int count, i;
+ struct lock_struct *locks, lock;
+ NTSTATUS status = NT_STATUS_OK;
+
+ lock.context.smbpid = req->smbpid;
+ lock.context.server = req->server;
+ lock.context.ctx = req->brl;
+ lock.ntvfs = req->ntvfs;
+ lock.start = req->start;
+ lock.size = req->size;
+ lock.lock_type = req->lock_type;
+
+ dbuf = call->record_data;
+
+ /* there are existing locks - find a match */
+ locks = (struct lock_struct *)dbuf.dptr;
+ count = dbuf.dsize / sizeof(*locks);
+
+ show_locks("locktest", locks, count);
+
+ for (i=0; i<count; i++) {
+ if (brl_ctdb_conflict_other(&locks[i], &lock)) {
+ status = NT_STATUS_FILE_LOCK_CONFLICT;
+ break;
+ }
+ }
+
+ call->reply_data = talloc(call, TDB_DATA);
+ if (call->reply_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+
+ call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS));
+ call->reply_data->dsize = sizeof(NTSTATUS);
+ if (call->reply_data->dptr == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ *(NTSTATUS *)call->reply_data->dptr = status;
+
+ return 0;
+}
+
+/*
+ Test if we are allowed to perform IO on a region of an open file
+*/
+static NTSTATUS brl_ctdb_locktest(struct brl_context *brl,
+ struct brl_handle *brlh,
+ uint16_t smbpid,
+ uint64_t start, uint64_t size,
+ enum brl_type lock_type)
+{
+ TDB_DATA kbuf, rbuf, sbuf;
+ struct ctdb_locktest_req req;
+ NTSTATUS status;
+ int ret;
+
+ kbuf.dptr = brlh->key.data;
+ kbuf.dsize = brlh->key.length;
+
+ rbuf.dptr = (uint8_t *)&req;
+ rbuf.dsize = sizeof(req);
+
+ ZERO_STRUCT(req);
+ req.smbpid = smbpid;
+ req.start = start;
+ req.size = size;
+ req.lock_type = lock_type;
+ req.server = brl->server;
+ req.brl = brl;
+ req.ntvfs = brlh->ntvfs;
+
+ ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_LOCKTEST, &rbuf, &sbuf);
+ if (ret == -1) {
+ DEBUG(0,("ctdb_call failed - %s\n", __location__));
+ return NT_STATUS_INTERNAL_DB_CORRUPTION;
+ }
+
+ status = *(NTSTATUS *)sbuf.dptr;
+ talloc_free(sbuf.dptr);
+
+ return status;
+}
+
+
+struct ctdb_close_req {
+ struct brl_context *brl;
+ struct server_id server;
+ struct ntvfs_handle *ntvfs;
+};
+
+/*
+ remove a pending lock. This is called when the caller has either
+ given up trying to establish a lock or when they have succeeded in
+ getting it. In either case they no longer need to be notified.
+*/
+static int brl_ctdb_close_func(struct ctdb_call *call)
+{
+ struct ctdb_close_req *req = (struct ctdb_close_req *)call->call_data->dptr;
+ TDB_DATA dbuf;
+ int count, dcount=0, i;
+ struct lock_struct *locks;
+ NTSTATUS status = NT_STATUS_OK;
+
+ dbuf = call->record_data;
+
+ /* there are existing locks - find a match */
+ locks = (struct lock_struct *)dbuf.dptr;
+ count = dbuf.dsize / sizeof(*locks);
+
+ show_locks("close", locks, count);
+
+ DEBUG(0,("closing ctx=%p server=%d.%d ntvfs=%p\n",
+ req->brl, req->server.node, req->server.id, req->ntvfs));
+
+ for (i=0; i<count; i++) {
+ struct lock_struct *lock = &locks[i];
+
+ if (lock->context.ctx == req->brl &&
+ cluster_id_equal(&lock->context.server, &req->server) &&
+ lock->ntvfs == req->ntvfs) {
+ /* found it - delete it */
+ if (count > 1 && i < count-1) {
+ memmove(&locks[i], &locks[i+1],
+ sizeof(*locks)*((count-1) - i));
+ }
+ count--;
+ i--;
+ dcount++;
+ }
+ }
+
+ if (dcount > 0) {
+ call->new_data = talloc(call, TDB_DATA);
+ if (call->new_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+
+ call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct));
+ if (call->new_data->dptr == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ call->new_data->dsize = count*sizeof(struct lock_struct);
+
+ memcpy(call->new_data->dptr, locks, count*sizeof(struct lock_struct));
+ }
+
+ if (call->new_data) {
+ DEBUG(0,("close: size now %d\n", call->new_data->dsize));
+ }
+
+ DEBUG(0,("brl_ctdb_close_func dcount=%d count=%d\n", dcount, count));
+
+ call->reply_data = talloc(call, TDB_DATA);
+ if (call->reply_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+
+ call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS));
+ call->reply_data->dsize = sizeof(NTSTATUS);
+ if (call->reply_data->dptr == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ *(NTSTATUS *)call->reply_data->dptr = status;
+
+ return 0;
+}
+
+/*
+ Test if we are allowed to perform IO on a region of an open file
+*/
+static NTSTATUS brl_ctdb_close(struct brl_context *brl,
+ struct brl_handle *brlh)
+{
+ TDB_DATA kbuf, rbuf, sbuf;
+ struct ctdb_close_req req;
+ NTSTATUS status;
+ int ret;
+
+ kbuf.dptr = brlh->key.data;
+ kbuf.dsize = brlh->key.length;
+
+ rbuf.dptr = (uint8_t *)&req;
+ rbuf.dsize = sizeof(req);
+
+ ZERO_STRUCT(req);
+ req.brl = brl;
+ req.server = brl->server;
+ req.ntvfs = brlh->ntvfs;
+
+ DEBUG(0,("brl_ctdb_close %u.%u %p\n", req.server.node, req.server.id, brl));
+
+ ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_CLOSE, &rbuf, &sbuf);
+ if (ret == -1) {
+ DEBUG(0,("ctdb_call failed - %s\n", __location__));
+ return NT_STATUS_INTERNAL_DB_CORRUPTION;
+ }
+
+ status = *(NTSTATUS *)sbuf.dptr;
+ talloc_free(sbuf.dptr);
+
+ return status;
+}
+
+
+static const struct brlock_ops brlock_tdb_ops = {
+ .brl_init = brl_ctdb_init,
+ .brl_create_handle = brl_ctdb_create_handle,
+ .brl_lock = brl_ctdb_lock,
+ .brl_unlock = brl_ctdb_unlock,
+ .brl_remove_pending = brl_ctdb_remove_pending,
+ .brl_locktest = brl_ctdb_locktest,
+ .brl_close = brl_ctdb_close
+};
+
+
+void brl_ctdb_init_ops(void)
+{
+ struct ctdb_context *ctdb = talloc_get_type(cluster_private(), struct ctdb_context);
+
+ brl_set_ops(&brlock_tdb_ops);
+
+ ctdb_set_call(ctdb, brl_ctdb_lock_func, FUNC_BRL_LOCK);
+ ctdb_set_call(ctdb, brl_ctdb_unlock_func, FUNC_BRL_UNLOCK);
+ ctdb_set_call(ctdb, brl_ctdb_remove_pending_func, FUNC_BRL_REMOVE_PENDING);
+ ctdb_set_call(ctdb, brl_ctdb_locktest_func, FUNC_BRL_LOCKTEST);
+ ctdb_set_call(ctdb, brl_ctdb_close_func, FUNC_BRL_CLOSE);
+
+}
diff --git a/source4/cluster/ctdb/common/ctdb.c b/source4/cluster/ctdb/common/ctdb.c
new file mode 100644
index 0000000000..ad0345b3c7
--- /dev/null
+++ b/source4/cluster/ctdb/common/ctdb.c
@@ -0,0 +1,287 @@
+/*
+ ctdb main protocol code
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/events/events.h"
+#include "lib/util/dlinklist.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+
+/*
+ choose the transport we will use
+*/
+int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
+{
+ int ctdb_tcp_init(struct ctdb_context *ctdb);
+
+ if (strcmp(transport, "tcp") == 0) {
+ return ctdb_tcp_init(ctdb);
+ }
+ ctdb_set_error(ctdb, "Unknown transport '%s'\n", transport);
+ return -1;
+}
+
+/*
+ set some ctdb flags
+*/
+void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
+{
+ ctdb->flags |= flags;
+}
+
+
+/*
+ add a node to the list of active nodes
+*/
+static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
+{
+ struct ctdb_node *node, **nodep;
+
+ nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
+ CTDB_NO_MEMORY(ctdb, nodep);
+
+ ctdb->nodes = nodep;
+ nodep = &ctdb->nodes[ctdb->num_nodes];
+ (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
+ CTDB_NO_MEMORY(ctdb, *nodep);
+ node = *nodep;
+
+ if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
+ return -1;
+ }
+ node->ctdb = ctdb;
+ node->name = talloc_asprintf(node, "%s:%u",
+ node->address.address,
+ node->address.port);
+ /* for now we just set the vnn to the line in the file - this
+ will change! */
+ node->vnn = ctdb->num_nodes;
+
+ if (ctdb->methods->add_node(node) != 0) {
+ talloc_free(node);
+ return -1;
+ }
+
+ if (ctdb_same_address(&ctdb->address, &node->address)) {
+ ctdb->vnn = node->vnn;
+ }
+
+ ctdb->num_nodes++;
+
+ return 0;
+}
+
+/*
+ setup the node list from a file
+*/
+int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
+{
+ char **lines;
+ int nlines;
+ int i;
+
+ lines = file_lines_load(nlist, &nlines, ctdb);
+ if (lines == NULL) {
+ ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
+ return -1;
+ }
+
+ for (i=0;i<nlines;i++) {
+ if (ctdb_add_node(ctdb, lines[i]) != 0) {
+ talloc_free(lines);
+ return -1;
+ }
+ }
+
+ talloc_free(lines);
+ return 0;
+}
+
+/*
+ setup the local node address
+*/
+int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
+{
+ if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
+ return -1;
+ }
+
+ ctdb->name = talloc_asprintf(ctdb, "%s:%u",
+ ctdb->address.address,
+ ctdb->address.port);
+ return 0;
+}
+
+/*
+ add a node to the list of active nodes
+*/
+int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id)
+{
+ struct ctdb_registered_call *call;
+
+ call = talloc(ctdb, struct ctdb_registered_call);
+ call->fn = fn;
+ call->id = id;
+
+ DLIST_ADD(ctdb->calls, call);
+ return 0;
+}
+
+/*
+ return the vnn of this node
+*/
+uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
+{
+ return ctdb->vnn;
+}
+
+/*
+ start the protocol going
+*/
+int ctdb_start(struct ctdb_context *ctdb)
+{
+ return ctdb->methods->start(ctdb);
+}
+
+/*
+ called by the transport layer when a packet comes in
+*/
+static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
+{
+ struct ctdb_req_header *hdr;
+ if (length < sizeof(*hdr)) {
+ ctdb_set_error(ctdb, "Bad packet length %d\n", length);
+ return;
+ }
+ hdr = (struct ctdb_req_header *)data;
+ if (length != hdr->length) {
+ ctdb_set_error(ctdb, "Bad header length %d expected %d\n",
+ hdr->length, length);
+ return;
+ }
+
+ DEBUG(0,("got ctdb op %d reqid %d\n", hdr->operation, hdr->reqid));
+
+ switch (hdr->operation) {
+ case CTDB_REQ_CALL:
+ ctdb_request_call(ctdb, hdr);
+ break;
+
+ case CTDB_REPLY_CALL:
+ ctdb_reply_call(ctdb, hdr);
+ break;
+
+ case CTDB_REPLY_ERROR:
+ ctdb_reply_error(ctdb, hdr);
+ break;
+
+ case CTDB_REPLY_REDIRECT:
+ ctdb_reply_redirect(ctdb, hdr);
+ break;
+
+ case CTDB_REQ_DMASTER:
+ ctdb_request_dmaster(ctdb, hdr);
+ break;
+
+ case CTDB_REPLY_DMASTER:
+ ctdb_reply_dmaster(ctdb, hdr);
+ break;
+
+ default:
+ printf("Packet with unknown operation %d\n", hdr->operation);
+ talloc_free(hdr);
+ break;
+ }
+}
+
+/*
+ called by the transport layer when a node is dead
+*/
+static void ctdb_node_dead(struct ctdb_node *node)
+{
+ node->ctdb->num_connected--;
+ printf("%s: node %s is dead: %d connected\n",
+ node->ctdb->name, node->name, node->ctdb->num_connected);
+}
+
+/*
+ called by the transport layer when a node is dead
+*/
+static void ctdb_node_connected(struct ctdb_node *node)
+{
+ node->ctdb->num_connected++;
+ printf("%s: connected to %s - %d connected\n",
+ node->ctdb->name, node->name, node->ctdb->num_connected);
+}
+
+/*
+ wait for all nodes to be connected
+*/
+void ctdb_connect_wait(struct ctdb_context *ctdb)
+{
+ int expected = ctdb->num_nodes - 1;
+ if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
+ expected++;
+ }
+ while (ctdb->num_connected != expected) {
+ event_loop_once(ctdb->ev);
+ }
+}
+
+/*
+ wait until we're the only node left
+*/
+void ctdb_wait_loop(struct ctdb_context *ctdb)
+{
+ int expected = 0;
+ if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
+ expected++;
+ }
+ while (ctdb->num_connected > expected) {
+ event_loop_once(ctdb->ev);
+ }
+}
+
+static const struct ctdb_upcalls ctdb_upcalls = {
+ .recv_pkt = ctdb_recv_pkt,
+ .node_dead = ctdb_node_dead,
+ .node_connected = ctdb_node_connected
+};
+
+/*
+ initialise the ctdb daemon.
+
+ NOTE: In current code the daemon does not fork. This is for testing purposes only
+ and to simplify the code.
+*/
+struct ctdb_context *ctdb_init(struct event_context *ev)
+{
+ struct ctdb_context *ctdb;
+
+ ctdb = talloc_zero(ev, struct ctdb_context);
+ ctdb->ev = ev;
+ ctdb->upcalls = &ctdb_upcalls;
+ ctdb->idr = idr_init(ctdb);
+
+ return ctdb;
+}
+
diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c
new file mode 100644
index 0000000000..2bedccc86a
--- /dev/null
+++ b/source4/cluster/ctdb/common/ctdb_call.c
@@ -0,0 +1,653 @@
+/*
+ ctdb_call protocol code
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ see http://wiki.samba.org/index.php/Samba_%26_Clustering for
+ protocol design and packet details
+*/
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+
+
+/*
+ queue a packet or die
+*/
+static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_node *node;
+ DEBUG(0,("queueing destnode=%u srcnode=%u\n", hdr->destnode, hdr->srcnode));
+ node = ctdb->nodes[hdr->destnode];
+ if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
+ ctdb_fatal(ctdb, "Unable to queue packet\n");
+ }
+}
+
+
+/*
+ local version of ctdb_call
+*/
+static int ctdb_call_local(struct ctdb_context *ctdb, TDB_DATA key,
+ struct ctdb_ltdb_header *header, TDB_DATA *data,
+ int call_id, TDB_DATA *call_data, TDB_DATA *reply_data,
+ uint32_t caller)
+{
+ struct ctdb_call *c;
+ struct ctdb_registered_call *fn;
+
+ c = talloc(ctdb, struct ctdb_call);
+ CTDB_NO_MEMORY(ctdb, c);
+
+ c->key = key;
+ c->call_data = call_data;
+ c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
+ c->record_data.dsize = data->dsize;
+ CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
+ c->new_data = NULL;
+ c->reply_data = NULL;
+
+ for (fn=ctdb->calls;fn;fn=fn->next) {
+ if (fn->id == call_id) break;
+ }
+ if (fn == NULL) {
+ ctdb_set_error(ctdb, "Unknown call id %u\n", call_id);
+ return -1;
+ }
+
+ if (fn->fn(c) != 0) {
+ ctdb_set_error(ctdb, "ctdb_call %u failed\n", call_id);
+ return -1;
+ }
+
+ if (header->laccessor != caller) {
+ header->lacount = 0;
+ }
+ header->laccessor = caller;
+ header->lacount++;
+
+ /* we need to force the record to be written out if this was a remote access,
+ so that the lacount is updated */
+ if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
+ c->new_data = &c->record_data;
+ }
+
+ if (c->new_data) {
+ if (ctdb_ltdb_store(ctdb, key, header, *c->new_data) != 0) {
+ ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
+ return -1;
+ }
+ }
+
+ if (reply_data) {
+ if (c->reply_data) {
+ *reply_data = *c->reply_data;
+ talloc_steal(ctdb, reply_data->dptr);
+ } else {
+ reply_data->dptr = NULL;
+ reply_data->dsize = 0;
+ }
+ }
+
+ talloc_free(c);
+
+ return 0;
+}
+
+/*
+ send an error reply
+*/
+static void ctdb_send_error(struct ctdb_context *ctdb,
+ struct ctdb_req_header *hdr, uint32_t status,
+ const char *fmt, ...)
+{
+ va_list ap;
+ struct ctdb_reply_error *r;
+ char *msg;
+ int len;
+
+ va_start(ap, fmt);
+ msg = talloc_vasprintf(ctdb, fmt, ap);
+ if (msg == NULL) {
+ ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
+ }
+ va_end(ap);
+
+ len = strlen(msg)+1;
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
+ CTDB_NO_MEMORY_FATAL(ctdb, r);
+ r->hdr.length = sizeof(*r) + len;
+ r->hdr.operation = CTDB_REPLY_ERROR;
+ r->hdr.destnode = hdr->srcnode;
+ r->hdr.srcnode = ctdb->vnn;
+ r->hdr.reqid = hdr->reqid;
+ r->status = status;
+ r->msglen = len;
+ memcpy(&r->msg[0], msg, len);
+
+ talloc_free(msg);
+
+ ctdb_queue_packet(ctdb, &r->hdr);
+
+ talloc_free(r);
+}
+
+
+/*
+ send a redirect reply
+*/
+static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
+ struct ctdb_req_call *c,
+ struct ctdb_ltdb_header *header)
+{
+ struct ctdb_reply_redirect *r;
+
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
+ CTDB_NO_MEMORY_FATAL(ctdb, r);
+ r->hdr.length = sizeof(*r);
+ r->hdr.operation = CTDB_REPLY_REDIRECT;
+ r->hdr.destnode = c->hdr.srcnode;
+ r->hdr.srcnode = ctdb->vnn;
+ r->hdr.reqid = c->hdr.reqid;
+ r->dmaster = header->dmaster;
+
+ ctdb_queue_packet(ctdb, &r->hdr);
+
+ talloc_free(r);
+}
+
+/*
+ send a dmaster request (give another node the dmaster for a record)
+
+ This is always sent to the lmaster, which ensures that the lmaster
+ always knows who the dmaster is. The lmaster will then send a
+ CTDB_REPLY_DMASTER to the new dmaster
+*/
+static void ctdb_call_send_dmaster(struct ctdb_context *ctdb,
+ struct ctdb_req_call *c,
+ struct ctdb_ltdb_header *header,
+ TDB_DATA *key, TDB_DATA *data)
+{
+ struct ctdb_req_dmaster *r;
+ int len;
+
+ len = sizeof(*r) + key->dsize + data->dsize;
+ r = ctdb->methods->allocate_pkt(ctdb, len);
+ CTDB_NO_MEMORY_FATAL(ctdb, r);
+ r->hdr.length = len;
+ r->hdr.operation = CTDB_REQ_DMASTER;
+ r->hdr.destnode = ctdb_lmaster(ctdb, key);
+ r->hdr.srcnode = ctdb->vnn;
+ r->hdr.reqid = c->hdr.reqid;
+ r->dmaster = header->laccessor;
+ r->keylen = key->dsize;
+ r->datalen = data->dsize;
+ memcpy(&r->data[0], key->dptr, key->dsize);
+ memcpy(&r->data[key->dsize], data->dptr, data->dsize);
+
+ if (r->hdr.destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+ /* we are the lmaster - don't send to ourselves */
+ DEBUG(0,("XXXX local ctdb_req_dmaster\n"));
+ ctdb_request_dmaster(ctdb, &r->hdr);
+ } else {
+ ctdb_queue_packet(ctdb, &r->hdr);
+
+ /* update the ltdb to record the new dmaster */
+ header->dmaster = r->hdr.destnode;
+ ctdb_ltdb_store(ctdb, *key, header, *data);
+ }
+
+ talloc_free(r);
+}
+
+
+/*
+ called when a CTDB_REQ_DMASTER packet comes in
+
+ this comes into the lmaster for a record when the current dmaster
+ wants to give up the dmaster role and give it to someone else
+*/
+void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
+ struct ctdb_reply_dmaster *r;
+ TDB_DATA key, data;
+ struct ctdb_ltdb_header header;
+ int ret;
+
+ key.dptr = c->data;
+ key.dsize = c->keylen;
+ data.dptr = c->data + c->keylen;
+ data.dsize = c->datalen;
+
+ DEBUG(0,("request dmaster reqid=%d\n", hdr->reqid));
+
+ /* fetch the current record */
+ ret = ctdb_ltdb_fetch(ctdb, key, &header, &data);
+ if (ret != 0) {
+ ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
+ return;
+ }
+
+ {
+ int i, fd = open("/dev/null", O_WRONLY);
+ for (i=0;i<data.dsize;i++) {
+ write(fd, &data.dptr[i], 1);
+ }
+ close(fd);
+ }
+
+ /* its a protocol error if the sending node is not the current dmaster */
+ if (header.dmaster != hdr->srcnode) {
+ ctdb_fatal(ctdb, "dmaster request from non-master");
+ return;
+ }
+
+ DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__));
+
+ header.dmaster = c->dmaster;
+ if (ctdb_ltdb_store(ctdb, key, &header, data) != 0) {
+ ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
+ return;
+ }
+
+ {
+ int i, fd = open("/dev/null", O_WRONLY);
+ for (i=0;i<data.dsize;i++) {
+ write(fd, &data.dptr[i], 1);
+ }
+ close(fd);
+ }
+
+ /* send the CTDB_REPLY_DMASTER */
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
+ CTDB_NO_MEMORY_FATAL(ctdb, r);
+ r->hdr.length = sizeof(*r) + data.dsize;
+ r->hdr.operation = CTDB_REPLY_DMASTER;
+ r->hdr.destnode = c->dmaster;
+ r->hdr.srcnode = ctdb->vnn;
+ r->hdr.reqid = hdr->reqid;
+ r->datalen = data.dsize;
+ memcpy(&r->data[0], data.dptr, data.dsize);
+
+ {
+ int i, fd = open("/dev/null", O_WRONLY);
+ for (i=0;i<data.dsize;i++) {
+ write(fd, &data.dptr[i], 1);
+ }
+ close(fd);
+ }
+
+ DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__));
+
+ if (0 && r->hdr.destnode == r->hdr.srcnode) {
+ ctdb_reply_dmaster(ctdb, &r->hdr);
+ } else {
+ ctdb_queue_packet(ctdb, &r->hdr);
+ DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__));
+
+ talloc_free(r);
+ }
+}
+
+
+/*
+ called when a CTDB_REQ_CALL packet comes in
+*/
+void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
+ TDB_DATA key, data, call_data, reply_data;
+ struct ctdb_reply_call *r;
+ int ret;
+ struct ctdb_ltdb_header header;
+
+ key.dptr = c->data;
+ key.dsize = c->keylen;
+ call_data.dptr = c->data + c->keylen;
+ call_data.dsize = c->calldatalen;
+
+ /* determine if we are the dmaster for this key. This also
+ fetches the record data (if any), thus avoiding a 2nd fetch of the data
+ if the call will be answered locally */
+ ret = ctdb_ltdb_fetch(ctdb, key, &header, &data);
+ if (ret != 0) {
+ ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
+ return;
+ }
+
+ /* if we are not the dmaster, then send a redirect to the
+ requesting node */
+ if (header.dmaster != ctdb->vnn) {
+ ctdb_call_send_redirect(ctdb, c, &header);
+ talloc_free(data.dptr);
+ return;
+ }
+
+ /* if this nodes has done enough consecutive calls on the same record
+ then give them the record */
+ if (header.laccessor == c->hdr.srcnode &&
+ header.lacount >= CTDB_MAX_LACOUNT) {
+ ctdb_call_send_dmaster(ctdb, c, &header, &key, &data);
+ talloc_free(data.dptr);
+ return;
+ }
+
+ ctdb_call_local(ctdb, key, &header, &data, c->callid,
+ call_data.dsize?&call_data:NULL,
+ &reply_data, c->hdr.srcnode);
+
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
+ CTDB_NO_MEMORY_FATAL(ctdb, r);
+ r->hdr.length = sizeof(*r) + reply_data.dsize;
+ r->hdr.operation = CTDB_REPLY_CALL;
+ r->hdr.destnode = hdr->srcnode;
+ r->hdr.srcnode = hdr->destnode;
+ r->hdr.reqid = hdr->reqid;
+ r->datalen = reply_data.dsize;
+ memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
+
+ ctdb_queue_packet(ctdb, &r->hdr);
+
+ talloc_free(reply_data.dptr);
+ talloc_free(r);
+}
+
+enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
+
+/*
+ state of a in-progress ctdb call
+*/
+struct ctdb_call_state {
+ enum call_state state;
+ struct ctdb_req_call *c;
+ struct ctdb_node *node;
+ const char *errmsg;
+ TDB_DATA call_data;
+ TDB_DATA reply_data;
+ TDB_DATA key;
+ int redirect_count;
+ struct ctdb_ltdb_header header;
+};
+
+
+/*
+ called when a CTDB_REPLY_CALL packet comes in
+
+ This packet comes in response to a CTDB_REQ_CALL request packet. It
+ contains any reply data freom the call
+*/
+void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
+ struct ctdb_call_state *state;
+ TDB_DATA reply_data;
+
+ state = idr_find(ctdb->idr, hdr->reqid);
+
+ reply_data.dptr = c->data;
+ reply_data.dsize = c->datalen;
+
+ state->reply_data = reply_data;
+
+ talloc_steal(state, c);
+
+ state->state = CTDB_CALL_DONE;
+}
+
+/*
+ called when a CTDB_REPLY_DMASTER packet comes in
+
+ This packet comes in from the lmaster response to a CTDB_REQ_CALL
+ request packet. It means that the current dmaster wants to give us
+ the dmaster role
+*/
+void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
+ struct ctdb_call_state *state;
+ TDB_DATA data;
+
+ state = idr_find(ctdb->idr, hdr->reqid);
+
+ data.dptr = c->data;
+ data.dsize = c->datalen;
+
+ talloc_steal(state, c);
+
+ /* we're now the dmaster - update our local ltdb with new header
+ and data */
+ state->header.dmaster = ctdb->vnn;
+
+ if (ctdb_ltdb_store(ctdb, state->key, &state->header, data) != 0) {
+ ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
+ return;
+ }
+
+ ctdb_call_local(ctdb, state->key, &state->header, &data, state->c->callid,
+ state->call_data.dsize?&state->call_data:NULL,
+ &state->reply_data, ctdb->vnn);
+
+ state->state = CTDB_CALL_DONE;
+}
+
+
+/*
+ called when a CTDB_REPLY_ERROR packet comes in
+*/
+void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
+ struct ctdb_call_state *state;
+
+ state = idr_find(ctdb->idr, hdr->reqid);
+
+ talloc_steal(state, c);
+
+ state->state = CTDB_CALL_ERROR;
+ state->errmsg = (char *)c->msg;
+}
+
+
+/*
+ called when a CTDB_REPLY_REDIRECT packet comes in
+
+ This packet arrives when we have sent a CTDB_REQ_CALL request and
+ the node that received it is not the dmaster for the given key. We
+ are given a hint as to what node to try next.
+*/
+void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
+ struct ctdb_call_state *state;
+
+ state = idr_find(ctdb->idr, hdr->reqid);
+
+ talloc_steal(state, c);
+
+ /* don't allow for too many redirects */
+ if (state->redirect_count++ == CTDB_MAX_REDIRECT) {
+ c->dmaster = ctdb_lmaster(ctdb, &state->key);
+ }
+
+ /* send it off again */
+ state->node = ctdb->nodes[c->dmaster];
+
+ ctdb_queue_packet(ctdb, &state->c->hdr);
+}
+
+/*
+ destroy a ctdb_call
+*/
+static int ctdb_call_destructor(struct ctdb_call_state *state)
+{
+// idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
+ return 0;
+}
+
+
+/*
+ called when a ctdb_call times out
+*/
+void ctdb_call_timeout(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private)
+{
+ struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
+ state->state = CTDB_CALL_ERROR;
+ ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out",
+ state->c->hdr.reqid);
+}
+
+/*
+ construct an event driven local ctdb_call
+
+ this is used so that locally processed ctdb_call requests are processed
+ in an event driven manner
+*/
+struct ctdb_call_state *ctdb_call_local_send(struct ctdb_context *ctdb,
+ TDB_DATA key, int call_id,
+ TDB_DATA *call_data, TDB_DATA *reply_data,
+ struct ctdb_ltdb_header *header,
+ TDB_DATA *data)
+{
+ struct ctdb_call_state *state;
+ int ret;
+
+ state = talloc_zero(ctdb, struct ctdb_call_state);
+ CTDB_NO_MEMORY_NULL(ctdb, state);
+
+ state->state = CTDB_CALL_DONE;
+ state->node = ctdb->nodes[ctdb->vnn];
+
+ ret = ctdb_call_local(ctdb, key, header, data,
+ call_id, call_data, &state->reply_data,
+ ctdb->vnn);
+ return state;
+}
+
+
+/*
+ make a remote ctdb call - async send
+
+ This constructs a ctdb_call request and queues it for processing.
+ This call never blocks.
+*/
+struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
+ TDB_DATA key, int call_id,
+ TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+ uint32_t len;
+ struct ctdb_call_state *state;
+ int ret;
+ struct ctdb_ltdb_header header;
+ TDB_DATA data;
+
+ /*
+ if we are the dmaster for this key then we don't need to
+ send it off at all, we can bypass the network and handle it
+ locally. To find out if we are the dmaster we need to look
+ in our ltdb
+ */
+ ret = ctdb_ltdb_fetch(ctdb, key, &header, &data);
+ if (ret != 0) return NULL;
+
+ if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+ return ctdb_call_local_send(ctdb, key, call_id, call_data, reply_data,
+ &header, &data);
+ }
+
+ state = talloc_zero(ctdb, struct ctdb_call_state);
+ CTDB_NO_MEMORY_NULL(ctdb, state);
+
+ len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
+ state->c = ctdb->methods->allocate_pkt(ctdb, len);
+ CTDB_NO_MEMORY_NULL(ctdb, state->c);
+
+ state->c->hdr.length = len;
+ state->c->hdr.operation = CTDB_REQ_CALL;
+ state->c->hdr.destnode = header.dmaster;
+ state->c->hdr.srcnode = ctdb->vnn;
+ /* this limits us to 16k outstanding messages - not unreasonable */
+ state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
+ DEBUG(0,("Allocate reqid %u\n", state->c->hdr.reqid));
+ state->c->callid = call_id;
+ state->c->keylen = key.dsize;
+ state->c->calldatalen = call_data?call_data->dsize:0;
+ memcpy(&state->c->data[0], key.dptr, key.dsize);
+ if (call_data) {
+ memcpy(&state->c->data[key.dsize], call_data->dptr, call_data->dsize);
+ state->call_data.dptr = &state->c->data[key.dsize];
+ state->call_data.dsize = call_data->dsize;
+ }
+ state->key.dptr = &state->c->data[0];
+ state->key.dsize = key.dsize;
+
+ state->node = ctdb->nodes[header.dmaster];
+ state->state = CTDB_CALL_WAIT;
+ state->header = header;
+
+ talloc_set_destructor(state, ctdb_call_destructor);
+
+ ctdb_queue_packet(ctdb, &state->c->hdr);
+
+ event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
+ ctdb_call_timeout, state);
+ return state;
+}
+
+
+/*
+ make a remote ctdb call - async recv.
+
+ This is called when the program wants to wait for a ctdb_call to complete and get the
+ results. This call will block unless the call has already completed.
+*/
+int ctdb_call_recv(struct ctdb_call_state *state, TDB_DATA *reply_data)
+{
+ while (state->state < CTDB_CALL_DONE) {
+ event_loop_once(state->node->ctdb->ev);
+ }
+ if (state->state != CTDB_CALL_DONE) {
+ ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
+ talloc_free(state);
+ return -1;
+ }
+ if (reply_data) {
+ reply_data->dptr = talloc_memdup(state->node->ctdb,
+ state->reply_data.dptr,
+ state->reply_data.dsize);
+ reply_data->dsize = state->reply_data.dsize;
+ }
+ talloc_free(state);
+ return 0;
+}
+
+/*
+ full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
+*/
+int ctdb_call(struct ctdb_context *ctdb,
+ TDB_DATA key, int call_id,
+ TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+ struct ctdb_call_state *state;
+ state = ctdb_call_send(ctdb, key, call_id, call_data, reply_data);
+ return ctdb_call_recv(state, reply_data);
+}
diff --git a/source4/cluster/ctdb/common/ctdb_ltdb.c b/source4/cluster/ctdb/common/ctdb_ltdb.c
new file mode 100644
index 0000000000..bc15a3e898
--- /dev/null
+++ b/source4/cluster/ctdb/common/ctdb_ltdb.c
@@ -0,0 +1,139 @@
+/*
+ ctdb ltdb code
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+
+/*
+ attach to a specific database
+*/
+int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
+ int open_flags, mode_t mode)
+{
+ /* when we have a separate daemon this will need to be a real
+ file, not a TDB_INTERNAL, so the parent can access it to
+ for ltdb bypass */
+ ctdb->ltdb = tdb_open(name, 0, /* tdb_flags */ TDB_INTERNAL, open_flags, mode);
+ if (ctdb->ltdb == NULL) {
+ ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ return the lmaster given a key
+*/
+uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
+{
+ return ctdb_hash(key) % ctdb->num_nodes;
+}
+
+
+/*
+ construct an initial header for a record with no ltdb header yet
+*/
+static void ltdb_initial_header(struct ctdb_context *ctdb,
+ TDB_DATA key,
+ struct ctdb_ltdb_header *header)
+{
+ header->rsn = 0;
+ /* initial dmaster is the lmaster */
+ header->dmaster = ctdb_lmaster(ctdb, &key);
+ header->laccessor = header->dmaster;
+ header->lacount = 0;
+}
+
+
+/*
+ fetch a record from the ltdb, separating out the header information
+ and returning the body of the record. A valid (initial) header is
+ returned if the record is not present
+*/
+int ctdb_ltdb_fetch(struct ctdb_context *ctdb,
+ TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data)
+{
+ TDB_DATA rec;
+
+ rec = tdb_fetch(ctdb->ltdb, key);
+ if (rec.dsize < sizeof(*header)) {
+ /* return an initial header */
+ free(rec.dptr);
+ ltdb_initial_header(ctdb, key, header);
+ data->dptr = NULL;
+ data->dsize = 0;
+ return 0;
+ }
+
+ *header = *(struct ctdb_ltdb_header *)rec.dptr;
+
+ data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
+ data->dptr = talloc_memdup(ctdb, sizeof(struct ctdb_ltdb_header)+rec.dptr,
+ data->dsize);
+ free(rec.dptr);
+ CTDB_NO_MEMORY(ctdb, data->dptr);
+
+ {
+ int i, fd = open("/dev/null", O_WRONLY);
+ for (i=0;i<data->dsize;i++) {
+ write(fd, &data->dptr[i], 1);
+ }
+ close(fd);
+ }
+
+ return 0;
+}
+
+
+/*
+ fetch a record from the ltdb, separating out the header information
+ and returning the body of the record. A valid (initial) header is
+ returned if the record is not present
+*/
+int ctdb_ltdb_store(struct ctdb_context *ctdb, TDB_DATA key,
+ struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+ TDB_DATA rec;
+ int ret;
+
+ rec.dsize = sizeof(*header) + data.dsize;
+ rec.dptr = talloc_size(ctdb, rec.dsize);
+ CTDB_NO_MEMORY(ctdb, rec.dptr);
+
+ memcpy(rec.dptr, header, sizeof(*header));
+ memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+
+ {
+ int i, fd = open("/dev/null", O_WRONLY);
+ for (i=0;i<rec.dsize;i++) {
+ write(fd, &rec.dptr[i], 1);
+ }
+ close(fd);
+ }
+
+ ret = tdb_store(ctdb->ltdb, key, rec, TDB_REPLACE);
+ talloc_free(rec.dptr);
+
+ return ret;
+}
diff --git a/source4/cluster/ctdb/common/ctdb_util.c b/source4/cluster/ctdb/common/ctdb_util.c
new file mode 100644
index 0000000000..8e25759609
--- /dev/null
+++ b/source4/cluster/ctdb/common/ctdb_util.c
@@ -0,0 +1,103 @@
+/*
+ ctdb utility code
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+
+/*
+ return error string for last error
+*/
+const char *ctdb_errstr(struct ctdb_context *ctdb)
+{
+ return ctdb->err_msg;
+}
+
+
+/*
+ remember an error message
+*/
+void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...)
+{
+ va_list ap;
+ talloc_free(ctdb->err_msg);
+ va_start(ap, fmt);
+ ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap);
+ DEBUG(0,("ctdb error: %s\n", ctdb->err_msg));
+ va_end(ap);
+}
+
+
+/*
+ a fatal internal error occurred - no hope for recovery
+*/
+void ctdb_fatal(struct ctdb_context *ctdb, const char *msg)
+{
+ DEBUG(0,("ctdb fatal error: %s\n", msg));
+ fprintf(stderr, "ctdb fatal error: '%s'\n", msg);
+ abort();
+}
+
+/*
+ parse a IP:port pair
+*/
+int ctdb_parse_address(struct ctdb_context *ctdb,
+ TALLOC_CTX *mem_ctx, const char *str,
+ struct ctdb_address *address)
+{
+ char *p;
+ p = strchr(str, ':');
+ if (p == NULL) {
+ ctdb_set_error(ctdb, "Badly formed node '%s'\n", str);
+ return -1;
+ }
+
+ address->address = talloc_strndup(mem_ctx, str, p-str);
+ address->port = strtoul(p+1, NULL, 0);
+ return 0;
+}
+
+
+/*
+ check if two addresses are the same
+*/
+bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2)
+{
+ return strcmp(a1->address, a2->address) == 0 && a1->port == a2->port;
+}
+
+
+/*
+ hash function for mapping data to a VNN - taken from tdb
+*/
+uint32_t ctdb_hash(const TDB_DATA *key)
+{
+ uint32_t value; /* Used to compute the hash value. */
+ uint32_t i; /* Used to cycle through random values. */
+
+ /* Set the initial value from the key size. */
+ for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
+ value = (value + (key->dptr[i] << (i*5 % 24)));
+
+ return (1103515243 * value + 12345);
+}
diff --git a/source4/cluster/ctdb/config.mk b/source4/cluster/ctdb/config.mk
new file mode 100644
index 0000000000..90897eedd3
--- /dev/null
+++ b/source4/cluster/ctdb/config.mk
@@ -0,0 +1,24 @@
+##################
+[MODULE::brlock_ctdb]
+SUBSYSTEM = ntvfs_common
+OBJ_FILES = brlock_ctdb.o
+
+##################
+[MODULE::ctdb_tcp]
+SUBSYSTEM = CLUSTER
+OBJ_FILES = \
+ tcp/tcp_init.o \
+ tcp/tcp_io.o \
+ tcp/tcp_connect.o
+
+##################
+[MODULE::ctdb]
+SUBSYSTEM = CLUSTER
+OBJ_FILES = \
+ ctdb_cluster.o \
+ common/ctdb.o \
+ common/ctdb_call.o \
+ common/ctdb_ltdb.o \
+ common/ctdb_util.o
+PRIVATE_DEPENDENCIES = ctdb_tcp
+PUBLIC_DEPENDENCIES = LIBTDB LIBTALLOC
diff --git a/source4/cluster/ctdb/ctdb_cluster.c b/source4/cluster/ctdb/ctdb_cluster.c
new file mode 100644
index 0000000000..df16f2f8b5
--- /dev/null
+++ b/source4/cluster/ctdb/ctdb_cluster.c
@@ -0,0 +1,138 @@
+/*
+ Unix SMB/CIFS implementation.
+
+ ctdb clustering hooks
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "cluster/cluster.h"
+#include "system/filesys.h"
+#include "cluster/cluster_private.h"
+#include "lib/tdb/include/tdb.h"
+#include "cluster/ctdb/include/ctdb.h"
+
+struct cluster_state {
+ struct ctdb_context *ctdb;
+};
+
+
+/*
+ return a server_id for a ctdb node
+*/
+static struct server_id ctdb_id(struct cluster_ops *ops, uint32_t id)
+{
+ struct ctdb_context *ctdb = ops->private;
+ struct server_id server_id;
+ server_id.node = ctdb_get_vnn(ctdb);
+ server_id.id = id;
+ return server_id;
+}
+
+
+/*
+ return a server_id as a string
+*/
+static const char *ctdb_id_string(struct cluster_ops *ops,
+ TALLOC_CTX *mem_ctx, struct server_id id)
+{
+ return talloc_asprintf(mem_ctx, "%u.%u", id.node, id.id);
+}
+
+static struct cluster_ops cluster_ctdb_ops = {
+ .cluster_id = ctdb_id,
+ .cluster_id_string = ctdb_id_string,
+ .private = NULL
+};
+
+/* initialise ctdb */
+void cluster_ctdb_init(struct event_context *ev)
+{
+ const char *nlist;
+ const char *address;
+ const char *transport;
+ struct cluster_state *state;
+ int ret;
+
+ nlist = lp_parm_string(-1, "ctdb", "nlist");
+ if (nlist == NULL) return;
+
+ address = lp_parm_string(-1, "ctdb", "address");
+ if (address == NULL) return;
+
+ transport = lp_parm_string(-1, "ctdb", "transport");
+ if (transport == NULL) {
+ transport = "tcp";
+ }
+
+ state = talloc(ev, struct cluster_state);
+ if (state == NULL) goto failed;
+
+ state->ctdb = ctdb_init(ev);
+ if (state->ctdb == NULL) goto failed;
+
+ cluster_ctdb_ops.private = state->ctdb;
+
+ ret = ctdb_set_transport(state->ctdb, transport);
+ if (ret == -1) {
+ DEBUG(0,("ctdb_set_transport failed - %s\n",
+ ctdb_errstr(state->ctdb)));
+ goto failed;
+ }
+
+// ctdb_set_flags(state->ctdb, CTDB_FLAG_SELF_CONNECT);
+
+ /* tell ctdb what address to listen on */
+ ret = ctdb_set_address(state->ctdb, address);
+ if (ret == -1) {
+ DEBUG(0,("ctdb_set_address failed - %s\n", ctdb_errstr(state->ctdb)));
+ goto failed;
+ }
+
+ /* tell ctdb what nodes are available */
+ ret = ctdb_set_nlist(state->ctdb, nlist);
+ if (ret == -1) {
+ DEBUG(0,("ctdb_set_nlist failed - %s\n", ctdb_errstr(state->ctdb)));
+ goto failed;
+ }
+
+ ret = ctdb_attach(state->ctdb, "cluster.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
+ if (ret == -1) {
+ DEBUG(0,("ctdb_attach failed - %s\n", ctdb_errstr(state->ctdb)));
+ goto failed;
+ }
+
+ /* start the protocol running */
+ ret = ctdb_start(state->ctdb);
+ if (ret == -1) {
+ DEBUG(0,("ctdb_start failed - %s\n", ctdb_errstr(state->ctdb)));
+ goto failed;
+ }
+
+ /* wait until all nodes are connected (should not be needed
+ outide of test code) */
+ ctdb_connect_wait(state->ctdb);
+
+ cluster_set_ops(&cluster_ctdb_ops);
+ return;
+
+failed:
+ DEBUG(0,("cluster_ctdb_init failed\n"));
+ talloc_free(state);
+}
diff --git a/source4/cluster/ctdb/ctdb_cluster.h b/source4/cluster/ctdb/ctdb_cluster.h
new file mode 100644
index 0000000000..5f93df960c
--- /dev/null
+++ b/source4/cluster/ctdb/ctdb_cluster.h
@@ -0,0 +1,23 @@
+/*
+ Unix SMB/CIFS implementation.
+
+ ctdb clustering hooks - header
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+void cluster_ctdb_init(struct event_context *ev);
diff --git a/source4/cluster/ctdb/include/ctdb.h b/source4/cluster/ctdb/include/ctdb.h
new file mode 100644
index 0000000000..21b9b5d1ce
--- /dev/null
+++ b/source4/cluster/ctdb/include/ctdb.h
@@ -0,0 +1,117 @@
+/*
+ ctdb database library
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#ifndef _CTDB_H
+#define _CTDB_H
+
+/*
+ structure passed to a ctdb call function
+*/
+struct ctdb_call {
+ TDB_DATA key; /* record key */
+ TDB_DATA record_data; /* current data in the record */
+ TDB_DATA *new_data; /* optionally updated record data */
+ TDB_DATA *call_data; /* optionally passed from caller */
+ TDB_DATA *reply_data; /* optionally returned by function */
+};
+
+#define CTDB_ERR_INVALID 1
+#define CTDB_ERR_NOMEM 2
+
+/*
+ ctdb flags
+*/
+#define CTDB_FLAG_SELF_CONNECT (1<<0)
+
+
+struct event_context;
+
+/*
+ initialise ctdb subsystem
+*/
+struct ctdb_context *ctdb_init(struct event_context *ev);
+
+/*
+ choose the transport
+*/
+int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport);
+
+/*
+ set some flags
+*/
+void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags);
+
+/*
+ tell ctdb what address to listen on, in transport specific format
+*/
+int ctdb_set_address(struct ctdb_context *ctdb, const char *address);
+
+/*
+ tell ctdb what nodes are available. This takes a filename, which will contain
+ 1 node address per line, in a transport specific format
+*/
+int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist);
+
+/*
+ start the ctdb protocol
+*/
+int ctdb_start(struct ctdb_context *ctdb);
+
+/*
+ error string for last ctdb error
+*/
+const char *ctdb_errstr(struct ctdb_context *);
+
+/* a ctdb call function */
+typedef int (*ctdb_fn_t)(struct ctdb_call *);
+
+/*
+ setup a ctdb call function
+*/
+int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id);
+
+/*
+ attach to a ctdb database
+*/
+int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
+ int open_flags, mode_t mode);
+
+
+/*
+ make a ctdb call. The associated ctdb call function will be called on the DMASTER
+ for the given record
+*/
+int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
+ TDB_DATA *call_data, TDB_DATA *reply_data);
+
+/*
+ wait for all nodes to be connected - useful for test code
+*/
+void ctdb_connect_wait(struct ctdb_context *ctdb);
+
+/*
+ wait until we're the only node left
+*/
+void ctdb_wait_loop(struct ctdb_context *ctdb);
+
+/* return vnn of this node */
+uint32_t ctdb_get_vnn(struct ctdb_context *ctdb);
+
+#endif
diff --git a/source4/cluster/ctdb/include/ctdb_private.h b/source4/cluster/ctdb/include/ctdb_private.h
new file mode 100644
index 0000000000..d373e3af32
--- /dev/null
+++ b/source4/cluster/ctdb/include/ctdb_private.h
@@ -0,0 +1,216 @@
+/*
+ ctdb database library
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#ifndef _CTDB_PRIVATE_H
+#define _CTDB_PRIVATE_H
+
+#include "ctdb.h"
+
+/*
+ an installed ctdb remote call
+*/
+struct ctdb_registered_call {
+ struct ctdb_registered_call *next, *prev;
+ uint32_t id;
+ ctdb_fn_t fn;
+};
+
+/*
+ this address structure might need to be generalised later for some
+ transports
+*/
+struct ctdb_address {
+ const char *address;
+ int port;
+};
+
+/*
+ state associated with one node
+*/
+struct ctdb_node {
+ struct ctdb_context *ctdb;
+ struct ctdb_address address;
+ const char *name; /* for debug messages */
+ void *private; /* private to transport */
+ uint32_t vnn;
+};
+
+/*
+ transport specific methods
+*/
+struct ctdb_methods {
+ int (*start)(struct ctdb_context *); /* start protocol processing */
+ int (*add_node)(struct ctdb_node *); /* setup a new node */
+ int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
+ void *(*allocate_pkt)(struct ctdb_context *, size_t );
+};
+
+/*
+ transport calls up to the ctdb layer
+*/
+struct ctdb_upcalls {
+ /* recv_pkt is called when a packet comes in */
+ void (*recv_pkt)(struct ctdb_context *, uint8_t *data, uint32_t length);
+
+ /* node_dead is called when an attempt to send to a node fails */
+ void (*node_dead)(struct ctdb_node *);
+
+ /* node_connected is called when a connection to a node is established */
+ void (*node_connected)(struct ctdb_node *);
+};
+
+/* main state of the ctdb daemon */
+struct ctdb_context {
+ struct event_context *ev;
+ struct ctdb_address address;
+ const char *name;
+ uint32_t vnn; /* our own vnn */
+ uint32_t num_nodes;
+ uint32_t num_connected;
+ unsigned flags;
+ struct idr_context *idr;
+ struct ctdb_node **nodes; /* array of nodes in the cluster - indexed by vnn */
+ struct ctdb_registered_call *calls; /* list of registered calls */
+ char *err_msg;
+ struct tdb_context *ltdb;
+ const struct ctdb_methods *methods; /* transport methods */
+ const struct ctdb_upcalls *upcalls; /* transport upcalls */
+ void *private; /* private to transport */
+};
+
+#define CTDB_NO_MEMORY(ctdb, p) do { if (!(p)) { \
+ ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \
+ return -1; }} while (0)
+
+#define CTDB_NO_MEMORY_NULL(ctdb, p) do { if (!(p)) { \
+ ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \
+ return NULL; }} while (0)
+
+#define CTDB_NO_MEMORY_FATAL(ctdb, p) do { if (!(p)) { \
+ ctdb_fatal(ctdb, "Out of memory in " __location__ ); \
+ }} while (0)
+
+/* arbitrary maximum timeout for ctdb operations */
+#define CTDB_REQ_TIMEOUT 10
+
+/* max number of redirects before we ask the lmaster */
+#define CTDB_MAX_REDIRECT 2
+
+/* number of consecutive calls from the same node before we give them
+ the record */
+#define CTDB_MAX_LACOUNT 7000
+
+/*
+ the extended header for records in the ltdb
+*/
+struct ctdb_ltdb_header {
+ uint64_t rsn;
+ uint32_t dmaster;
+ uint32_t laccessor;
+ uint32_t lacount;
+};
+
+
+/*
+ operation IDs
+*/
+enum ctdb_operation {
+ CTDB_REQ_CALL = 0,
+ CTDB_REPLY_CALL = 1,
+ CTDB_REPLY_REDIRECT = 2,
+ CTDB_REQ_DMASTER = 3,
+ CTDB_REPLY_DMASTER = 4,
+ CTDB_REPLY_ERROR = 5
+};
+
+/*
+ packet structures
+*/
+struct ctdb_req_header {
+ uint32_t length;
+ uint32_t operation;
+ uint32_t destnode;
+ uint32_t srcnode;
+ uint32_t reqid;
+};
+
+struct ctdb_req_call {
+ struct ctdb_req_header hdr;
+ uint32_t callid;
+ uint32_t keylen;
+ uint32_t calldatalen;
+ uint8_t data[0]; /* key[] followed by calldata[] */
+};
+
+struct ctdb_reply_call {
+ struct ctdb_req_header hdr;
+ uint32_t datalen;
+ uint8_t data[0];
+};
+
+struct ctdb_reply_error {
+ struct ctdb_req_header hdr;
+ uint32_t status;
+ uint32_t msglen;
+ uint8_t msg[0];
+};
+
+struct ctdb_reply_redirect {
+ struct ctdb_req_header hdr;
+ uint32_t dmaster;
+};
+
+struct ctdb_req_dmaster {
+ struct ctdb_req_header hdr;
+ uint32_t dmaster;
+ uint32_t keylen;
+ uint32_t datalen;
+ uint8_t data[0];
+};
+
+struct ctdb_reply_dmaster {
+ struct ctdb_req_header hdr;
+ uint32_t datalen;
+ uint8_t data[0];
+};
+
+/* internal prototypes */
+void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...);
+void ctdb_fatal(struct ctdb_context *ctdb, const char *msg);
+bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2);
+int ctdb_parse_address(struct ctdb_context *ctdb,
+ TALLOC_CTX *mem_ctx, const char *str,
+ struct ctdb_address *address);
+uint32_t ctdb_hash(const TDB_DATA *key);
+void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+
+uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key);
+int ctdb_ltdb_fetch(struct ctdb_context *ctdb,
+ TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data);
+int ctdb_ltdb_store(struct ctdb_context *ctdb, TDB_DATA key,
+ struct ctdb_ltdb_header *header, TDB_DATA data);
+
+
+#endif
diff --git a/source4/cluster/ctdb/tcp/ctdb_tcp.h b/source4/cluster/ctdb/tcp/ctdb_tcp.h
new file mode 100644
index 0000000000..0f8ce300b4
--- /dev/null
+++ b/source4/cluster/ctdb/tcp/ctdb_tcp.h
@@ -0,0 +1,76 @@
+/*
+ ctdb database library
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+
+/* ctdb_tcp main state */
+struct ctdb_tcp {
+ int listen_fd;
+};
+
+/*
+ incoming packet structure - only used when we get a partial packet
+ on read
+*/
+struct ctdb_tcp_partial {
+ uint8_t *data;
+ uint32_t length;
+};
+
+
+/*
+ state associated with an incoming connection
+*/
+struct ctdb_incoming {
+ struct ctdb_context *ctdb;
+ int fd;
+ struct ctdb_tcp_partial partial;
+};
+
+/*
+ outgoing packet structure - only allocated when we can't write immediately
+ to the socket
+*/
+struct ctdb_tcp_packet {
+ struct ctdb_tcp_packet *next, *prev;
+ uint8_t *data;
+ uint32_t length;
+};
+
+/*
+ state associated with one tcp node
+*/
+struct ctdb_tcp_node {
+ int fd;
+ struct fd_event *fde;
+ struct ctdb_tcp_packet *queue;
+};
+
+
+/* prototypes internal to tcp transport */
+void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
+ uint16_t flags, void *private);
+void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
+ uint16_t flags, void *private);
+int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length);
+int ctdb_tcp_listen(struct ctdb_context *ctdb);
+void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private);
+
+#define CTDB_TCP_ALIGNMENT 8
diff --git a/source4/cluster/ctdb/tcp/tcp_connect.c b/source4/cluster/ctdb/tcp/tcp_connect.c
new file mode 100644
index 0000000000..2404144ac1
--- /dev/null
+++ b/source4/cluster/ctdb/tcp/tcp_connect.c
@@ -0,0 +1,191 @@
+/*
+ ctdb over TCP
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+#include "ctdb_tcp.h"
+
+static void set_nonblocking(int fd)
+{
+ unsigned v;
+ v = fcntl(fd, F_GETFL, 0);
+ fcntl(fd, F_SETFL, v | O_NONBLOCK);
+}
+
+
+/*
+ called when socket becomes writeable on connect
+*/
+static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde,
+ uint16_t flags, void *private)
+{
+ struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
+ struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
+ struct ctdb_tcp_node);
+ struct ctdb_context *ctdb = node->ctdb;
+ int error = 0;
+ socklen_t len = sizeof(error);
+
+ if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
+ error != 0) {
+ talloc_free(fde);
+ close(tnode->fd);
+ tnode->fd = -1;
+ event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
+ ctdb_tcp_node_connect, node);
+ return;
+ }
+
+ talloc_free(fde);
+ tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ,
+ ctdb_tcp_node_write, node);
+
+ /* tell the ctdb layer we are connected */
+ node->ctdb->upcalls->node_connected(node);
+
+ if (tnode->queue) {
+ EVENT_FD_WRITEABLE(tnode->fde);
+ }
+}
+
+/*
+ called when we should try and establish a tcp connection to a node
+*/
+void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private)
+{
+ struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
+ struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
+ struct ctdb_tcp_node);
+ struct ctdb_context *ctdb = node->ctdb;
+ struct sockaddr_in sock_out;
+
+ tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+
+ set_nonblocking(tnode->fd);
+
+ inet_pton(AF_INET, node->address.address, &sock_out.sin_addr);
+ sock_out.sin_port = htons(node->address.port);
+ sock_out.sin_family = PF_INET;
+
+ if (connect(tnode->fd, (struct sockaddr *)&sock_out, sizeof(sock_out)) != 0 &&
+ errno != EINPROGRESS) {
+ /* try again once a second */
+ close(tnode->fd);
+ event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
+ ctdb_tcp_node_connect, node);
+ return;
+ }
+
+ /* non-blocking connect - wait for write event */
+ event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_WRITE|EVENT_FD_READ,
+ ctdb_node_connect_write, node);
+}
+
+/*
+ destroy a ctdb_incoming structure
+*/
+static int ctdb_incoming_destructor(struct ctdb_incoming *in)
+{
+ close(in->fd);
+ in->fd = -1;
+ return 0;
+}
+
+/*
+ called when we get contacted by another node
+ currently makes no attempt to check if the connection is really from a ctdb
+ node in our cluster
+*/
+static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
+ uint16_t flags, void *private)
+{
+ struct ctdb_context *ctdb;
+ struct ctdb_tcp *ctcp;
+ struct sockaddr_in addr;
+ socklen_t len;
+ int fd;
+ struct ctdb_incoming *in;
+
+ ctdb = talloc_get_type(private, struct ctdb_context);
+ ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp);
+ memset(&addr, 0, sizeof(addr));
+ len = sizeof(addr);
+ fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
+ if (fd == -1) return;
+
+ in = talloc_zero(ctdb, struct ctdb_incoming);
+ in->fd = fd;
+ in->ctdb = ctdb;
+
+ set_nonblocking(in->fd);
+
+ event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ,
+ ctdb_tcp_incoming_read, in);
+
+ talloc_set_destructor(in, ctdb_incoming_destructor);
+}
+
+
+/*
+ listen on our own address
+*/
+int ctdb_tcp_listen(struct ctdb_context *ctdb)
+{
+ struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp);
+ struct sockaddr_in sock;
+ int one = 1;
+
+ sock.sin_port = htons(ctdb->address.port);
+ sock.sin_family = PF_INET;
+ inet_pton(AF_INET, ctdb->address.address, &sock.sin_addr);
+
+ ctcp->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (ctcp->listen_fd == -1) {
+ ctdb_set_error(ctdb, "socket failed\n");
+ return -1;
+ }
+
+ setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
+
+ if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) {
+ ctdb_set_error(ctdb, "bind failed\n");
+ close(ctcp->listen_fd);
+ ctcp->listen_fd = -1;
+ return -1;
+ }
+
+ if (listen(ctcp->listen_fd, 10) == -1) {
+ ctdb_set_error(ctdb, "listen failed\n");
+ close(ctcp->listen_fd);
+ ctcp->listen_fd = -1;
+ return -1;
+ }
+
+ event_add_fd(ctdb->ev, ctdb, ctcp->listen_fd, EVENT_FD_READ,
+ ctdb_listen_event, ctdb);
+
+ return 0;
+}
+
diff --git a/source4/cluster/ctdb/tcp/tcp_init.c b/source4/cluster/ctdb/tcp/tcp_init.c
new file mode 100644
index 0000000000..b8ee8cb30e
--- /dev/null
+++ b/source4/cluster/ctdb/tcp/tcp_init.c
@@ -0,0 +1,102 @@
+/*
+ ctdb over TCP
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/events/events.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+#include "ctdb_tcp.h"
+
+/*
+ start the protocol going
+*/
+int ctdb_tcp_start(struct ctdb_context *ctdb)
+{
+ int i;
+
+ /* listen on our own address */
+ if (ctdb_tcp_listen(ctdb) != 0) return -1;
+
+ /* startup connections to the other servers - will happen on
+ next event loop */
+ for (i=0;i<ctdb->num_nodes;i++) {
+ struct ctdb_node *node = *(ctdb->nodes + i);
+ if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) &&
+ ctdb_same_address(&ctdb->address, &node->address)) continue;
+ event_add_timed(ctdb->ev, node, timeval_zero(),
+ ctdb_tcp_node_connect, node);
+ }
+
+ return 0;
+}
+
+
+/*
+ initialise tcp portion of a ctdb node
+*/
+int ctdb_tcp_add_node(struct ctdb_node *node)
+{
+ struct ctdb_tcp_node *tnode;
+ tnode = talloc_zero(node, struct ctdb_tcp_node);
+ CTDB_NO_MEMORY(node->ctdb, tnode);
+
+ tnode->fd = -1;
+ node->private = tnode;
+ return 0;
+}
+
+
+/*
+ transport packet allocator - allows transport to control memory for packets
+*/
+void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+{
+ /* tcp transport needs to round to 8 byte alignment to ensure
+ that we can use a length header and 64 bit elements in
+ structures */
+ size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
+ return talloc_size(ctdb, size);
+}
+
+
+static const struct ctdb_methods ctdb_tcp_methods = {
+ .start = ctdb_tcp_start,
+ .add_node = ctdb_tcp_add_node,
+ .queue_pkt = ctdb_tcp_queue_pkt,
+ .allocate_pkt = ctdb_tcp_allocate_pkt
+};
+
+/*
+ initialise tcp portion of ctdb
+*/
+int ctdb_tcp_init(struct ctdb_context *ctdb)
+{
+ struct ctdb_tcp *ctcp;
+ ctcp = talloc_zero(ctdb, struct ctdb_tcp);
+ CTDB_NO_MEMORY(ctdb, ctcp);
+
+ ctcp->listen_fd = -1;
+ ctdb->private = ctcp;
+ ctdb->methods = &ctdb_tcp_methods;
+ return 0;
+}
+
diff --git a/source4/cluster/ctdb/tcp/tcp_io.c b/source4/cluster/ctdb/tcp/tcp_io.c
new file mode 100644
index 0000000000..82e24f7260
--- /dev/null
+++ b/source4/cluster/ctdb/tcp/tcp_io.c
@@ -0,0 +1,254 @@
+/*
+ ctdb over TCP
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/util/dlinklist.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+#include "ctdb_tcp.h"
+
+
+/*
+ called when we fail to send a message to a node
+*/
+static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private)
+{
+ struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
+ struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
+ struct ctdb_tcp_node);
+
+ /* flush the queue */
+ while (tnode->queue) {
+ struct ctdb_tcp_packet *pkt = tnode->queue;
+ DLIST_REMOVE(tnode->queue, pkt);
+ talloc_free(pkt);
+ }
+
+ /* start a new connect cycle to try to re-establish the
+ link */
+ talloc_free(tnode->fde);
+ close(tnode->fd);
+ tnode->fd = -1;
+ event_add_timed(node->ctdb->ev, node, timeval_zero(),
+ ctdb_tcp_node_connect, node);
+}
+
+/*
+ called when socket becomes readable
+*/
+void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
+ uint16_t flags, void *private)
+{
+ struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
+ struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
+ struct ctdb_tcp_node);
+ if (flags & EVENT_FD_READ) {
+ /* getting a read event on this fd in the current tcp model is
+ always an error, as we have separate read and write
+ sockets. In future we may combine them, but for now it must
+ mean that the socket is dead, so we try to reconnect */
+ talloc_free(tnode->fde);
+ close(tnode->fd);
+ tnode->fd = -1;
+ event_add_timed(node->ctdb->ev, node, timeval_zero(),
+ ctdb_tcp_node_connect, node);
+ return;
+ }
+
+ while (tnode->queue) {
+ struct ctdb_tcp_packet *pkt = tnode->queue;
+ ssize_t n;
+
+ n = write(tnode->fd, pkt->data, pkt->length);
+
+ if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ event_add_timed(node->ctdb->ev, node, timeval_zero(),
+ ctdb_tcp_node_dead, node);
+ EVENT_FD_NOT_WRITEABLE(tnode->fde);
+ return;
+ }
+ if (n <= 0) return;
+
+ if (n != pkt->length) {
+ pkt->length -= n;
+ pkt->data += n;
+ return;
+ }
+
+ DLIST_REMOVE(tnode->queue, pkt);
+ talloc_free(pkt);
+ }
+
+ EVENT_FD_NOT_WRITEABLE(tnode->fde);
+}
+
+
+/*
+ called when an incoming connection is readable
+*/
+void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
+ uint16_t flags, void *private)
+{
+ struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
+ int num_ready = 0;
+ ssize_t nread;
+ uint8_t *data, *data_base;
+
+ if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
+ num_ready == 0) {
+ /* we've lost the link from another node. We don't
+ notify the upper layers, as we only want to trigger
+ a full node reorganisation when a send fails - that
+ allows nodes to restart without penalty as long as
+ the network is idle */
+ talloc_free(in);
+ return;
+ }
+
+ in->partial.data = talloc_realloc_size(in, in->partial.data,
+ num_ready + in->partial.length);
+ if (in->partial.data == NULL) {
+ /* not much we can do except drop the socket */
+ talloc_free(in);
+ return;
+ }
+
+ nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
+ if (nread <= 0) {
+ /* the connection must be dead */
+ talloc_free(in);
+ return;
+ }
+
+ data = in->partial.data;
+ nread += in->partial.length;
+
+ in->partial.data = NULL;
+ in->partial.length = 0;
+
+ if (nread >= 4 && *(uint32_t *)data == nread) {
+ /* most common case - we got a whole packet in one go
+ tell the ctdb layer above that we have a packet */
+ in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
+ return;
+ }
+
+ data_base = data;
+
+ while (nread >= 4 && *(uint32_t *)data <= nread) {
+ /* we have at least one packet */
+ uint8_t *d2;
+ uint32_t len;
+ len = *(uint32_t *)data;
+ d2 = talloc_memdup(in, data, len);
+ if (d2 == NULL) {
+ /* sigh */
+ talloc_free(in);
+ return;
+ }
+ in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
+ data += len;
+ nread -= len;
+ return;
+ }
+
+ if (nread < 4 || *(uint32_t *)data > nread) {
+ /* we have only part of a packet */
+ if (data_base == data) {
+ in->partial.data = data;
+ in->partial.length = nread;
+ } else {
+ in->partial.data = talloc_memdup(in, data, nread);
+ if (in->partial.data == NULL) {
+ talloc_free(in);
+ return;
+ }
+ in->partial.length = nread;
+ talloc_free(data_base);
+ }
+ return;
+ }
+
+ talloc_free(data_base);
+}
+
+/*
+ queue a packet for sending
+*/
+int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
+{
+ struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
+ struct ctdb_tcp_node);
+ struct ctdb_tcp_packet *pkt;
+ uint32_t length2;
+
+ /* enforce the length and alignment rules from the tcp packet allocator */
+ length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
+ *(uint32_t *)data = length2;
+
+ if (length2 != length) {
+ memset(data+length, 0, length2-length);
+ }
+ {
+ int i, fd = open("/dev/null", O_WRONLY);
+ for (i=0;i<length2;i++) {
+ write(fd, &data[i], 1);
+ }
+ close(fd);
+ }
+
+ /* if the queue is empty then try an immediate write, avoiding
+ queue overhead. This relies on non-blocking sockets */
+ if (tnode->queue == NULL && tnode->fd != -1) {
+ ssize_t n = write(tnode->fd, data, length2);
+ if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ event_add_timed(node->ctdb->ev, node, timeval_zero(),
+ ctdb_tcp_node_dead, node);
+ /* yes, we report success, as the dead node is
+ handled via a separate event */
+ return 0;
+ }
+ if (n > 0) {
+ data += n;
+ length2 -= n;
+ }
+ if (length2 == 0) return 0;
+ }
+
+ pkt = talloc(tnode, struct ctdb_tcp_packet);
+ CTDB_NO_MEMORY(node->ctdb, pkt);
+
+ pkt->data = talloc_memdup(pkt, data, length2);
+ CTDB_NO_MEMORY(node->ctdb, pkt->data);
+
+ pkt->length = length2;
+
+ if (tnode->queue == NULL && tnode->fd != -1) {
+ EVENT_FD_WRITEABLE(tnode->fde);
+ }
+
+ DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
+
+ return 0;
+}
diff --git a/source4/cluster/ctdb/tests/ctdb_bench.c b/source4/cluster/ctdb/tests/ctdb_bench.c
new file mode 100644
index 0000000000..01a8cc0f15
--- /dev/null
+++ b/source4/cluster/ctdb/tests/ctdb_bench.c
@@ -0,0 +1,228 @@
+/*
+ simple ctdb benchmark
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "popt.h"
+
+#include <sys/time.h>
+#include <time.h>
+
+static struct timeval tp1,tp2;
+
+static void start_timer(void)
+{
+ gettimeofday(&tp1,NULL);
+}
+
+static double end_timer(void)
+{
+ gettimeofday(&tp2,NULL);
+ return (tp2.tv_sec + (tp2.tv_usec*1.0e-6)) -
+ (tp1.tv_sec + (tp1.tv_usec*1.0e-6));
+}
+
+
+static int timelimit = 10;
+static int num_records = 10;
+static int num_repeats = 100;
+
+enum my_functions {FUNC_INCR=1, FUNC_FETCH=2};
+
+/*
+ ctdb call function to increment an integer
+*/
+static int incr_func(struct ctdb_call *call)
+{
+ if (call->record_data.dsize == 0) {
+ call->new_data = talloc(call, TDB_DATA);
+ if (call->new_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ call->new_data->dptr = talloc_size(call, 4);
+ call->new_data->dsize = 4;
+ *(uint32_t *)call->new_data->dptr = 0;
+ } else {
+ call->new_data = &call->record_data;
+ }
+ (*(uint32_t *)call->new_data->dptr)++;
+ return 0;
+}
+
+/*
+ ctdb call function to fetch a record
+*/
+static int fetch_func(struct ctdb_call *call)
+{
+ call->reply_data = &call->record_data;
+ return 0;
+}
+
+/*
+ benchmark incrementing an integer
+*/
+static void bench_incr(struct ctdb_context *ctdb)
+{
+ TDB_DATA key, data;
+ int loops=0;
+ int ret, i;
+
+ start_timer();
+
+ while (1) {
+ uint32_t v = loops % num_records;
+ key.dptr = &v;
+ key.dsize = 4;
+ for (i=0;i<num_repeats;i++) {
+ ret = ctdb_call(ctdb, key, FUNC_INCR, NULL, NULL);
+ if (ret != 0) {
+ printf("incr call failed - %s\n", ctdb_errstr(ctdb));
+ return;
+ }
+ }
+ if (num_repeats * (++loops) % 10000 == 0) {
+ if (end_timer() > timelimit) break;
+ printf("Incr: %.2f ops/sec\r", num_repeats*loops/end_timer());
+ fflush(stdout);
+ }
+ }
+
+ ret = ctdb_call(ctdb, key, FUNC_FETCH, NULL, &data);
+ if (ret == -1) {
+ printf("ctdb_call FUNC_FETCH failed - %s\n", ctdb_errstr(ctdb));
+ return;
+ }
+
+ printf("Incr: %.2f ops/sec (loops=%d val=%d)\n",
+ num_repeats*loops/end_timer(), loops, *(uint32_t *)data.dptr);
+}
+
+/*
+ main program
+*/
+int main(int argc, const char *argv[])
+{
+ struct ctdb_context *ctdb;
+ const char *nlist = NULL;
+ const char *transport = "tcp";
+ const char *myaddress = NULL;
+ int self_connect=0;
+
+ struct poptOption popt_options[] = {
+ POPT_AUTOHELP
+ { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" },
+ { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" },
+ { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL },
+ { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" },
+ { "timelimit", 't', POPT_ARG_INT, &timelimit, 0, "timelimit", "integer" },
+ { "num-records", 'r', POPT_ARG_INT, &num_records, 0, "num_records", "integer" },
+ POPT_TABLEEND
+ };
+ int opt;
+ const char **extra_argv;
+ int extra_argc = 0;
+ int ret;
+ poptContext pc;
+ struct event_context *ev;
+
+ pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
+
+ while ((opt = poptGetNextOpt(pc)) != -1) {
+ switch (opt) {
+ default:
+ fprintf(stderr, "Invalid option %s: %s\n",
+ poptBadOption(pc, 0), poptStrerror(opt));
+ exit(1);
+ }
+ }
+
+ /* setup the remaining options for the main program to use */
+ extra_argv = poptGetArgs(pc);
+ if (extra_argv) {
+ extra_argv++;
+ while (extra_argv[extra_argc]) extra_argc++;
+ }
+
+ if (nlist == NULL || myaddress == NULL) {
+ printf("You must provide a node list with --nlist and an address with --listen\n");
+ exit(1);
+ }
+
+ ev = event_context_init(NULL);
+
+ /* initialise ctdb */
+ ctdb = ctdb_init(ev);
+ if (ctdb == NULL) {
+ printf("Failed to init ctdb\n");
+ exit(1);
+ }
+
+ if (self_connect) {
+ ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT);
+ }
+
+ ret = ctdb_set_transport(ctdb, transport);
+ if (ret == -1) {
+ printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ /* tell ctdb what address to listen on */
+ ret = ctdb_set_address(ctdb, myaddress);
+ if (ret == -1) {
+ printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ /* tell ctdb what nodes are available */
+ ret = ctdb_set_nlist(ctdb, nlist);
+ if (ret == -1) {
+ printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ /* setup a ctdb call function */
+ ret = ctdb_set_call(ctdb, incr_func, FUNC_INCR);
+ ret = ctdb_set_call(ctdb, fetch_func, FUNC_FETCH);
+
+ /* attach to a specific database */
+ ret = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
+ if (ret == -1) {
+ printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ /* start the protocol running */
+ ret = ctdb_start(ctdb);
+
+ /* wait until all nodes are connected (should not be needed
+ outside of test code) */
+ ctdb_connect_wait(ctdb);
+
+ bench_incr(ctdb);
+
+ /* go into a wait loop to allow other nodes to complete */
+ ctdb_wait_loop(ctdb);
+
+ /* shut it down */
+ talloc_free(ctdb);
+ return 0;
+}
diff --git a/source4/cluster/ctdb/tests/ctdb_test.c b/source4/cluster/ctdb/tests/ctdb_test.c
new file mode 100644
index 0000000000..5bc35ad332
--- /dev/null
+++ b/source4/cluster/ctdb/tests/ctdb_test.c
@@ -0,0 +1,207 @@
+/*
+ ctdb test harness
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "popt.h"
+
+enum my_functions {FUNC_SORT=1, FUNC_FETCH=2};
+
+static int int_compare(int *i1, int *i2)
+{
+ return *i1 - *i2;
+}
+
+/*
+ add an integer into a record in sorted order
+*/
+static int sort_func(struct ctdb_call *call)
+{
+ if (call->call_data == NULL ||
+ call->call_data->dsize != sizeof(int)) {
+ return CTDB_ERR_INVALID;
+ }
+ call->new_data = talloc(call, TDB_DATA);
+ if (call->new_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ call->new_data->dptr = talloc_size(call,
+ call->record_data.dsize +
+ call->call_data->dsize);
+ if (call->new_data->dptr == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ call->new_data->dsize = call->record_data.dsize + call->call_data->dsize;
+ memcpy(call->new_data->dptr,
+ call->record_data.dptr, call->record_data.dsize);
+ memcpy(call->new_data->dptr+call->record_data.dsize,
+ call->call_data->dptr, call->call_data->dsize);
+
+ qsort(call->new_data->dptr, call->new_data->dsize / sizeof(int),
+ sizeof(int), (comparison_fn_t)int_compare);
+
+ return 0;
+}
+
+/*
+ ctdb call function to fetch a record
+*/
+static int fetch_func(struct ctdb_call *call)
+{
+ call->reply_data = &call->record_data;
+ return 0;
+}
+
+/*
+ main program
+*/
+int main(int argc, const char *argv[])
+{
+ struct ctdb_context *ctdb;
+ const char *nlist = NULL;
+ const char *transport = "tcp";
+ const char *myaddress = NULL;
+ int self_connect=0;
+
+ struct poptOption popt_options[] = {
+ POPT_AUTOHELP
+ { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" },
+ { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" },
+ { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL },
+ { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" },
+ POPT_TABLEEND
+ };
+ int opt;
+ const char **extra_argv;
+ int extra_argc = 0;
+ int i, ret;
+ TDB_DATA key, data;
+ poptContext pc;
+ struct event_context *ev;
+
+ pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
+
+ while ((opt = poptGetNextOpt(pc)) != -1) {
+ switch (opt) {
+ default:
+ fprintf(stderr, "Invalid option %s: %s\n",
+ poptBadOption(pc, 0), poptStrerror(opt));
+ exit(1);
+ }
+ }
+
+ /* setup the remaining options for the main program to use */
+ extra_argv = poptGetArgs(pc);
+ if (extra_argv) {
+ extra_argv++;
+ while (extra_argv[extra_argc]) extra_argc++;
+ }
+
+ if (nlist == NULL || myaddress == NULL) {
+ printf("You must provide a node list with --nlist and an address with --listen\n");
+ exit(1);
+ }
+
+ ev = event_context_init(NULL);
+
+ /* initialise ctdb */
+ ctdb = ctdb_init(ev);
+ if (ctdb == NULL) {
+ printf("Failed to init ctdb\n");
+ exit(1);
+ }
+
+ if (self_connect) {
+ ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT);
+ }
+
+ ret = ctdb_set_transport(ctdb, transport);
+ if (ret == -1) {
+ printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ /* tell ctdb what address to listen on */
+ ret = ctdb_set_address(ctdb, myaddress);
+ if (ret == -1) {
+ printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ /* tell ctdb what nodes are available */
+ ret = ctdb_set_nlist(ctdb, nlist);
+ if (ret == -1) {
+ printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ /* setup a ctdb call function */
+ ret = ctdb_set_call(ctdb, sort_func, FUNC_SORT);
+ ret = ctdb_set_call(ctdb, fetch_func, FUNC_FETCH);
+
+ /* attach to a specific database */
+ ret = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
+ if (ret == -1) {
+ printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ /* start the protocol running */
+ ret = ctdb_start(ctdb);
+
+ /* wait until all nodes are connected (should not be needed
+ outide of test code) */
+ ctdb_connect_wait(ctdb);
+
+ key.dptr = "test";
+ key.dsize = strlen("test")+1;
+
+ /* add some random data */
+ for (i=0;i<10;i++) {
+ int v = random() % 1000;
+ data.dptr = (uint8_t *)&v;
+ data.dsize = sizeof(v);
+ ret = ctdb_call(ctdb, key, FUNC_SORT, &data, NULL);
+ if (ret == -1) {
+ printf("ctdb_call FUNC_SORT failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+ }
+
+ /* fetch the record */
+ ret = ctdb_call(ctdb, key, FUNC_FETCH, NULL, &data);
+ if (ret == -1) {
+ printf("ctdb_call FUNC_FETCH failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ for (i=0;i<data.dsize/sizeof(int);i++) {
+ printf("%3d\n", ((int *)data.dptr)[i]);
+ }
+ talloc_free(data.dptr);
+
+ /* go into a wait loop to allow other nodes to complete */
+ ctdb_wait_loop(ctdb);
+
+ /* shut it down */
+ talloc_free(ctdb);
+ return 0;
+}