diff options
author | Andrew Tridgell <tridge@samba.org> | 2007-01-19 03:54:48 +0000 |
---|---|---|
committer | Gerald (Jerry) Carter <jerry@samba.org> | 2007-10-10 14:43:46 -0500 |
commit | 5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263 (patch) | |
tree | 050e2f47faf234685cd7f20ab7e4f37e6521f7a2 /source4/cluster | |
parent | 8c3d15f6caa3f1ffda86755fa9b7ff9602cbb022 (diff) | |
download | samba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.tar.gz samba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.tar.bz2 samba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.zip |
r20889: import ctdb cluster backend from bzr
it will be interesting to see how the build farm handles this
(This used to be commit 53be449630bd67d649a9e70cc7e25a9799c0616b)
Diffstat (limited to 'source4/cluster')
-rw-r--r-- | source4/cluster/config.mk | 5 | ||||
-rw-r--r-- | source4/cluster/ctdb/brlock_ctdb.c | 971 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb.c | 287 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_call.c | 653 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_ltdb.c | 139 | ||||
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_util.c | 103 | ||||
-rw-r--r-- | source4/cluster/ctdb/config.mk | 24 | ||||
-rw-r--r-- | source4/cluster/ctdb/ctdb_cluster.c | 138 | ||||
-rw-r--r-- | source4/cluster/ctdb/ctdb_cluster.h | 23 | ||||
-rw-r--r-- | source4/cluster/ctdb/include/ctdb.h | 117 | ||||
-rw-r--r-- | source4/cluster/ctdb/include/ctdb_private.h | 216 | ||||
-rw-r--r-- | source4/cluster/ctdb/tcp/ctdb_tcp.h | 76 | ||||
-rw-r--r-- | source4/cluster/ctdb/tcp/tcp_connect.c | 191 | ||||
-rw-r--r-- | source4/cluster/ctdb/tcp/tcp_init.c | 102 | ||||
-rw-r--r-- | source4/cluster/ctdb/tcp/tcp_io.c | 254 | ||||
-rw-r--r-- | source4/cluster/ctdb/tests/ctdb_bench.c | 228 | ||||
-rw-r--r-- | source4/cluster/ctdb/tests/ctdb_test.c | 207 |
17 files changed, 3733 insertions, 1 deletions
diff --git a/source4/cluster/config.mk b/source4/cluster/config.mk index 934bc55252..c5c2ea970a 100644 --- a/source4/cluster/config.mk +++ b/source4/cluster/config.mk @@ -1,4 +1,7 @@ +include ctdb/config.mk #################### [SUBSYSTEM::CLUSTER] -OBJ_FILES = cluster.o +OBJ_FILES = cluster.o \ + local.o +PRIVATE_DEPENDENCIES = ctdb diff --git a/source4/cluster/ctdb/brlock_ctdb.c b/source4/cluster/ctdb/brlock_ctdb.c new file mode 100644 index 0000000000..bcfa566b76 --- /dev/null +++ b/source4/cluster/ctdb/brlock_ctdb.c @@ -0,0 +1,971 @@ +/* + Unix SMB/CIFS implementation. + + generic byte range locking code - ctdb backend + + Copyright (C) Andrew Tridgell 2006 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + +#include "includes.h" +#include "system/filesys.h" +#include "lib/tdb/include/tdb.h" +#include "messaging/messaging.h" +#include "db_wrap.h" +#include "lib/messaging/irpc.h" +#include "libcli/libcli.h" +#include "cluster/cluster.h" +#include "ntvfs/common/brlock.h" +#include "cluster/ctdb/include/ctdb.h" + +enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2, + FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4, + FUNC_BRL_CLOSE=5}; + +/* + in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies + a file. For a local posix filesystem this will usually be a combination + of the device and inode numbers of the file, but it can be anything + that uniquely idetifies a file for locking purposes, as long + as it is applied consistently. +*/ + +/* this struct is typically attached to tcon */ +struct brl_context { + struct ctdb_context *ctdb; + struct server_id server; + struct messaging_context *messaging_ctx; +}; + +/* + the lock context contains the elements that define whether one + lock is the same as another lock +*/ +struct lock_context { + struct server_id server; + uint16_t smbpid; + struct brl_context *ctx; +}; + +/* The data in brlock records is an unsorted linear array of these + records. It is unnecessary to store the count as tdb provides the + size of the record */ +struct lock_struct { + struct lock_context context; + struct ntvfs_handle *ntvfs; + uint64_t start; + uint64_t size; + enum brl_type lock_type; + void *notify_ptr; +}; + +/* this struct is attached to on open file handle */ +struct brl_handle { + DATA_BLOB key; + struct ntvfs_handle *ntvfs; + struct lock_struct last_lock; +}; + +/* + Open up the brlock.tdb database. Close it down using + talloc_free(). We need the messaging_ctx to allow for + pending lock notifications. +*/ +static struct brl_context *brl_ctdb_init(TALLOC_CTX *mem_ctx, struct server_id server, + struct messaging_context *messaging_ctx) +{ + struct ctdb_context *ctdb = talloc_get_type(cluster_private(), struct ctdb_context); + struct brl_context *brl; + + brl = talloc(mem_ctx, struct brl_context); + if (brl == NULL) { + return NULL; + } + + brl->ctdb = ctdb; + brl->server = server; + brl->messaging_ctx = messaging_ctx; + + DEBUG(0,("brl_ctdb_init: brl=%p\n", brl)); + + return brl; +} + +static struct brl_handle *brl_ctdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs, + DATA_BLOB *file_key) +{ + struct brl_handle *brlh; + + brlh = talloc(mem_ctx, struct brl_handle); + if (brlh == NULL) { + return NULL; + } + + brlh->key = *file_key; + brlh->ntvfs = ntvfs; + ZERO_STRUCT(brlh->last_lock); + + return brlh; +} + +/* + see if two locking contexts are equal +*/ +static BOOL brl_ctdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2) +{ + return (cluster_id_equal(&ctx1->server, &ctx2->server) && + ctx1->smbpid == ctx2->smbpid && + ctx1->ctx == ctx2->ctx); +} + +/* + see if lck1 and lck2 overlap +*/ +static BOOL brl_ctdb_overlap(struct lock_struct *lck1, + struct lock_struct *lck2) +{ + /* this extra check is not redundent - it copes with locks + that go beyond the end of 64 bit file space */ + if (lck1->size != 0 && + lck1->start == lck2->start && + lck1->size == lck2->size) { + return True; + } + + if (lck1->start >= (lck2->start+lck2->size) || + lck2->start >= (lck1->start+lck1->size)) { + return False; + } + return True; +} + +/* + See if lock2 can be added when lock1 is in place. +*/ +static BOOL brl_ctdb_conflict(struct lock_struct *lck1, + struct lock_struct *lck2) +{ + /* pending locks don't conflict with anything */ + if (lck1->lock_type >= PENDING_READ_LOCK || + lck2->lock_type >= PENDING_READ_LOCK) { + return False; + } + + if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) { + return False; + } + + if (brl_ctdb_same_context(&lck1->context, &lck2->context) && + lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) { + return False; + } + + return brl_ctdb_overlap(lck1, lck2); +} + + +/* + Check to see if this lock conflicts, but ignore our own locks on the + same fnum only. +*/ +static BOOL brl_ctdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2) +{ + /* pending locks don't conflict with anything */ + if (lck1->lock_type >= PENDING_READ_LOCK || + lck2->lock_type >= PENDING_READ_LOCK) { + return False; + } + + if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) + return False; + + /* + * note that incoming write calls conflict with existing READ + * locks even if the context is the same. JRA. See LOCKTEST7 + * in smbtorture. + */ + if (brl_ctdb_same_context(&lck1->context, &lck2->context) && + lck1->ntvfs == lck2->ntvfs && + (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) { + return False; + } + + return brl_ctdb_overlap(lck1, lck2); +} + + +/* + amazingly enough, w2k3 "remembers" whether the last lock failure + is the same as this one and changes its error code. I wonder if any + app depends on this? +*/ +static NTSTATUS brl_ctdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock) +{ + /* + * this function is only called for non pending lock! + */ + + /* + * if the notify_ptr is non NULL, + * it means that we're at the end of a pending lock + * and the real lock is requested after the timeout went by + * In this case we need to remember the last_lock and always + * give FILE_LOCK_CONFLICT + */ + if (lock->notify_ptr) { + brlh->last_lock = *lock; + return NT_STATUS_FILE_LOCK_CONFLICT; + } + + /* + * amazing the little things you learn with a test + * suite. Locks beyond this offset (as a 64 bit + * number!) always generate the conflict error code, + * unless the top bit is set + */ + if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) { + brlh->last_lock = *lock; + return NT_STATUS_FILE_LOCK_CONFLICT; + } + + /* + * if the current lock matches the last failed lock on the file handle + * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned + */ + if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) && + lock->context.ctx == brlh->last_lock.context.ctx && + lock->ntvfs == brlh->last_lock.ntvfs && + lock->start == brlh->last_lock.start) { + return NT_STATUS_FILE_LOCK_CONFLICT; + } + + brlh->last_lock = *lock; + return NT_STATUS_LOCK_NOT_GRANTED; +} + + +static void show_locks(const char *op, struct lock_struct *locks, int count) +{ + int i; + DEBUG(0,("OP: %s\n", op)); + for (i=0;i<count;i++) { + DEBUG(0,("%2d: %4d %4d %d.%d.%d %p %p\n", + i, (int)locks[i].start, (int)locks[i].size, + locks[i].context.server.node, + locks[i].context.server.id, + locks[i].context.smbpid, + locks[i].context.ctx, + locks[i].ntvfs)); + } +} + + +struct ctdb_lock_req { + uint16_t smbpid; + uint64_t start; + uint64_t size; + enum brl_type lock_type; + void *notify_ptr; + struct server_id server; + struct brl_context *brl; + struct ntvfs_handle *ntvfs; +}; + +/* + ctdb call handling brl_lock() +*/ +static int brl_ctdb_lock_func(struct ctdb_call *call) +{ + struct ctdb_lock_req *req = (struct ctdb_lock_req *)call->call_data->dptr; + TDB_DATA dbuf; + int count=0, i; + struct lock_struct lock, *locks=NULL; + NTSTATUS status = NT_STATUS_OK; + +#if 0 + /* if this is a pending lock, then with the chainlock held we + try to get the real lock. If we succeed then we don't need + to make it pending. This prevents a possible race condition + where the pending lock gets created after the lock that is + preventing the real lock gets removed */ + if (lock_type >= PENDING_READ_LOCK) { + enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK); + + /* here we need to force that the last_lock isn't overwritten */ + lock = brlh->last_lock; + status = brl_ctdb_lock(brl, brlh, smbpid, start, size, rw, NULL); + brlh->last_lock = lock; + + if (NT_STATUS_IS_OK(status)) { + tdb_chainunlock(brl->w->tdb, kbuf); + return NT_STATUS_OK; + } + } +#endif + + dbuf = call->record_data; + + ZERO_STRUCT(lock); + lock.context.smbpid = req->smbpid; + lock.context.server = req->server; + lock.context.ctx = req->brl; + lock.ntvfs = req->ntvfs; + lock.start = req->start; + lock.size = req->size; + lock.lock_type = req->lock_type; + lock.notify_ptr = req->notify_ptr; + + { + int xlen = sizeof(lock); + uint8_t *xx = &lock; + int ii, fd = open("/dev/null", O_WRONLY); + for (ii=0;ii<xlen;ii++) { + write(fd, &xx[ii], 1); + } + close(fd); + } + + if (dbuf.dptr) { + /* there are existing locks - make sure they don't conflict */ + locks = (struct lock_struct *)dbuf.dptr; + count = dbuf.dsize / sizeof(*locks); + + show_locks("lock", locks, count); + + for (i=0; i<count; i++) { + if (brl_ctdb_conflict(&locks[i], &lock)) { + status = NT_STATUS_LOCK_NOT_GRANTED; + goto reply; + } + } + } + + call->new_data = talloc(call, TDB_DATA); + if (call->new_data == NULL) { + return CTDB_ERR_NOMEM; + } + + call->new_data->dptr = talloc_size(call, dbuf.dsize + sizeof(lock)); + if (call->new_data->dptr == NULL) { + return CTDB_ERR_NOMEM; + } + memcpy(call->new_data->dptr, locks, dbuf.dsize); + memcpy(call->new_data->dptr+dbuf.dsize, &lock, sizeof(lock)); + call->new_data->dsize = dbuf.dsize + sizeof(lock); + + if (req->lock_type >= PENDING_READ_LOCK) { + status = NT_STATUS_LOCK_NOT_GRANTED; + } + + DEBUG(0,("lock: size now %d\n", call->new_data->dsize)); + +reply: + call->reply_data = talloc(call, TDB_DATA); + if (call->reply_data == NULL) { + return CTDB_ERR_NOMEM; + } + + call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS)); + call->reply_data->dsize = sizeof(NTSTATUS); + if (call->reply_data->dptr == NULL) { + return CTDB_ERR_NOMEM; + } + *(NTSTATUS *)call->reply_data->dptr = status; + + return 0; +} + + +/* + Lock a range of bytes. The lock_type can be a PENDING_*_LOCK, in + which case a real lock is first tried, and if that fails then a + pending lock is created. When the pending lock is triggered (by + someone else closing an overlapping lock range) a messaging + notification is sent, identified by the notify_ptr +*/ +static NTSTATUS brl_ctdb_lock(struct brl_context *brl, + struct brl_handle *brlh, + uint16_t smbpid, + uint64_t start, uint64_t size, + enum brl_type lock_type, + void *notify_ptr) +{ + TDB_DATA kbuf, rbuf, sbuf; + struct ctdb_lock_req req; + NTSTATUS status; + int ret; + + kbuf.dptr = brlh->key.data; + kbuf.dsize = brlh->key.length; + + rbuf.dptr = (uint8_t *)&req; + rbuf.dsize = sizeof(req); + + ZERO_STRUCT(req); + req.smbpid = smbpid; + req.start = start; + req.size = size; + req.lock_type = lock_type; + req.notify_ptr = notify_ptr; + req.server = brl->server; + req.brl = brl; + req.ntvfs = brlh->ntvfs; + + ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_LOCK, &rbuf, &sbuf); + if (ret == -1) { + DEBUG(0,("ctdb_call failed - %s\n", __location__)); + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + + status = *(NTSTATUS *)sbuf.dptr; + talloc_free(sbuf.dptr); + + return status; +} + +#if 0 +/* + we are removing a lock that might be holding up a pending lock. Scan for pending + locks that cover this range and if we find any then notify the server that it should + retry the lock +*/ +static void brl_ctdb_notify_unlock(struct brl_context *brl, + struct lock_struct *locks, int count, + struct lock_struct *removed_lock) +{ + int i, last_notice; + + /* the last_notice logic is to prevent stampeding on a lock + range. It prevents us sending hundreds of notifies on the + same range of bytes. It doesn't prevent all possible + stampedes, but it does prevent the most common problem */ + last_notice = -1; + + for (i=0;i<count;i++) { + if (locks[i].lock_type >= PENDING_READ_LOCK && + brl_ctdb_overlap(&locks[i], removed_lock)) { + if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) { + continue; + } + if (locks[i].lock_type == PENDING_WRITE_LOCK) { + last_notice = i; + } + messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, + MSG_BRL_RETRY, locks[i].notify_ptr); + } + } +} +#endif + +/* + send notifications for all pending locks - the file is being closed by this + user +*/ +static void brl_ctdb_notify_all(struct brl_context *brl, + struct lock_struct *locks, int count) +{ + int i; + for (i=0;i<count;i++) { + if (locks->lock_type >= PENDING_READ_LOCK) { +// brl_ctdb_notify_unlock(brl, locks, count, &locks[i]); + } + } +} + +struct ctdb_unlock_req { + uint16_t smbpid; + uint64_t start; + uint64_t size; + struct server_id server; + struct brl_context *brl; + struct ntvfs_handle *ntvfs; +}; + +/* + Unlock a range of bytes. +*/ +static int brl_ctdb_unlock_func(struct ctdb_call *call) +{ + struct ctdb_unlock_req *req = (struct ctdb_unlock_req *)call->call_data->dptr; + TDB_DATA dbuf; + int count, i; + struct lock_struct *locks; + struct lock_context context; + NTSTATUS status = NT_STATUS_OK; + + dbuf = call->record_data; + + context.smbpid = req->smbpid; + context.server = req->server; + context.ctx = req->brl; + + /* there are existing locks - find a match */ + locks = (struct lock_struct *)dbuf.dptr; + count = dbuf.dsize / sizeof(*locks); + + show_locks("unlock", locks, count); + + for (i=0; i<count; i++) { + struct lock_struct *lock = &locks[i]; + + if (brl_ctdb_same_context(&lock->context, &context) && + lock->ntvfs == req->ntvfs && + lock->start == req->start && + lock->size == req->size && + lock->lock_type < PENDING_READ_LOCK) { +// struct lock_struct removed_lock = *lock; + + call->new_data = talloc(call, TDB_DATA); + if (call->new_data == NULL) { + return CTDB_ERR_NOMEM; + } + + call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(lock)); + if (call->new_data->dptr == NULL) { + return CTDB_ERR_NOMEM; + } + call->new_data->dsize = dbuf.dsize - sizeof(lock); + + memcpy(call->new_data->dptr, locks, i*sizeof(lock)); + memcpy(call->new_data->dptr+i*sizeof(lock), locks+i+1, + (count-(i+1))*sizeof(lock)); + + if (count > 1) { + /* send notifications for any relevant pending locks */ +// brl_ctdb_notify_unlock(req->brl, locks, count, &removed_lock); + } + break; + } + } + + if (call->new_data) { + DEBUG(0,("unlock: size now %d\n", call->new_data->dsize)); + } + + if (i == count) { + /* we didn't find it */ + status = NT_STATUS_RANGE_NOT_LOCKED; + } + + call->reply_data = talloc(call, TDB_DATA); + if (call->reply_data == NULL) { + return CTDB_ERR_NOMEM; + } + + call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS)); + call->reply_data->dsize = sizeof(NTSTATUS); + if (call->reply_data->dptr == NULL) { + return CTDB_ERR_NOMEM; + } + *(NTSTATUS *)call->reply_data->dptr = status; + + return 0; +} + + +/* + Unlock a range of bytes. +*/ +static NTSTATUS brl_ctdb_unlock(struct brl_context *brl, + struct brl_handle *brlh, + uint16_t smbpid, + uint64_t start, uint64_t size) +{ + TDB_DATA kbuf, rbuf, sbuf; + struct ctdb_unlock_req req; + NTSTATUS status; + int ret; + + kbuf.dptr = brlh->key.data; + kbuf.dsize = brlh->key.length; + + rbuf.dptr = (uint8_t *)&req; + rbuf.dsize = sizeof(req); + + ZERO_STRUCT(req); + req.smbpid = smbpid; + req.start = start; + req.size = size; + req.server = brl->server; + req.brl = brl; + req.ntvfs = brlh->ntvfs; + + ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_UNLOCK, &rbuf, &sbuf); + if (ret == -1) { + DEBUG(0,("ctdb_call failed - %s\n", __location__)); + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + + status = *(NTSTATUS *)sbuf.dptr; + talloc_free(sbuf.dptr); + + return status; +} + + +struct ctdb_remove_pending_req { + struct server_id server; + void *notify_ptr; +}; + +/* + remove a pending lock. This is called when the caller has either + given up trying to establish a lock or when they have succeeded in + getting it. In either case they no longer need to be notified. +*/ +static int brl_ctdb_remove_pending_func(struct ctdb_call *call) +{ + struct ctdb_remove_pending_req *req = (struct ctdb_remove_pending_req *)call->call_data->dptr; + TDB_DATA dbuf; + int count, i; + struct lock_struct *locks; + NTSTATUS status = NT_STATUS_OK; + + dbuf = call->record_data; + + /* there are existing locks - find a match */ + locks = (struct lock_struct *)dbuf.dptr; + count = dbuf.dsize / sizeof(*locks); + + show_locks("remove_pending", locks, count); + + for (i=0; i<count; i++) { + struct lock_struct *lock = &locks[i]; + + if (lock->lock_type >= PENDING_READ_LOCK && + lock->notify_ptr == req->notify_ptr && + cluster_id_equal(&lock->context.server, &req->server)) { + call->new_data = talloc(call, TDB_DATA); + if (call->new_data == NULL) { + return CTDB_ERR_NOMEM; + } + + call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(lock)); + if (call->new_data->dptr == NULL) { + return CTDB_ERR_NOMEM; + } + call->new_data->dsize = dbuf.dsize - sizeof(lock); + + memcpy(call->new_data->dptr, locks, i*sizeof(lock)); + memcpy(call->new_data->dptr+i*sizeof(lock), locks+i+1, + (count-(i+1))*sizeof(lock)); + break; + } + } + + if (call->new_data) { + DEBUG(0,("remove_pending: size now %d\n", call->new_data->dsize)); + } + + if (i == count) { + /* we didn't find it */ + status = NT_STATUS_RANGE_NOT_LOCKED; + } + + call->reply_data = talloc(call, TDB_DATA); + if (call->reply_data == NULL) { + return CTDB_ERR_NOMEM; + } + + call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS)); + call->reply_data->dsize = sizeof(NTSTATUS); + if (call->reply_data->dptr == NULL) { + return CTDB_ERR_NOMEM; + } + *(NTSTATUS *)call->reply_data->dptr = status; + + return 0; +} + +static NTSTATUS brl_ctdb_remove_pending(struct brl_context *brl, + struct brl_handle *brlh, + void *notify_ptr) +{ + TDB_DATA kbuf, rbuf, sbuf; + struct ctdb_remove_pending_req req; + NTSTATUS status; + int ret; + + kbuf.dptr = brlh->key.data; + kbuf.dsize = brlh->key.length; + + rbuf.dptr = (uint8_t *)&req; + rbuf.dsize = sizeof(req); + + ZERO_STRUCT(req); + req.notify_ptr = notify_ptr; + req.server = brl->server; + + ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_REMOVE_PENDING, &rbuf, &sbuf); + if (ret == -1) { + DEBUG(0,("ctdb_call failed - %s\n", __location__)); + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + + status = *(NTSTATUS *)sbuf.dptr; + talloc_free(sbuf.dptr); + + return status; +} + + +struct ctdb_locktest_req { + uint16_t smbpid; + uint64_t start; + uint64_t size; + enum brl_type lock_type; + struct brl_context *brl; + struct server_id server; + struct ntvfs_handle *ntvfs; +}; + +/* + remove a pending lock. This is called when the caller has either + given up trying to establish a lock or when they have succeeded in + getting it. In either case they no longer need to be notified. +*/ +static int brl_ctdb_locktest_func(struct ctdb_call *call) +{ + struct ctdb_locktest_req *req = (struct ctdb_locktest_req *)call->call_data->dptr; + TDB_DATA dbuf; + int count, i; + struct lock_struct *locks, lock; + NTSTATUS status = NT_STATUS_OK; + + lock.context.smbpid = req->smbpid; + lock.context.server = req->server; + lock.context.ctx = req->brl; + lock.ntvfs = req->ntvfs; + lock.start = req->start; + lock.size = req->size; + lock.lock_type = req->lock_type; + + dbuf = call->record_data; + + /* there are existing locks - find a match */ + locks = (struct lock_struct *)dbuf.dptr; + count = dbuf.dsize / sizeof(*locks); + + show_locks("locktest", locks, count); + + for (i=0; i<count; i++) { + if (brl_ctdb_conflict_other(&locks[i], &lock)) { + status = NT_STATUS_FILE_LOCK_CONFLICT; + break; + } + } + + call->reply_data = talloc(call, TDB_DATA); + if (call->reply_data == NULL) { + return CTDB_ERR_NOMEM; + } + + call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS)); + call->reply_data->dsize = sizeof(NTSTATUS); + if (call->reply_data->dptr == NULL) { + return CTDB_ERR_NOMEM; + } + *(NTSTATUS *)call->reply_data->dptr = status; + + return 0; +} + +/* + Test if we are allowed to perform IO on a region of an open file +*/ +static NTSTATUS brl_ctdb_locktest(struct brl_context *brl, + struct brl_handle *brlh, + uint16_t smbpid, + uint64_t start, uint64_t size, + enum brl_type lock_type) +{ + TDB_DATA kbuf, rbuf, sbuf; + struct ctdb_locktest_req req; + NTSTATUS status; + int ret; + + kbuf.dptr = brlh->key.data; + kbuf.dsize = brlh->key.length; + + rbuf.dptr = (uint8_t *)&req; + rbuf.dsize = sizeof(req); + + ZERO_STRUCT(req); + req.smbpid = smbpid; + req.start = start; + req.size = size; + req.lock_type = lock_type; + req.server = brl->server; + req.brl = brl; + req.ntvfs = brlh->ntvfs; + + ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_LOCKTEST, &rbuf, &sbuf); + if (ret == -1) { + DEBUG(0,("ctdb_call failed - %s\n", __location__)); + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + + status = *(NTSTATUS *)sbuf.dptr; + talloc_free(sbuf.dptr); + + return status; +} + + +struct ctdb_close_req { + struct brl_context *brl; + struct server_id server; + struct ntvfs_handle *ntvfs; +}; + +/* + remove a pending lock. This is called when the caller has either + given up trying to establish a lock or when they have succeeded in + getting it. In either case they no longer need to be notified. +*/ +static int brl_ctdb_close_func(struct ctdb_call *call) +{ + struct ctdb_close_req *req = (struct ctdb_close_req *)call->call_data->dptr; + TDB_DATA dbuf; + int count, dcount=0, i; + struct lock_struct *locks; + NTSTATUS status = NT_STATUS_OK; + + dbuf = call->record_data; + + /* there are existing locks - find a match */ + locks = (struct lock_struct *)dbuf.dptr; + count = dbuf.dsize / sizeof(*locks); + + show_locks("close", locks, count); + + DEBUG(0,("closing ctx=%p server=%d.%d ntvfs=%p\n", + req->brl, req->server.node, req->server.id, req->ntvfs)); + + for (i=0; i<count; i++) { + struct lock_struct *lock = &locks[i]; + + if (lock->context.ctx == req->brl && + cluster_id_equal(&lock->context.server, &req->server) && + lock->ntvfs == req->ntvfs) { + /* found it - delete it */ + if (count > 1 && i < count-1) { + memmove(&locks[i], &locks[i+1], + sizeof(*locks)*((count-1) - i)); + } + count--; + i--; + dcount++; + } + } + + if (dcount > 0) { + call->new_data = talloc(call, TDB_DATA); + if (call->new_data == NULL) { + return CTDB_ERR_NOMEM; + } + + call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct)); + if (call->new_data->dptr == NULL) { + return CTDB_ERR_NOMEM; + } + call->new_data->dsize = count*sizeof(struct lock_struct); + + memcpy(call->new_data->dptr, locks, count*sizeof(struct lock_struct)); + } + + if (call->new_data) { + DEBUG(0,("close: size now %d\n", call->new_data->dsize)); + } + + DEBUG(0,("brl_ctdb_close_func dcount=%d count=%d\n", dcount, count)); + + call->reply_data = talloc(call, TDB_DATA); + if (call->reply_data == NULL) { + return CTDB_ERR_NOMEM; + } + + call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS)); + call->reply_data->dsize = sizeof(NTSTATUS); + if (call->reply_data->dptr == NULL) { + return CTDB_ERR_NOMEM; + } + *(NTSTATUS *)call->reply_data->dptr = status; + + return 0; +} + +/* + Test if we are allowed to perform IO on a region of an open file +*/ +static NTSTATUS brl_ctdb_close(struct brl_context *brl, + struct brl_handle *brlh) +{ + TDB_DATA kbuf, rbuf, sbuf; + struct ctdb_close_req req; + NTSTATUS status; + int ret; + + kbuf.dptr = brlh->key.data; + kbuf.dsize = brlh->key.length; + + rbuf.dptr = (uint8_t *)&req; + rbuf.dsize = sizeof(req); + + ZERO_STRUCT(req); + req.brl = brl; + req.server = brl->server; + req.ntvfs = brlh->ntvfs; + + DEBUG(0,("brl_ctdb_close %u.%u %p\n", req.server.node, req.server.id, brl)); + + ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_CLOSE, &rbuf, &sbuf); + if (ret == -1) { + DEBUG(0,("ctdb_call failed - %s\n", __location__)); + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + + status = *(NTSTATUS *)sbuf.dptr; + talloc_free(sbuf.dptr); + + return status; +} + + +static const struct brlock_ops brlock_tdb_ops = { + .brl_init = brl_ctdb_init, + .brl_create_handle = brl_ctdb_create_handle, + .brl_lock = brl_ctdb_lock, + .brl_unlock = brl_ctdb_unlock, + .brl_remove_pending = brl_ctdb_remove_pending, + .brl_locktest = brl_ctdb_locktest, + .brl_close = brl_ctdb_close +}; + + +void brl_ctdb_init_ops(void) +{ + struct ctdb_context *ctdb = talloc_get_type(cluster_private(), struct ctdb_context); + + brl_set_ops(&brlock_tdb_ops); + + ctdb_set_call(ctdb, brl_ctdb_lock_func, FUNC_BRL_LOCK); + ctdb_set_call(ctdb, brl_ctdb_unlock_func, FUNC_BRL_UNLOCK); + ctdb_set_call(ctdb, brl_ctdb_remove_pending_func, FUNC_BRL_REMOVE_PENDING); + ctdb_set_call(ctdb, brl_ctdb_locktest_func, FUNC_BRL_LOCKTEST); + ctdb_set_call(ctdb, brl_ctdb_close_func, FUNC_BRL_CLOSE); + +} diff --git a/source4/cluster/ctdb/common/ctdb.c b/source4/cluster/ctdb/common/ctdb.c new file mode 100644 index 0000000000..ad0345b3c7 --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb.c @@ -0,0 +1,287 @@ +/* + ctdb main protocol code + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" + +/* + choose the transport we will use +*/ +int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport) +{ + int ctdb_tcp_init(struct ctdb_context *ctdb); + + if (strcmp(transport, "tcp") == 0) { + return ctdb_tcp_init(ctdb); + } + ctdb_set_error(ctdb, "Unknown transport '%s'\n", transport); + return -1; +} + +/* + set some ctdb flags +*/ +void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags) +{ + ctdb->flags |= flags; +} + + +/* + add a node to the list of active nodes +*/ +static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr) +{ + struct ctdb_node *node, **nodep; + + nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1); + CTDB_NO_MEMORY(ctdb, nodep); + + ctdb->nodes = nodep; + nodep = &ctdb->nodes[ctdb->num_nodes]; + (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node); + CTDB_NO_MEMORY(ctdb, *nodep); + node = *nodep; + + if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) { + return -1; + } + node->ctdb = ctdb; + node->name = talloc_asprintf(node, "%s:%u", + node->address.address, + node->address.port); + /* for now we just set the vnn to the line in the file - this + will change! */ + node->vnn = ctdb->num_nodes; + + if (ctdb->methods->add_node(node) != 0) { + talloc_free(node); + return -1; + } + + if (ctdb_same_address(&ctdb->address, &node->address)) { + ctdb->vnn = node->vnn; + } + + ctdb->num_nodes++; + + return 0; +} + +/* + setup the node list from a file +*/ +int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist) +{ + char **lines; + int nlines; + int i; + + lines = file_lines_load(nlist, &nlines, ctdb); + if (lines == NULL) { + ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist); + return -1; + } + + for (i=0;i<nlines;i++) { + if (ctdb_add_node(ctdb, lines[i]) != 0) { + talloc_free(lines); + return -1; + } + } + + talloc_free(lines); + return 0; +} + +/* + setup the local node address +*/ +int ctdb_set_address(struct ctdb_context *ctdb, const char *address) +{ + if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) { + return -1; + } + + ctdb->name = talloc_asprintf(ctdb, "%s:%u", + ctdb->address.address, + ctdb->address.port); + return 0; +} + +/* + add a node to the list of active nodes +*/ +int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id) +{ + struct ctdb_registered_call *call; + + call = talloc(ctdb, struct ctdb_registered_call); + call->fn = fn; + call->id = id; + + DLIST_ADD(ctdb->calls, call); + return 0; +} + +/* + return the vnn of this node +*/ +uint32_t ctdb_get_vnn(struct ctdb_context *ctdb) +{ + return ctdb->vnn; +} + +/* + start the protocol going +*/ +int ctdb_start(struct ctdb_context *ctdb) +{ + return ctdb->methods->start(ctdb); +} + +/* + called by the transport layer when a packet comes in +*/ +static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length) +{ + struct ctdb_req_header *hdr; + if (length < sizeof(*hdr)) { + ctdb_set_error(ctdb, "Bad packet length %d\n", length); + return; + } + hdr = (struct ctdb_req_header *)data; + if (length != hdr->length) { + ctdb_set_error(ctdb, "Bad header length %d expected %d\n", + hdr->length, length); + return; + } + + DEBUG(0,("got ctdb op %d reqid %d\n", hdr->operation, hdr->reqid)); + + switch (hdr->operation) { + case CTDB_REQ_CALL: + ctdb_request_call(ctdb, hdr); + break; + + case CTDB_REPLY_CALL: + ctdb_reply_call(ctdb, hdr); + break; + + case CTDB_REPLY_ERROR: + ctdb_reply_error(ctdb, hdr); + break; + + case CTDB_REPLY_REDIRECT: + ctdb_reply_redirect(ctdb, hdr); + break; + + case CTDB_REQ_DMASTER: + ctdb_request_dmaster(ctdb, hdr); + break; + + case CTDB_REPLY_DMASTER: + ctdb_reply_dmaster(ctdb, hdr); + break; + + default: + printf("Packet with unknown operation %d\n", hdr->operation); + talloc_free(hdr); + break; + } +} + +/* + called by the transport layer when a node is dead +*/ +static void ctdb_node_dead(struct ctdb_node *node) +{ + node->ctdb->num_connected--; + printf("%s: node %s is dead: %d connected\n", + node->ctdb->name, node->name, node->ctdb->num_connected); +} + +/* + called by the transport layer when a node is dead +*/ +static void ctdb_node_connected(struct ctdb_node *node) +{ + node->ctdb->num_connected++; + printf("%s: connected to %s - %d connected\n", + node->ctdb->name, node->name, node->ctdb->num_connected); +} + +/* + wait for all nodes to be connected +*/ +void ctdb_connect_wait(struct ctdb_context *ctdb) +{ + int expected = ctdb->num_nodes - 1; + if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) { + expected++; + } + while (ctdb->num_connected != expected) { + event_loop_once(ctdb->ev); + } +} + +/* + wait until we're the only node left +*/ +void ctdb_wait_loop(struct ctdb_context *ctdb) +{ + int expected = 0; + if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) { + expected++; + } + while (ctdb->num_connected > expected) { + event_loop_once(ctdb->ev); + } +} + +static const struct ctdb_upcalls ctdb_upcalls = { + .recv_pkt = ctdb_recv_pkt, + .node_dead = ctdb_node_dead, + .node_connected = ctdb_node_connected +}; + +/* + initialise the ctdb daemon. + + NOTE: In current code the daemon does not fork. This is for testing purposes only + and to simplify the code. +*/ +struct ctdb_context *ctdb_init(struct event_context *ev) +{ + struct ctdb_context *ctdb; + + ctdb = talloc_zero(ev, struct ctdb_context); + ctdb->ev = ev; + ctdb->upcalls = &ctdb_upcalls; + ctdb->idr = idr_init(ctdb); + + return ctdb; +} + diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c new file mode 100644 index 0000000000..2bedccc86a --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_call.c @@ -0,0 +1,653 @@ +/* + ctdb_call protocol code + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + see http://wiki.samba.org/index.php/Samba_%26_Clustering for + protocol design and packet details +*/ +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" + + +/* + queue a packet or die +*/ +static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_node *node; + DEBUG(0,("queueing destnode=%u srcnode=%u\n", hdr->destnode, hdr->srcnode)); + node = ctdb->nodes[hdr->destnode]; + if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) { + ctdb_fatal(ctdb, "Unable to queue packet\n"); + } +} + + +/* + local version of ctdb_call +*/ +static int ctdb_call_local(struct ctdb_context *ctdb, TDB_DATA key, + struct ctdb_ltdb_header *header, TDB_DATA *data, + int call_id, TDB_DATA *call_data, TDB_DATA *reply_data, + uint32_t caller) +{ + struct ctdb_call *c; + struct ctdb_registered_call *fn; + + c = talloc(ctdb, struct ctdb_call); + CTDB_NO_MEMORY(ctdb, c); + + c->key = key; + c->call_data = call_data; + c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize); + c->record_data.dsize = data->dsize; + CTDB_NO_MEMORY(ctdb, c->record_data.dptr); + c->new_data = NULL; + c->reply_data = NULL; + + for (fn=ctdb->calls;fn;fn=fn->next) { + if (fn->id == call_id) break; + } + if (fn == NULL) { + ctdb_set_error(ctdb, "Unknown call id %u\n", call_id); + return -1; + } + + if (fn->fn(c) != 0) { + ctdb_set_error(ctdb, "ctdb_call %u failed\n", call_id); + return -1; + } + + if (header->laccessor != caller) { + header->lacount = 0; + } + header->laccessor = caller; + header->lacount++; + + /* we need to force the record to be written out if this was a remote access, + so that the lacount is updated */ + if (c->new_data == NULL && header->laccessor != ctdb->vnn) { + c->new_data = &c->record_data; + } + + if (c->new_data) { + if (ctdb_ltdb_store(ctdb, key, header, *c->new_data) != 0) { + ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n"); + return -1; + } + } + + if (reply_data) { + if (c->reply_data) { + *reply_data = *c->reply_data; + talloc_steal(ctdb, reply_data->dptr); + } else { + reply_data->dptr = NULL; + reply_data->dsize = 0; + } + } + + talloc_free(c); + + return 0; +} + +/* + send an error reply +*/ +static void ctdb_send_error(struct ctdb_context *ctdb, + struct ctdb_req_header *hdr, uint32_t status, + const char *fmt, ...) +{ + va_list ap; + struct ctdb_reply_error *r; + char *msg; + int len; + + va_start(ap, fmt); + msg = talloc_vasprintf(ctdb, fmt, ap); + if (msg == NULL) { + ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n"); + } + va_end(ap); + + len = strlen(msg)+1; + r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.length = sizeof(*r) + len; + r->hdr.operation = CTDB_REPLY_ERROR; + r->hdr.destnode = hdr->srcnode; + r->hdr.srcnode = ctdb->vnn; + r->hdr.reqid = hdr->reqid; + r->status = status; + r->msglen = len; + memcpy(&r->msg[0], msg, len); + + talloc_free(msg); + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(r); +} + + +/* + send a redirect reply +*/ +static void ctdb_call_send_redirect(struct ctdb_context *ctdb, + struct ctdb_req_call *c, + struct ctdb_ltdb_header *header) +{ + struct ctdb_reply_redirect *r; + + r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r)); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.length = sizeof(*r); + r->hdr.operation = CTDB_REPLY_REDIRECT; + r->hdr.destnode = c->hdr.srcnode; + r->hdr.srcnode = ctdb->vnn; + r->hdr.reqid = c->hdr.reqid; + r->dmaster = header->dmaster; + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(r); +} + +/* + send a dmaster request (give another node the dmaster for a record) + + This is always sent to the lmaster, which ensures that the lmaster + always knows who the dmaster is. The lmaster will then send a + CTDB_REPLY_DMASTER to the new dmaster +*/ +static void ctdb_call_send_dmaster(struct ctdb_context *ctdb, + struct ctdb_req_call *c, + struct ctdb_ltdb_header *header, + TDB_DATA *key, TDB_DATA *data) +{ + struct ctdb_req_dmaster *r; + int len; + + len = sizeof(*r) + key->dsize + data->dsize; + r = ctdb->methods->allocate_pkt(ctdb, len); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.length = len; + r->hdr.operation = CTDB_REQ_DMASTER; + r->hdr.destnode = ctdb_lmaster(ctdb, key); + r->hdr.srcnode = ctdb->vnn; + r->hdr.reqid = c->hdr.reqid; + r->dmaster = header->laccessor; + r->keylen = key->dsize; + r->datalen = data->dsize; + memcpy(&r->data[0], key->dptr, key->dsize); + memcpy(&r->data[key->dsize], data->dptr, data->dsize); + + if (r->hdr.destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + /* we are the lmaster - don't send to ourselves */ + DEBUG(0,("XXXX local ctdb_req_dmaster\n")); + ctdb_request_dmaster(ctdb, &r->hdr); + } else { + ctdb_queue_packet(ctdb, &r->hdr); + + /* update the ltdb to record the new dmaster */ + header->dmaster = r->hdr.destnode; + ctdb_ltdb_store(ctdb, *key, header, *data); + } + + talloc_free(r); +} + + +/* + called when a CTDB_REQ_DMASTER packet comes in + + this comes into the lmaster for a record when the current dmaster + wants to give up the dmaster role and give it to someone else +*/ +void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr; + struct ctdb_reply_dmaster *r; + TDB_DATA key, data; + struct ctdb_ltdb_header header; + int ret; + + key.dptr = c->data; + key.dsize = c->keylen; + data.dptr = c->data + c->keylen; + data.dsize = c->datalen; + + DEBUG(0,("request dmaster reqid=%d\n", hdr->reqid)); + + /* fetch the current record */ + ret = ctdb_ltdb_fetch(ctdb, key, &header, &data); + if (ret != 0) { + ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record"); + return; + } + + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<data.dsize;i++) { + write(fd, &data.dptr[i], 1); + } + close(fd); + } + + /* its a protocol error if the sending node is not the current dmaster */ + if (header.dmaster != hdr->srcnode) { + ctdb_fatal(ctdb, "dmaster request from non-master"); + return; + } + + DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__)); + + header.dmaster = c->dmaster; + if (ctdb_ltdb_store(ctdb, key, &header, data) != 0) { + ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster"); + return; + } + + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<data.dsize;i++) { + write(fd, &data.dptr[i], 1); + } + close(fd); + } + + /* send the CTDB_REPLY_DMASTER */ + r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.length = sizeof(*r) + data.dsize; + r->hdr.operation = CTDB_REPLY_DMASTER; + r->hdr.destnode = c->dmaster; + r->hdr.srcnode = ctdb->vnn; + r->hdr.reqid = hdr->reqid; + r->datalen = data.dsize; + memcpy(&r->data[0], data.dptr, data.dsize); + + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<data.dsize;i++) { + write(fd, &data.dptr[i], 1); + } + close(fd); + } + + DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__)); + + if (0 && r->hdr.destnode == r->hdr.srcnode) { + ctdb_reply_dmaster(ctdb, &r->hdr); + } else { + ctdb_queue_packet(ctdb, &r->hdr); + DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__)); + + talloc_free(r); + } +} + + +/* + called when a CTDB_REQ_CALL packet comes in +*/ +void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_req_call *c = (struct ctdb_req_call *)hdr; + TDB_DATA key, data, call_data, reply_data; + struct ctdb_reply_call *r; + int ret; + struct ctdb_ltdb_header header; + + key.dptr = c->data; + key.dsize = c->keylen; + call_data.dptr = c->data + c->keylen; + call_data.dsize = c->calldatalen; + + /* determine if we are the dmaster for this key. This also + fetches the record data (if any), thus avoiding a 2nd fetch of the data + if the call will be answered locally */ + ret = ctdb_ltdb_fetch(ctdb, key, &header, &data); + if (ret != 0) { + ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call"); + return; + } + + /* if we are not the dmaster, then send a redirect to the + requesting node */ + if (header.dmaster != ctdb->vnn) { + ctdb_call_send_redirect(ctdb, c, &header); + talloc_free(data.dptr); + return; + } + + /* if this nodes has done enough consecutive calls on the same record + then give them the record */ + if (header.laccessor == c->hdr.srcnode && + header.lacount >= CTDB_MAX_LACOUNT) { + ctdb_call_send_dmaster(ctdb, c, &header, &key, &data); + talloc_free(data.dptr); + return; + } + + ctdb_call_local(ctdb, key, &header, &data, c->callid, + call_data.dsize?&call_data:NULL, + &reply_data, c->hdr.srcnode); + + r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.length = sizeof(*r) + reply_data.dsize; + r->hdr.operation = CTDB_REPLY_CALL; + r->hdr.destnode = hdr->srcnode; + r->hdr.srcnode = hdr->destnode; + r->hdr.reqid = hdr->reqid; + r->datalen = reply_data.dsize; + memcpy(&r->data[0], reply_data.dptr, reply_data.dsize); + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(reply_data.dptr); + talloc_free(r); +} + +enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR}; + +/* + state of a in-progress ctdb call +*/ +struct ctdb_call_state { + enum call_state state; + struct ctdb_req_call *c; + struct ctdb_node *node; + const char *errmsg; + TDB_DATA call_data; + TDB_DATA reply_data; + TDB_DATA key; + int redirect_count; + struct ctdb_ltdb_header header; +}; + + +/* + called when a CTDB_REPLY_CALL packet comes in + + This packet comes in response to a CTDB_REQ_CALL request packet. It + contains any reply data freom the call +*/ +void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr; + struct ctdb_call_state *state; + TDB_DATA reply_data; + + state = idr_find(ctdb->idr, hdr->reqid); + + reply_data.dptr = c->data; + reply_data.dsize = c->datalen; + + state->reply_data = reply_data; + + talloc_steal(state, c); + + state->state = CTDB_CALL_DONE; +} + +/* + called when a CTDB_REPLY_DMASTER packet comes in + + This packet comes in from the lmaster response to a CTDB_REQ_CALL + request packet. It means that the current dmaster wants to give us + the dmaster role +*/ +void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr; + struct ctdb_call_state *state; + TDB_DATA data; + + state = idr_find(ctdb->idr, hdr->reqid); + + data.dptr = c->data; + data.dsize = c->datalen; + + talloc_steal(state, c); + + /* we're now the dmaster - update our local ltdb with new header + and data */ + state->header.dmaster = ctdb->vnn; + + if (ctdb_ltdb_store(ctdb, state->key, &state->header, data) != 0) { + ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n"); + return; + } + + ctdb_call_local(ctdb, state->key, &state->header, &data, state->c->callid, + state->call_data.dsize?&state->call_data:NULL, + &state->reply_data, ctdb->vnn); + + state->state = CTDB_CALL_DONE; +} + + +/* + called when a CTDB_REPLY_ERROR packet comes in +*/ +void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + + talloc_steal(state, c); + + state->state = CTDB_CALL_ERROR; + state->errmsg = (char *)c->msg; +} + + +/* + called when a CTDB_REPLY_REDIRECT packet comes in + + This packet arrives when we have sent a CTDB_REQ_CALL request and + the node that received it is not the dmaster for the given key. We + are given a hint as to what node to try next. +*/ +void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + + talloc_steal(state, c); + + /* don't allow for too many redirects */ + if (state->redirect_count++ == CTDB_MAX_REDIRECT) { + c->dmaster = ctdb_lmaster(ctdb, &state->key); + } + + /* send it off again */ + state->node = ctdb->nodes[c->dmaster]; + + ctdb_queue_packet(ctdb, &state->c->hdr); +} + +/* + destroy a ctdb_call +*/ +static int ctdb_call_destructor(struct ctdb_call_state *state) +{ +// idr_remove(state->node->ctdb->idr, state->c->hdr.reqid); + return 0; +} + + +/* + called when a ctdb_call times out +*/ +void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private) +{ + struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state); + state->state = CTDB_CALL_ERROR; + ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out", + state->c->hdr.reqid); +} + +/* + construct an event driven local ctdb_call + + this is used so that locally processed ctdb_call requests are processed + in an event driven manner +*/ +struct ctdb_call_state *ctdb_call_local_send(struct ctdb_context *ctdb, + TDB_DATA key, int call_id, + TDB_DATA *call_data, TDB_DATA *reply_data, + struct ctdb_ltdb_header *header, + TDB_DATA *data) +{ + struct ctdb_call_state *state; + int ret; + + state = talloc_zero(ctdb, struct ctdb_call_state); + CTDB_NO_MEMORY_NULL(ctdb, state); + + state->state = CTDB_CALL_DONE; + state->node = ctdb->nodes[ctdb->vnn]; + + ret = ctdb_call_local(ctdb, key, header, data, + call_id, call_data, &state->reply_data, + ctdb->vnn); + return state; +} + + +/* + make a remote ctdb call - async send + + This constructs a ctdb_call request and queues it for processing. + This call never blocks. +*/ +struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb, + TDB_DATA key, int call_id, + TDB_DATA *call_data, TDB_DATA *reply_data) +{ + uint32_t len; + struct ctdb_call_state *state; + int ret; + struct ctdb_ltdb_header header; + TDB_DATA data; + + /* + if we are the dmaster for this key then we don't need to + send it off at all, we can bypass the network and handle it + locally. To find out if we are the dmaster we need to look + in our ltdb + */ + ret = ctdb_ltdb_fetch(ctdb, key, &header, &data); + if (ret != 0) return NULL; + + if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + return ctdb_call_local_send(ctdb, key, call_id, call_data, reply_data, + &header, &data); + } + + state = talloc_zero(ctdb, struct ctdb_call_state); + CTDB_NO_MEMORY_NULL(ctdb, state); + + len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0); + state->c = ctdb->methods->allocate_pkt(ctdb, len); + CTDB_NO_MEMORY_NULL(ctdb, state->c); + + state->c->hdr.length = len; + state->c->hdr.operation = CTDB_REQ_CALL; + state->c->hdr.destnode = header.dmaster; + state->c->hdr.srcnode = ctdb->vnn; + /* this limits us to 16k outstanding messages - not unreasonable */ + state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + DEBUG(0,("Allocate reqid %u\n", state->c->hdr.reqid)); + state->c->callid = call_id; + state->c->keylen = key.dsize; + state->c->calldatalen = call_data?call_data->dsize:0; + memcpy(&state->c->data[0], key.dptr, key.dsize); + if (call_data) { + memcpy(&state->c->data[key.dsize], call_data->dptr, call_data->dsize); + state->call_data.dptr = &state->c->data[key.dsize]; + state->call_data.dsize = call_data->dsize; + } + state->key.dptr = &state->c->data[0]; + state->key.dsize = key.dsize; + + state->node = ctdb->nodes[header.dmaster]; + state->state = CTDB_CALL_WAIT; + state->header = header; + + talloc_set_destructor(state, ctdb_call_destructor); + + ctdb_queue_packet(ctdb, &state->c->hdr); + + event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), + ctdb_call_timeout, state); + return state; +} + + +/* + make a remote ctdb call - async recv. + + This is called when the program wants to wait for a ctdb_call to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_call_recv(struct ctdb_call_state *state, TDB_DATA *reply_data) +{ + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->node->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + talloc_free(state); + return -1; + } + if (reply_data) { + reply_data->dptr = talloc_memdup(state->node->ctdb, + state->reply_data.dptr, + state->reply_data.dsize); + reply_data->dsize = state->reply_data.dsize; + } + talloc_free(state); + return 0; +} + +/* + full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() +*/ +int ctdb_call(struct ctdb_context *ctdb, + TDB_DATA key, int call_id, + TDB_DATA *call_data, TDB_DATA *reply_data) +{ + struct ctdb_call_state *state; + state = ctdb_call_send(ctdb, key, call_id, call_data, reply_data); + return ctdb_call_recv(state, reply_data); +} diff --git a/source4/cluster/ctdb/common/ctdb_ltdb.c b/source4/cluster/ctdb/common/ctdb_ltdb.c new file mode 100644 index 0000000000..bc15a3e898 --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_ltdb.c @@ -0,0 +1,139 @@ +/* + ctdb ltdb code + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" + +/* + attach to a specific database +*/ +int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags, + int open_flags, mode_t mode) +{ + /* when we have a separate daemon this will need to be a real + file, not a TDB_INTERNAL, so the parent can access it to + for ltdb bypass */ + ctdb->ltdb = tdb_open(name, 0, /* tdb_flags */ TDB_INTERNAL, open_flags, mode); + if (ctdb->ltdb == NULL) { + ctdb_set_error(ctdb, "Failed to open tdb %s\n", name); + return -1; + } + return 0; +} + +/* + return the lmaster given a key +*/ +uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key) +{ + return ctdb_hash(key) % ctdb->num_nodes; +} + + +/* + construct an initial header for a record with no ltdb header yet +*/ +static void ltdb_initial_header(struct ctdb_context *ctdb, + TDB_DATA key, + struct ctdb_ltdb_header *header) +{ + header->rsn = 0; + /* initial dmaster is the lmaster */ + header->dmaster = ctdb_lmaster(ctdb, &key); + header->laccessor = header->dmaster; + header->lacount = 0; +} + + +/* + fetch a record from the ltdb, separating out the header information + and returning the body of the record. A valid (initial) header is + returned if the record is not present +*/ +int ctdb_ltdb_fetch(struct ctdb_context *ctdb, + TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data) +{ + TDB_DATA rec; + + rec = tdb_fetch(ctdb->ltdb, key); + if (rec.dsize < sizeof(*header)) { + /* return an initial header */ + free(rec.dptr); + ltdb_initial_header(ctdb, key, header); + data->dptr = NULL; + data->dsize = 0; + return 0; + } + + *header = *(struct ctdb_ltdb_header *)rec.dptr; + + data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header); + data->dptr = talloc_memdup(ctdb, sizeof(struct ctdb_ltdb_header)+rec.dptr, + data->dsize); + free(rec.dptr); + CTDB_NO_MEMORY(ctdb, data->dptr); + + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<data->dsize;i++) { + write(fd, &data->dptr[i], 1); + } + close(fd); + } + + return 0; +} + + +/* + fetch a record from the ltdb, separating out the header information + and returning the body of the record. A valid (initial) header is + returned if the record is not present +*/ +int ctdb_ltdb_store(struct ctdb_context *ctdb, TDB_DATA key, + struct ctdb_ltdb_header *header, TDB_DATA data) +{ + TDB_DATA rec; + int ret; + + rec.dsize = sizeof(*header) + data.dsize; + rec.dptr = talloc_size(ctdb, rec.dsize); + CTDB_NO_MEMORY(ctdb, rec.dptr); + + memcpy(rec.dptr, header, sizeof(*header)); + memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize); + + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<rec.dsize;i++) { + write(fd, &rec.dptr[i], 1); + } + close(fd); + } + + ret = tdb_store(ctdb->ltdb, key, rec, TDB_REPLACE); + talloc_free(rec.dptr); + + return ret; +} diff --git a/source4/cluster/ctdb/common/ctdb_util.c b/source4/cluster/ctdb/common/ctdb_util.c new file mode 100644 index 0000000000..8e25759609 --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_util.c @@ -0,0 +1,103 @@ +/* + ctdb utility code + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" + +/* + return error string for last error +*/ +const char *ctdb_errstr(struct ctdb_context *ctdb) +{ + return ctdb->err_msg; +} + + +/* + remember an error message +*/ +void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...) +{ + va_list ap; + talloc_free(ctdb->err_msg); + va_start(ap, fmt); + ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap); + DEBUG(0,("ctdb error: %s\n", ctdb->err_msg)); + va_end(ap); +} + + +/* + a fatal internal error occurred - no hope for recovery +*/ +void ctdb_fatal(struct ctdb_context *ctdb, const char *msg) +{ + DEBUG(0,("ctdb fatal error: %s\n", msg)); + fprintf(stderr, "ctdb fatal error: '%s'\n", msg); + abort(); +} + +/* + parse a IP:port pair +*/ +int ctdb_parse_address(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, const char *str, + struct ctdb_address *address) +{ + char *p; + p = strchr(str, ':'); + if (p == NULL) { + ctdb_set_error(ctdb, "Badly formed node '%s'\n", str); + return -1; + } + + address->address = talloc_strndup(mem_ctx, str, p-str); + address->port = strtoul(p+1, NULL, 0); + return 0; +} + + +/* + check if two addresses are the same +*/ +bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2) +{ + return strcmp(a1->address, a2->address) == 0 && a1->port == a2->port; +} + + +/* + hash function for mapping data to a VNN - taken from tdb +*/ +uint32_t ctdb_hash(const TDB_DATA *key) +{ + uint32_t value; /* Used to compute the hash value. */ + uint32_t i; /* Used to cycle through random values. */ + + /* Set the initial value from the key size. */ + for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++) + value = (value + (key->dptr[i] << (i*5 % 24))); + + return (1103515243 * value + 12345); +} diff --git a/source4/cluster/ctdb/config.mk b/source4/cluster/ctdb/config.mk new file mode 100644 index 0000000000..90897eedd3 --- /dev/null +++ b/source4/cluster/ctdb/config.mk @@ -0,0 +1,24 @@ +################## +[MODULE::brlock_ctdb] +SUBSYSTEM = ntvfs_common +OBJ_FILES = brlock_ctdb.o + +################## +[MODULE::ctdb_tcp] +SUBSYSTEM = CLUSTER +OBJ_FILES = \ + tcp/tcp_init.o \ + tcp/tcp_io.o \ + tcp/tcp_connect.o + +################## +[MODULE::ctdb] +SUBSYSTEM = CLUSTER +OBJ_FILES = \ + ctdb_cluster.o \ + common/ctdb.o \ + common/ctdb_call.o \ + common/ctdb_ltdb.o \ + common/ctdb_util.o +PRIVATE_DEPENDENCIES = ctdb_tcp +PUBLIC_DEPENDENCIES = LIBTDB LIBTALLOC diff --git a/source4/cluster/ctdb/ctdb_cluster.c b/source4/cluster/ctdb/ctdb_cluster.c new file mode 100644 index 0000000000..df16f2f8b5 --- /dev/null +++ b/source4/cluster/ctdb/ctdb_cluster.c @@ -0,0 +1,138 @@ +/* + Unix SMB/CIFS implementation. + + ctdb clustering hooks + + Copyright (C) Andrew Tridgell 2006 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "cluster/cluster.h" +#include "system/filesys.h" +#include "cluster/cluster_private.h" +#include "lib/tdb/include/tdb.h" +#include "cluster/ctdb/include/ctdb.h" + +struct cluster_state { + struct ctdb_context *ctdb; +}; + + +/* + return a server_id for a ctdb node +*/ +static struct server_id ctdb_id(struct cluster_ops *ops, uint32_t id) +{ + struct ctdb_context *ctdb = ops->private; + struct server_id server_id; + server_id.node = ctdb_get_vnn(ctdb); + server_id.id = id; + return server_id; +} + + +/* + return a server_id as a string +*/ +static const char *ctdb_id_string(struct cluster_ops *ops, + TALLOC_CTX *mem_ctx, struct server_id id) +{ + return talloc_asprintf(mem_ctx, "%u.%u", id.node, id.id); +} + +static struct cluster_ops cluster_ctdb_ops = { + .cluster_id = ctdb_id, + .cluster_id_string = ctdb_id_string, + .private = NULL +}; + +/* initialise ctdb */ +void cluster_ctdb_init(struct event_context *ev) +{ + const char *nlist; + const char *address; + const char *transport; + struct cluster_state *state; + int ret; + + nlist = lp_parm_string(-1, "ctdb", "nlist"); + if (nlist == NULL) return; + + address = lp_parm_string(-1, "ctdb", "address"); + if (address == NULL) return; + + transport = lp_parm_string(-1, "ctdb", "transport"); + if (transport == NULL) { + transport = "tcp"; + } + + state = talloc(ev, struct cluster_state); + if (state == NULL) goto failed; + + state->ctdb = ctdb_init(ev); + if (state->ctdb == NULL) goto failed; + + cluster_ctdb_ops.private = state->ctdb; + + ret = ctdb_set_transport(state->ctdb, transport); + if (ret == -1) { + DEBUG(0,("ctdb_set_transport failed - %s\n", + ctdb_errstr(state->ctdb))); + goto failed; + } + +// ctdb_set_flags(state->ctdb, CTDB_FLAG_SELF_CONNECT); + + /* tell ctdb what address to listen on */ + ret = ctdb_set_address(state->ctdb, address); + if (ret == -1) { + DEBUG(0,("ctdb_set_address failed - %s\n", ctdb_errstr(state->ctdb))); + goto failed; + } + + /* tell ctdb what nodes are available */ + ret = ctdb_set_nlist(state->ctdb, nlist); + if (ret == -1) { + DEBUG(0,("ctdb_set_nlist failed - %s\n", ctdb_errstr(state->ctdb))); + goto failed; + } + + ret = ctdb_attach(state->ctdb, "cluster.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666); + if (ret == -1) { + DEBUG(0,("ctdb_attach failed - %s\n", ctdb_errstr(state->ctdb))); + goto failed; + } + + /* start the protocol running */ + ret = ctdb_start(state->ctdb); + if (ret == -1) { + DEBUG(0,("ctdb_start failed - %s\n", ctdb_errstr(state->ctdb))); + goto failed; + } + + /* wait until all nodes are connected (should not be needed + outide of test code) */ + ctdb_connect_wait(state->ctdb); + + cluster_set_ops(&cluster_ctdb_ops); + return; + +failed: + DEBUG(0,("cluster_ctdb_init failed\n")); + talloc_free(state); +} diff --git a/source4/cluster/ctdb/ctdb_cluster.h b/source4/cluster/ctdb/ctdb_cluster.h new file mode 100644 index 0000000000..5f93df960c --- /dev/null +++ b/source4/cluster/ctdb/ctdb_cluster.h @@ -0,0 +1,23 @@ +/* + Unix SMB/CIFS implementation. + + ctdb clustering hooks - header + + Copyright (C) Andrew Tridgell 2006 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + +void cluster_ctdb_init(struct event_context *ev); diff --git a/source4/cluster/ctdb/include/ctdb.h b/source4/cluster/ctdb/include/ctdb.h new file mode 100644 index 0000000000..21b9b5d1ce --- /dev/null +++ b/source4/cluster/ctdb/include/ctdb.h @@ -0,0 +1,117 @@ +/* + ctdb database library + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#ifndef _CTDB_H +#define _CTDB_H + +/* + structure passed to a ctdb call function +*/ +struct ctdb_call { + TDB_DATA key; /* record key */ + TDB_DATA record_data; /* current data in the record */ + TDB_DATA *new_data; /* optionally updated record data */ + TDB_DATA *call_data; /* optionally passed from caller */ + TDB_DATA *reply_data; /* optionally returned by function */ +}; + +#define CTDB_ERR_INVALID 1 +#define CTDB_ERR_NOMEM 2 + +/* + ctdb flags +*/ +#define CTDB_FLAG_SELF_CONNECT (1<<0) + + +struct event_context; + +/* + initialise ctdb subsystem +*/ +struct ctdb_context *ctdb_init(struct event_context *ev); + +/* + choose the transport +*/ +int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport); + +/* + set some flags +*/ +void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags); + +/* + tell ctdb what address to listen on, in transport specific format +*/ +int ctdb_set_address(struct ctdb_context *ctdb, const char *address); + +/* + tell ctdb what nodes are available. This takes a filename, which will contain + 1 node address per line, in a transport specific format +*/ +int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist); + +/* + start the ctdb protocol +*/ +int ctdb_start(struct ctdb_context *ctdb); + +/* + error string for last ctdb error +*/ +const char *ctdb_errstr(struct ctdb_context *); + +/* a ctdb call function */ +typedef int (*ctdb_fn_t)(struct ctdb_call *); + +/* + setup a ctdb call function +*/ +int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id); + +/* + attach to a ctdb database +*/ +int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags, + int open_flags, mode_t mode); + + +/* + make a ctdb call. The associated ctdb call function will be called on the DMASTER + for the given record +*/ +int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id, + TDB_DATA *call_data, TDB_DATA *reply_data); + +/* + wait for all nodes to be connected - useful for test code +*/ +void ctdb_connect_wait(struct ctdb_context *ctdb); + +/* + wait until we're the only node left +*/ +void ctdb_wait_loop(struct ctdb_context *ctdb); + +/* return vnn of this node */ +uint32_t ctdb_get_vnn(struct ctdb_context *ctdb); + +#endif diff --git a/source4/cluster/ctdb/include/ctdb_private.h b/source4/cluster/ctdb/include/ctdb_private.h new file mode 100644 index 0000000000..d373e3af32 --- /dev/null +++ b/source4/cluster/ctdb/include/ctdb_private.h @@ -0,0 +1,216 @@ +/* + ctdb database library + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#ifndef _CTDB_PRIVATE_H +#define _CTDB_PRIVATE_H + +#include "ctdb.h" + +/* + an installed ctdb remote call +*/ +struct ctdb_registered_call { + struct ctdb_registered_call *next, *prev; + uint32_t id; + ctdb_fn_t fn; +}; + +/* + this address structure might need to be generalised later for some + transports +*/ +struct ctdb_address { + const char *address; + int port; +}; + +/* + state associated with one node +*/ +struct ctdb_node { + struct ctdb_context *ctdb; + struct ctdb_address address; + const char *name; /* for debug messages */ + void *private; /* private to transport */ + uint32_t vnn; +}; + +/* + transport specific methods +*/ +struct ctdb_methods { + int (*start)(struct ctdb_context *); /* start protocol processing */ + int (*add_node)(struct ctdb_node *); /* setup a new node */ + int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length); + void *(*allocate_pkt)(struct ctdb_context *, size_t ); +}; + +/* + transport calls up to the ctdb layer +*/ +struct ctdb_upcalls { + /* recv_pkt is called when a packet comes in */ + void (*recv_pkt)(struct ctdb_context *, uint8_t *data, uint32_t length); + + /* node_dead is called when an attempt to send to a node fails */ + void (*node_dead)(struct ctdb_node *); + + /* node_connected is called when a connection to a node is established */ + void (*node_connected)(struct ctdb_node *); +}; + +/* main state of the ctdb daemon */ +struct ctdb_context { + struct event_context *ev; + struct ctdb_address address; + const char *name; + uint32_t vnn; /* our own vnn */ + uint32_t num_nodes; + uint32_t num_connected; + unsigned flags; + struct idr_context *idr; + struct ctdb_node **nodes; /* array of nodes in the cluster - indexed by vnn */ + struct ctdb_registered_call *calls; /* list of registered calls */ + char *err_msg; + struct tdb_context *ltdb; + const struct ctdb_methods *methods; /* transport methods */ + const struct ctdb_upcalls *upcalls; /* transport upcalls */ + void *private; /* private to transport */ +}; + +#define CTDB_NO_MEMORY(ctdb, p) do { if (!(p)) { \ + ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \ + return -1; }} while (0) + +#define CTDB_NO_MEMORY_NULL(ctdb, p) do { if (!(p)) { \ + ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \ + return NULL; }} while (0) + +#define CTDB_NO_MEMORY_FATAL(ctdb, p) do { if (!(p)) { \ + ctdb_fatal(ctdb, "Out of memory in " __location__ ); \ + }} while (0) + +/* arbitrary maximum timeout for ctdb operations */ +#define CTDB_REQ_TIMEOUT 10 + +/* max number of redirects before we ask the lmaster */ +#define CTDB_MAX_REDIRECT 2 + +/* number of consecutive calls from the same node before we give them + the record */ +#define CTDB_MAX_LACOUNT 7000 + +/* + the extended header for records in the ltdb +*/ +struct ctdb_ltdb_header { + uint64_t rsn; + uint32_t dmaster; + uint32_t laccessor; + uint32_t lacount; +}; + + +/* + operation IDs +*/ +enum ctdb_operation { + CTDB_REQ_CALL = 0, + CTDB_REPLY_CALL = 1, + CTDB_REPLY_REDIRECT = 2, + CTDB_REQ_DMASTER = 3, + CTDB_REPLY_DMASTER = 4, + CTDB_REPLY_ERROR = 5 +}; + +/* + packet structures +*/ +struct ctdb_req_header { + uint32_t length; + uint32_t operation; + uint32_t destnode; + uint32_t srcnode; + uint32_t reqid; +}; + +struct ctdb_req_call { + struct ctdb_req_header hdr; + uint32_t callid; + uint32_t keylen; + uint32_t calldatalen; + uint8_t data[0]; /* key[] followed by calldata[] */ +}; + +struct ctdb_reply_call { + struct ctdb_req_header hdr; + uint32_t datalen; + uint8_t data[0]; +}; + +struct ctdb_reply_error { + struct ctdb_req_header hdr; + uint32_t status; + uint32_t msglen; + uint8_t msg[0]; +}; + +struct ctdb_reply_redirect { + struct ctdb_req_header hdr; + uint32_t dmaster; +}; + +struct ctdb_req_dmaster { + struct ctdb_req_header hdr; + uint32_t dmaster; + uint32_t keylen; + uint32_t datalen; + uint8_t data[0]; +}; + +struct ctdb_reply_dmaster { + struct ctdb_req_header hdr; + uint32_t datalen; + uint8_t data[0]; +}; + +/* internal prototypes */ +void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...); +void ctdb_fatal(struct ctdb_context *ctdb, const char *msg); +bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2); +int ctdb_parse_address(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, const char *str, + struct ctdb_address *address); +uint32_t ctdb_hash(const TDB_DATA *key); +void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); +void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); +void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); +void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); +void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); +void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); + +uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key); +int ctdb_ltdb_fetch(struct ctdb_context *ctdb, + TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data); +int ctdb_ltdb_store(struct ctdb_context *ctdb, TDB_DATA key, + struct ctdb_ltdb_header *header, TDB_DATA data); + + +#endif diff --git a/source4/cluster/ctdb/tcp/ctdb_tcp.h b/source4/cluster/ctdb/tcp/ctdb_tcp.h new file mode 100644 index 0000000000..0f8ce300b4 --- /dev/null +++ b/source4/cluster/ctdb/tcp/ctdb_tcp.h @@ -0,0 +1,76 @@ +/* + ctdb database library + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + + +/* ctdb_tcp main state */ +struct ctdb_tcp { + int listen_fd; +}; + +/* + incoming packet structure - only used when we get a partial packet + on read +*/ +struct ctdb_tcp_partial { + uint8_t *data; + uint32_t length; +}; + + +/* + state associated with an incoming connection +*/ +struct ctdb_incoming { + struct ctdb_context *ctdb; + int fd; + struct ctdb_tcp_partial partial; +}; + +/* + outgoing packet structure - only allocated when we can't write immediately + to the socket +*/ +struct ctdb_tcp_packet { + struct ctdb_tcp_packet *next, *prev; + uint8_t *data; + uint32_t length; +}; + +/* + state associated with one tcp node +*/ +struct ctdb_tcp_node { + int fd; + struct fd_event *fde; + struct ctdb_tcp_packet *queue; +}; + + +/* prototypes internal to tcp transport */ +void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private); +void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private); +int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length); +int ctdb_tcp_listen(struct ctdb_context *ctdb); +void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private); + +#define CTDB_TCP_ALIGNMENT 8 diff --git a/source4/cluster/ctdb/tcp/tcp_connect.c b/source4/cluster/ctdb/tcp/tcp_connect.c new file mode 100644 index 0000000000..2404144ac1 --- /dev/null +++ b/source4/cluster/ctdb/tcp/tcp_connect.c @@ -0,0 +1,191 @@ +/* + ctdb over TCP + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" +#include "ctdb_tcp.h" + +static void set_nonblocking(int fd) +{ + unsigned v; + v = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, v | O_NONBLOCK); +} + + +/* + called when socket becomes writeable on connect +*/ +static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private) +{ + struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + struct ctdb_context *ctdb = node->ctdb; + int error = 0; + socklen_t len = sizeof(error); + + if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 || + error != 0) { + talloc_free(fde); + close(tnode->fd); + tnode->fd = -1; + event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0), + ctdb_tcp_node_connect, node); + return; + } + + talloc_free(fde); + tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ, + ctdb_tcp_node_write, node); + + /* tell the ctdb layer we are connected */ + node->ctdb->upcalls->node_connected(node); + + if (tnode->queue) { + EVENT_FD_WRITEABLE(tnode->fde); + } +} + +/* + called when we should try and establish a tcp connection to a node +*/ +void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private) +{ + struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + struct ctdb_context *ctdb = node->ctdb; + struct sockaddr_in sock_out; + + tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + + set_nonblocking(tnode->fd); + + inet_pton(AF_INET, node->address.address, &sock_out.sin_addr); + sock_out.sin_port = htons(node->address.port); + sock_out.sin_family = PF_INET; + + if (connect(tnode->fd, (struct sockaddr *)&sock_out, sizeof(sock_out)) != 0 && + errno != EINPROGRESS) { + /* try again once a second */ + close(tnode->fd); + event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0), + ctdb_tcp_node_connect, node); + return; + } + + /* non-blocking connect - wait for write event */ + event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_WRITE|EVENT_FD_READ, + ctdb_node_connect_write, node); +} + +/* + destroy a ctdb_incoming structure +*/ +static int ctdb_incoming_destructor(struct ctdb_incoming *in) +{ + close(in->fd); + in->fd = -1; + return 0; +} + +/* + called when we get contacted by another node + currently makes no attempt to check if the connection is really from a ctdb + node in our cluster +*/ +static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private) +{ + struct ctdb_context *ctdb; + struct ctdb_tcp *ctcp; + struct sockaddr_in addr; + socklen_t len; + int fd; + struct ctdb_incoming *in; + + ctdb = talloc_get_type(private, struct ctdb_context); + ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp); + memset(&addr, 0, sizeof(addr)); + len = sizeof(addr); + fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len); + if (fd == -1) return; + + in = talloc_zero(ctdb, struct ctdb_incoming); + in->fd = fd; + in->ctdb = ctdb; + + set_nonblocking(in->fd); + + event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ, + ctdb_tcp_incoming_read, in); + + talloc_set_destructor(in, ctdb_incoming_destructor); +} + + +/* + listen on our own address +*/ +int ctdb_tcp_listen(struct ctdb_context *ctdb) +{ + struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp); + struct sockaddr_in sock; + int one = 1; + + sock.sin_port = htons(ctdb->address.port); + sock.sin_family = PF_INET; + inet_pton(AF_INET, ctdb->address.address, &sock.sin_addr); + + ctcp->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if (ctcp->listen_fd == -1) { + ctdb_set_error(ctdb, "socket failed\n"); + return -1; + } + + setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one)); + + if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) { + ctdb_set_error(ctdb, "bind failed\n"); + close(ctcp->listen_fd); + ctcp->listen_fd = -1; + return -1; + } + + if (listen(ctcp->listen_fd, 10) == -1) { + ctdb_set_error(ctdb, "listen failed\n"); + close(ctcp->listen_fd); + ctcp->listen_fd = -1; + return -1; + } + + event_add_fd(ctdb->ev, ctdb, ctcp->listen_fd, EVENT_FD_READ, + ctdb_listen_event, ctdb); + + return 0; +} + diff --git a/source4/cluster/ctdb/tcp/tcp_init.c b/source4/cluster/ctdb/tcp/tcp_init.c new file mode 100644 index 0000000000..b8ee8cb30e --- /dev/null +++ b/source4/cluster/ctdb/tcp/tcp_init.c @@ -0,0 +1,102 @@ +/* + ctdb over TCP + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" +#include "ctdb_tcp.h" + +/* + start the protocol going +*/ +int ctdb_tcp_start(struct ctdb_context *ctdb) +{ + int i; + + /* listen on our own address */ + if (ctdb_tcp_listen(ctdb) != 0) return -1; + + /* startup connections to the other servers - will happen on + next event loop */ + for (i=0;i<ctdb->num_nodes;i++) { + struct ctdb_node *node = *(ctdb->nodes + i); + if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) && + ctdb_same_address(&ctdb->address, &node->address)) continue; + event_add_timed(ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_connect, node); + } + + return 0; +} + + +/* + initialise tcp portion of a ctdb node +*/ +int ctdb_tcp_add_node(struct ctdb_node *node) +{ + struct ctdb_tcp_node *tnode; + tnode = talloc_zero(node, struct ctdb_tcp_node); + CTDB_NO_MEMORY(node->ctdb, tnode); + + tnode->fd = -1; + node->private = tnode; + return 0; +} + + +/* + transport packet allocator - allows transport to control memory for packets +*/ +void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size) +{ + /* tcp transport needs to round to 8 byte alignment to ensure + that we can use a length header and 64 bit elements in + structures */ + size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1); + return talloc_size(ctdb, size); +} + + +static const struct ctdb_methods ctdb_tcp_methods = { + .start = ctdb_tcp_start, + .add_node = ctdb_tcp_add_node, + .queue_pkt = ctdb_tcp_queue_pkt, + .allocate_pkt = ctdb_tcp_allocate_pkt +}; + +/* + initialise tcp portion of ctdb +*/ +int ctdb_tcp_init(struct ctdb_context *ctdb) +{ + struct ctdb_tcp *ctcp; + ctcp = talloc_zero(ctdb, struct ctdb_tcp); + CTDB_NO_MEMORY(ctdb, ctcp); + + ctcp->listen_fd = -1; + ctdb->private = ctcp; + ctdb->methods = &ctdb_tcp_methods; + return 0; +} + diff --git a/source4/cluster/ctdb/tcp/tcp_io.c b/source4/cluster/ctdb/tcp/tcp_io.c new file mode 100644 index 0000000000..82e24f7260 --- /dev/null +++ b/source4/cluster/ctdb/tcp/tcp_io.c @@ -0,0 +1,254 @@ +/* + ctdb over TCP + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" +#include "ctdb_tcp.h" + + +/* + called when we fail to send a message to a node +*/ +static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private) +{ + struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + + /* flush the queue */ + while (tnode->queue) { + struct ctdb_tcp_packet *pkt = tnode->queue; + DLIST_REMOVE(tnode->queue, pkt); + talloc_free(pkt); + } + + /* start a new connect cycle to try to re-establish the + link */ + talloc_free(tnode->fde); + close(tnode->fd); + tnode->fd = -1; + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_connect, node); +} + +/* + called when socket becomes readable +*/ +void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private) +{ + struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + if (flags & EVENT_FD_READ) { + /* getting a read event on this fd in the current tcp model is + always an error, as we have separate read and write + sockets. In future we may combine them, but for now it must + mean that the socket is dead, so we try to reconnect */ + talloc_free(tnode->fde); + close(tnode->fd); + tnode->fd = -1; + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_connect, node); + return; + } + + while (tnode->queue) { + struct ctdb_tcp_packet *pkt = tnode->queue; + ssize_t n; + + n = write(tnode->fd, pkt->data, pkt->length); + + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_dead, node); + EVENT_FD_NOT_WRITEABLE(tnode->fde); + return; + } + if (n <= 0) return; + + if (n != pkt->length) { + pkt->length -= n; + pkt->data += n; + return; + } + + DLIST_REMOVE(tnode->queue, pkt); + talloc_free(pkt); + } + + EVENT_FD_NOT_WRITEABLE(tnode->fde); +} + + +/* + called when an incoming connection is readable +*/ +void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private) +{ + struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming); + int num_ready = 0; + ssize_t nread; + uint8_t *data, *data_base; + + if (ioctl(in->fd, FIONREAD, &num_ready) != 0 || + num_ready == 0) { + /* we've lost the link from another node. We don't + notify the upper layers, as we only want to trigger + a full node reorganisation when a send fails - that + allows nodes to restart without penalty as long as + the network is idle */ + talloc_free(in); + return; + } + + in->partial.data = talloc_realloc_size(in, in->partial.data, + num_ready + in->partial.length); + if (in->partial.data == NULL) { + /* not much we can do except drop the socket */ + talloc_free(in); + return; + } + + nread = read(in->fd, in->partial.data+in->partial.length, num_ready); + if (nread <= 0) { + /* the connection must be dead */ + talloc_free(in); + return; + } + + data = in->partial.data; + nread += in->partial.length; + + in->partial.data = NULL; + in->partial.length = 0; + + if (nread >= 4 && *(uint32_t *)data == nread) { + /* most common case - we got a whole packet in one go + tell the ctdb layer above that we have a packet */ + in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread); + return; + } + + data_base = data; + + while (nread >= 4 && *(uint32_t *)data <= nread) { + /* we have at least one packet */ + uint8_t *d2; + uint32_t len; + len = *(uint32_t *)data; + d2 = talloc_memdup(in, data, len); + if (d2 == NULL) { + /* sigh */ + talloc_free(in); + return; + } + in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len); + data += len; + nread -= len; + return; + } + + if (nread < 4 || *(uint32_t *)data > nread) { + /* we have only part of a packet */ + if (data_base == data) { + in->partial.data = data; + in->partial.length = nread; + } else { + in->partial.data = talloc_memdup(in, data, nread); + if (in->partial.data == NULL) { + talloc_free(in); + return; + } + in->partial.length = nread; + talloc_free(data_base); + } + return; + } + + talloc_free(data_base); +} + +/* + queue a packet for sending +*/ +int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length) +{ + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + struct ctdb_tcp_packet *pkt; + uint32_t length2; + + /* enforce the length and alignment rules from the tcp packet allocator */ + length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1); + *(uint32_t *)data = length2; + + if (length2 != length) { + memset(data+length, 0, length2-length); + } + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<length2;i++) { + write(fd, &data[i], 1); + } + close(fd); + } + + /* if the queue is empty then try an immediate write, avoiding + queue overhead. This relies on non-blocking sockets */ + if (tnode->queue == NULL && tnode->fd != -1) { + ssize_t n = write(tnode->fd, data, length2); + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_dead, node); + /* yes, we report success, as the dead node is + handled via a separate event */ + return 0; + } + if (n > 0) { + data += n; + length2 -= n; + } + if (length2 == 0) return 0; + } + + pkt = talloc(tnode, struct ctdb_tcp_packet); + CTDB_NO_MEMORY(node->ctdb, pkt); + + pkt->data = talloc_memdup(pkt, data, length2); + CTDB_NO_MEMORY(node->ctdb, pkt->data); + + pkt->length = length2; + + if (tnode->queue == NULL && tnode->fd != -1) { + EVENT_FD_WRITEABLE(tnode->fde); + } + + DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *); + + return 0; +} diff --git a/source4/cluster/ctdb/tests/ctdb_bench.c b/source4/cluster/ctdb/tests/ctdb_bench.c new file mode 100644 index 0000000000..01a8cc0f15 --- /dev/null +++ b/source4/cluster/ctdb/tests/ctdb_bench.c @@ -0,0 +1,228 @@ +/* + simple ctdb benchmark + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "popt.h" + +#include <sys/time.h> +#include <time.h> + +static struct timeval tp1,tp2; + +static void start_timer(void) +{ + gettimeofday(&tp1,NULL); +} + +static double end_timer(void) +{ + gettimeofday(&tp2,NULL); + return (tp2.tv_sec + (tp2.tv_usec*1.0e-6)) - + (tp1.tv_sec + (tp1.tv_usec*1.0e-6)); +} + + +static int timelimit = 10; +static int num_records = 10; +static int num_repeats = 100; + +enum my_functions {FUNC_INCR=1, FUNC_FETCH=2}; + +/* + ctdb call function to increment an integer +*/ +static int incr_func(struct ctdb_call *call) +{ + if (call->record_data.dsize == 0) { + call->new_data = talloc(call, TDB_DATA); + if (call->new_data == NULL) { + return CTDB_ERR_NOMEM; + } + call->new_data->dptr = talloc_size(call, 4); + call->new_data->dsize = 4; + *(uint32_t *)call->new_data->dptr = 0; + } else { + call->new_data = &call->record_data; + } + (*(uint32_t *)call->new_data->dptr)++; + return 0; +} + +/* + ctdb call function to fetch a record +*/ +static int fetch_func(struct ctdb_call *call) +{ + call->reply_data = &call->record_data; + return 0; +} + +/* + benchmark incrementing an integer +*/ +static void bench_incr(struct ctdb_context *ctdb) +{ + TDB_DATA key, data; + int loops=0; + int ret, i; + + start_timer(); + + while (1) { + uint32_t v = loops % num_records; + key.dptr = &v; + key.dsize = 4; + for (i=0;i<num_repeats;i++) { + ret = ctdb_call(ctdb, key, FUNC_INCR, NULL, NULL); + if (ret != 0) { + printf("incr call failed - %s\n", ctdb_errstr(ctdb)); + return; + } + } + if (num_repeats * (++loops) % 10000 == 0) { + if (end_timer() > timelimit) break; + printf("Incr: %.2f ops/sec\r", num_repeats*loops/end_timer()); + fflush(stdout); + } + } + + ret = ctdb_call(ctdb, key, FUNC_FETCH, NULL, &data); + if (ret == -1) { + printf("ctdb_call FUNC_FETCH failed - %s\n", ctdb_errstr(ctdb)); + return; + } + + printf("Incr: %.2f ops/sec (loops=%d val=%d)\n", + num_repeats*loops/end_timer(), loops, *(uint32_t *)data.dptr); +} + +/* + main program +*/ +int main(int argc, const char *argv[]) +{ + struct ctdb_context *ctdb; + const char *nlist = NULL; + const char *transport = "tcp"; + const char *myaddress = NULL; + int self_connect=0; + + struct poptOption popt_options[] = { + POPT_AUTOHELP + { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" }, + { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" }, + { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL }, + { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" }, + { "timelimit", 't', POPT_ARG_INT, &timelimit, 0, "timelimit", "integer" }, + { "num-records", 'r', POPT_ARG_INT, &num_records, 0, "num_records", "integer" }, + POPT_TABLEEND + }; + int opt; + const char **extra_argv; + int extra_argc = 0; + int ret; + poptContext pc; + struct event_context *ev; + + pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST); + + while ((opt = poptGetNextOpt(pc)) != -1) { + switch (opt) { + default: + fprintf(stderr, "Invalid option %s: %s\n", + poptBadOption(pc, 0), poptStrerror(opt)); + exit(1); + } + } + + /* setup the remaining options for the main program to use */ + extra_argv = poptGetArgs(pc); + if (extra_argv) { + extra_argv++; + while (extra_argv[extra_argc]) extra_argc++; + } + + if (nlist == NULL || myaddress == NULL) { + printf("You must provide a node list with --nlist and an address with --listen\n"); + exit(1); + } + + ev = event_context_init(NULL); + + /* initialise ctdb */ + ctdb = ctdb_init(ev); + if (ctdb == NULL) { + printf("Failed to init ctdb\n"); + exit(1); + } + + if (self_connect) { + ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT); + } + + ret = ctdb_set_transport(ctdb, transport); + if (ret == -1) { + printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what address to listen on */ + ret = ctdb_set_address(ctdb, myaddress); + if (ret == -1) { + printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what nodes are available */ + ret = ctdb_set_nlist(ctdb, nlist); + if (ret == -1) { + printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* setup a ctdb call function */ + ret = ctdb_set_call(ctdb, incr_func, FUNC_INCR); + ret = ctdb_set_call(ctdb, fetch_func, FUNC_FETCH); + + /* attach to a specific database */ + ret = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666); + if (ret == -1) { + printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* start the protocol running */ + ret = ctdb_start(ctdb); + + /* wait until all nodes are connected (should not be needed + outside of test code) */ + ctdb_connect_wait(ctdb); + + bench_incr(ctdb); + + /* go into a wait loop to allow other nodes to complete */ + ctdb_wait_loop(ctdb); + + /* shut it down */ + talloc_free(ctdb); + return 0; +} diff --git a/source4/cluster/ctdb/tests/ctdb_test.c b/source4/cluster/ctdb/tests/ctdb_test.c new file mode 100644 index 0000000000..5bc35ad332 --- /dev/null +++ b/source4/cluster/ctdb/tests/ctdb_test.c @@ -0,0 +1,207 @@ +/* + ctdb test harness + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "popt.h" + +enum my_functions {FUNC_SORT=1, FUNC_FETCH=2}; + +static int int_compare(int *i1, int *i2) +{ + return *i1 - *i2; +} + +/* + add an integer into a record in sorted order +*/ +static int sort_func(struct ctdb_call *call) +{ + if (call->call_data == NULL || + call->call_data->dsize != sizeof(int)) { + return CTDB_ERR_INVALID; + } + call->new_data = talloc(call, TDB_DATA); + if (call->new_data == NULL) { + return CTDB_ERR_NOMEM; + } + call->new_data->dptr = talloc_size(call, + call->record_data.dsize + + call->call_data->dsize); + if (call->new_data->dptr == NULL) { + return CTDB_ERR_NOMEM; + } + call->new_data->dsize = call->record_data.dsize + call->call_data->dsize; + memcpy(call->new_data->dptr, + call->record_data.dptr, call->record_data.dsize); + memcpy(call->new_data->dptr+call->record_data.dsize, + call->call_data->dptr, call->call_data->dsize); + + qsort(call->new_data->dptr, call->new_data->dsize / sizeof(int), + sizeof(int), (comparison_fn_t)int_compare); + + return 0; +} + +/* + ctdb call function to fetch a record +*/ +static int fetch_func(struct ctdb_call *call) +{ + call->reply_data = &call->record_data; + return 0; +} + +/* + main program +*/ +int main(int argc, const char *argv[]) +{ + struct ctdb_context *ctdb; + const char *nlist = NULL; + const char *transport = "tcp"; + const char *myaddress = NULL; + int self_connect=0; + + struct poptOption popt_options[] = { + POPT_AUTOHELP + { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" }, + { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" }, + { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL }, + { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" }, + POPT_TABLEEND + }; + int opt; + const char **extra_argv; + int extra_argc = 0; + int i, ret; + TDB_DATA key, data; + poptContext pc; + struct event_context *ev; + + pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST); + + while ((opt = poptGetNextOpt(pc)) != -1) { + switch (opt) { + default: + fprintf(stderr, "Invalid option %s: %s\n", + poptBadOption(pc, 0), poptStrerror(opt)); + exit(1); + } + } + + /* setup the remaining options for the main program to use */ + extra_argv = poptGetArgs(pc); + if (extra_argv) { + extra_argv++; + while (extra_argv[extra_argc]) extra_argc++; + } + + if (nlist == NULL || myaddress == NULL) { + printf("You must provide a node list with --nlist and an address with --listen\n"); + exit(1); + } + + ev = event_context_init(NULL); + + /* initialise ctdb */ + ctdb = ctdb_init(ev); + if (ctdb == NULL) { + printf("Failed to init ctdb\n"); + exit(1); + } + + if (self_connect) { + ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT); + } + + ret = ctdb_set_transport(ctdb, transport); + if (ret == -1) { + printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what address to listen on */ + ret = ctdb_set_address(ctdb, myaddress); + if (ret == -1) { + printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what nodes are available */ + ret = ctdb_set_nlist(ctdb, nlist); + if (ret == -1) { + printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* setup a ctdb call function */ + ret = ctdb_set_call(ctdb, sort_func, FUNC_SORT); + ret = ctdb_set_call(ctdb, fetch_func, FUNC_FETCH); + + /* attach to a specific database */ + ret = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666); + if (ret == -1) { + printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* start the protocol running */ + ret = ctdb_start(ctdb); + + /* wait until all nodes are connected (should not be needed + outide of test code) */ + ctdb_connect_wait(ctdb); + + key.dptr = "test"; + key.dsize = strlen("test")+1; + + /* add some random data */ + for (i=0;i<10;i++) { + int v = random() % 1000; + data.dptr = (uint8_t *)&v; + data.dsize = sizeof(v); + ret = ctdb_call(ctdb, key, FUNC_SORT, &data, NULL); + if (ret == -1) { + printf("ctdb_call FUNC_SORT failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + } + + /* fetch the record */ + ret = ctdb_call(ctdb, key, FUNC_FETCH, NULL, &data); + if (ret == -1) { + printf("ctdb_call FUNC_FETCH failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + for (i=0;i<data.dsize/sizeof(int);i++) { + printf("%3d\n", ((int *)data.dptr)[i]); + } + talloc_free(data.dptr); + + /* go into a wait loop to allow other nodes to complete */ + ctdb_wait_loop(ctdb); + + /* shut it down */ + talloc_free(ctdb); + return 0; +} |