diff options
Diffstat (limited to 'source4/cluster/ctdb/direct')
-rw-r--r-- | source4/cluster/ctdb/direct/README | 12 | ||||
-rw-r--r-- | source4/cluster/ctdb/direct/ctdbd.c | 157 | ||||
-rwxr-xr-x | source4/cluster/ctdb/direct/ctdbd.sh | 8 | ||||
-rw-r--r-- | source4/cluster/ctdb/direct/ctdbd_test.c | 364 | ||||
-rw-r--r-- | source4/cluster/ctdb/direct/nodes.txt | 2 |
5 files changed, 543 insertions, 0 deletions
diff --git a/source4/cluster/ctdb/direct/README b/source4/cluster/ctdb/direct/README new file mode 100644 index 0000000000..bc87d060cb --- /dev/null +++ b/source4/cluster/ctdb/direct/README @@ -0,0 +1,12 @@ +Run ./direct/ctdbd.sh to start a cluster with two ctdb nodes +They will listen for clients on the unix domain sockets +/tmp/ctdb.socket.127.0.0.1 +/tmp/ctdb.socket.127.0.0.2 + +In order for this to work you must have an interface with the address 127.0.0.2 available. +Just create this as an alias for loopback. + + +Then run ./direct/ctdbd_test to connect a client to the ctdbd daemon on /tmp/ctdb.socket.127.0.0.1 and do some commands to it across the domain socket. + + diff --git a/source4/cluster/ctdb/direct/ctdbd.c b/source4/cluster/ctdb/direct/ctdbd.c new file mode 100644 index 0000000000..700416e5e9 --- /dev/null +++ b/source4/cluster/ctdb/direct/ctdbd.c @@ -0,0 +1,157 @@ +/* + standalone ctdb daemon + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "popt.h" +#include "system/wait.h" + +static void block_signal(int signum) +{ + struct sigaction act; + + memset(&act, 0, sizeof(act)); + + act.sa_handler = SIG_IGN; + sigemptyset(&act.sa_mask); + sigaddset(&act.sa_mask, signum); + sigaction(signum, &act, NULL); +} + + +/* + main program +*/ +int main(int argc, const char *argv[]) +{ + struct ctdb_context *ctdb; + const char *nlist = NULL; + const char *transport = "tcp"; + const char *myaddress = NULL; + int self_connect=0; + int daemon_mode=0; + const char *db_list = "test.tdb"; + char *s, *tok; + + struct poptOption popt_options[] = { + POPT_AUTOHELP + { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" }, + { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" }, + { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL }, + { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" }, + { "daemon", 0, POPT_ARG_NONE, &daemon_mode, 0, "spawn a ctdb daemon", "boolean" }, + { "dblist", 0, POPT_ARG_STRING, &db_list, 0, "list of databases", NULL }, + POPT_TABLEEND + }; + int opt; + const char **extra_argv; + int extra_argc = 0; + int ret; + poptContext pc; + struct event_context *ev; + + pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST); + + while ((opt = poptGetNextOpt(pc)) != -1) { + switch (opt) { + default: + fprintf(stderr, "Invalid option %s: %s\n", + poptBadOption(pc, 0), poptStrerror(opt)); + exit(1); + } + } + + /* setup the remaining options for the main program to use */ + extra_argv = poptGetArgs(pc); + if (extra_argv) { + extra_argv++; + while (extra_argv[extra_argc]) extra_argc++; + } + + if (nlist == NULL || myaddress == NULL) { + printf("You must provide a node list with --nlist and an address with --listen\n"); + exit(1); + } + + block_signal(SIGPIPE); + + ev = event_context_init(NULL); + + /* initialise ctdb */ + ctdb = ctdb_init(ev); + if (ctdb == NULL) { + printf("Failed to init ctdb\n"); + exit(1); + } + + if (self_connect) { + ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT); + } + if (daemon_mode) { + ctdb_set_flags(ctdb, CTDB_FLAG_DAEMON_MODE); + } + + ret = ctdb_set_transport(ctdb, transport); + if (ret == -1) { + printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what address to listen on */ + ret = ctdb_set_address(ctdb, myaddress); + if (ret == -1) { + printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what nodes are available */ + ret = ctdb_set_nlist(ctdb, nlist); + if (ret == -1) { + printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* attach to the list of databases */ + s = talloc_strdup(ctdb, db_list); + for (tok=strtok(s, ", "); tok; tok=strtok(NULL, ", ")) { + struct ctdb_db_context *ctdb_db; + ctdb_db = ctdb_attach(ctdb, tok, TDB_DEFAULT, + O_RDWR|O_CREAT|O_TRUNC, 0666); + if (!ctdb_db) { + printf("ctdb_attach to '%s'failed - %s\n", tok, + ctdb_errstr(ctdb)); + exit(1); + } + printf("Attached to database '%s'\n", tok); + } + + /* start the protocol running */ + ret = ctdb_start(ctdb); + +/* event_loop_wait(ev);*/ + while (1) { + event_loop_once(ev); + } + + /* shut it down */ + talloc_free(ev); + return 0; +} diff --git a/source4/cluster/ctdb/direct/ctdbd.sh b/source4/cluster/ctdb/direct/ctdbd.sh new file mode 100755 index 0000000000..366226260b --- /dev/null +++ b/source4/cluster/ctdb/direct/ctdbd.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +killall -q ctdbd + +echo "Starting 2 ctdb daemons" +bin/ctdbd --nlist direct/nodes.txt --listen 127.0.0.2:9001 --daemon & +bin/ctdbd --nlist direct/nodes.txt --listen 127.0.0.1:9001 --daemon & + diff --git a/source4/cluster/ctdb/direct/ctdbd_test.c b/source4/cluster/ctdb/direct/ctdbd_test.c new file mode 100644 index 0000000000..019cdad30d --- /dev/null +++ b/source4/cluster/ctdb/direct/ctdbd_test.c @@ -0,0 +1,364 @@ +/* + test of messaging + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "system/network.h" +#include "../include/ctdb.h" +#include "../include/ctdb_private.h" + +#define CTDB_SOCKET "/tmp/ctdb.socket.127.0.0.1" + + +/* + connect to the unix domain socket +*/ +static int ux_socket_connect(const char *name) +{ + struct sockaddr_un addr; + int fd; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, name, sizeof(addr.sun_path)); + + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd == -1) { + return -1; + } + + if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + close(fd); + return -1; + } + + return fd; +} + +void register_pid_with_daemon(int fd, int pid) +{ + struct ctdb_req_register r; + + bzero(&r, sizeof(r)); + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_REGISTER; + r.srvid = pid; + + /* XXX must deal with partial writes here */ + write(fd, &r, sizeof(r)); +} + +/* send a command to the cluster to wait until all nodes are connected + and the cluster is fully operational + */ +int wait_for_cluster(int fd) +{ + struct ctdb_req_connect_wait req; + struct ctdb_reply_connect_wait rep; + int cnt, tot; + + /* send a connect wait command to the local node */ + bzero(&req, sizeof(req)); + req.hdr.length = sizeof(req); + req.hdr.ctdb_magic = CTDB_MAGIC; + req.hdr.ctdb_version = CTDB_VERSION; + req.hdr.operation = CTDB_REQ_CONNECT_WAIT; + + /* XXX must deal with partial writes here */ + write(fd, &req, sizeof(req)); + + + /* read the 4 bytes of length for the pdu */ + cnt=0; + tot=4; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)&rep)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + /* read the rest of the pdu */ + tot=rep.hdr.length; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)&rep)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + + return rep.vnn; +} + + +int send_a_message(int fd, int ourvnn, int vnn, int pid, TDB_DATA data) +{ + struct ctdb_req_message r; + int len, cnt; + + len = offsetof(struct ctdb_req_message, data) + data.dsize; + r.hdr.length = len; + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_MESSAGE; + r.hdr.destnode = vnn; + r.hdr.srcnode = ourvnn; + r.hdr.reqid = 0; + r.srvid = pid; + r.datalen = data.dsize; + + /* write header */ + cnt=write(fd, &r, offsetof(struct ctdb_req_message, data)); + /* write data */ + if(data.dsize){ + cnt=write(fd, data.dptr, data.dsize); + } + return 0; +} + +int receive_a_message(int fd, struct ctdb_req_message **preply) +{ + int cnt,tot; + struct ctdb_req_message *rep; + uint32_t length; + + /* read the 4 bytes of length for the pdu */ + cnt=0; + tot=4; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)&length)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + + /* read the rest of the pdu */ + rep = malloc(length); + rep->hdr.length = length; + cnt = 0; + tot = length-4; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)rep)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + + *preply = rep; + return 0; +} + +/* + hash function for mapping data to a VNN - taken from tdb +*/ +uint32_t ctdb_hash(const TDB_DATA *key) +{ + uint32_t value; /* Used to compute the hash value. */ + uint32_t i; /* Used to cycle through random values. */ + + /* Set the initial value from the key size. */ + for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++) + value = (value + (key->dptr[i] << (i*5 % 24))); + + return (1103515243 * value + 12345); +} + +void fetch_lock(int fd, uint32_t db_id, TDB_DATA key) +{ + struct ctdb_req_fetch_lock *req; + struct ctdb_reply_fetch_lock *rep; + uint32_t length; + int len, cnt, tot; + + len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize; + req = malloc(len); + + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_FETCH_LOCK; + req->hdr.reqid = 1; + req->db_id = db_id; + req->keylen = key.dsize; + memcpy(&req->key[0], key.dptr, key.dsize); + + cnt=write(fd, req, len); + + + /* wait fot the reply */ + /* read the 4 bytes of length for the pdu */ + cnt=0; + tot=4; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)&length)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + /* read the rest of the pdu */ + rep = malloc(length); + tot=length; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)rep)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + printf("fetch lock reply: state:%d datalen:%d\n",rep->state,rep->datalen); + if(!rep->datalen){ + printf("no data\n"); + } else { + printf("data:[%s]\n",rep->data); + } + +} + +void store_unlock(int fd, uint32_t db_id, TDB_DATA key, TDB_DATA data) +{ + struct ctdb_req_store_unlock *req; + struct ctdb_reply_store_unlock *rep; + uint32_t length; + int len, cnt, tot; + + len = offsetof(struct ctdb_req_store_unlock, data) + key.dsize + data.dsize; + req = malloc(len); + + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_STORE_UNLOCK; + req->hdr.reqid = 1; + req->db_id = db_id; + req->keylen = key.dsize; + req->datalen = data.dsize; + memcpy(&req->data[0], key.dptr, key.dsize); + memcpy(&req->data[key.dsize], data.dptr, data.dsize); + + cnt=write(fd, req, len); + + + /* wait fot the reply */ + /* read the 4 bytes of length for the pdu */ + cnt=0; + tot=4; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)&length)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + /* read the rest of the pdu */ + rep = malloc(length); + tot=length; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)rep)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + printf("store unlock reply: state:%d\n",rep->state); +} + +int main(int argc, const char *argv[]) +{ + int fd, pid, vnn, dstvnn, dstpid; + TDB_DATA message; + struct ctdb_req_message *reply; + TDB_DATA dbname; + uint32_t db_id; + TDB_DATA key, data; + char str[256]; + + /* open the socket to talk to the local ctdb daemon */ + fd=ux_socket_connect(CTDB_SOCKET); + if (fd==-1) { + printf("failed to open domain socket\n"); + exit(10); + } + + + /* register our local server id with the daemon so that it knows + where to send messages addressed to our local pid. + */ + pid=getpid(); + register_pid_with_daemon(fd, pid); + + + /* do a connect wait to ensure that all nodes in the cluster are up + and operational. + this also tells us the vnn of the local cluster. + If someone wants to send us a emssage they should send it to + this vnn and our pid + */ + vnn=wait_for_cluster(fd); + printf("our address is vnn:%d pid:%d if someone wants to send us a message!\n",vnn,pid); + + + /* send a message to ourself */ + dstvnn=vnn; + dstpid=pid; + message.dptr=discard_const("Test message"); + message.dsize=strlen((const char *)message.dptr)+1; + printf("sending test message [%s] to ourself\n", message.dptr); + send_a_message(fd, vnn, dstvnn, dstpid, message); + + /* wait for the message to come back */ + receive_a_message(fd, &reply); + printf("received message: [%s]\n",&reply->data[0]); + + /* create the db id for "test.tdb" */ + dbname.dptr = discard_const("test.tdb"); + dbname.dsize = strlen((const char *)(dbname.dptr)); + db_id = ctdb_hash(&dbname); + printf("the has for the database id is 0x%08x\n",db_id); + printf("\n"); + + /* send a fetch lock */ + key.dptr=discard_const("TestKey"); + key.dsize=strlen((const char *)(key.dptr)); + printf("fetch the test key:[%s]\n",key.dptr); + fetch_lock(fd, db_id, key); + printf("\n"); + + + /* send a store unlock */ + sprintf(str,"TestData_%d",getpid()); + data.dptr=discard_const(str); + data.dsize=strlen((const char *)(data.dptr)); + printf("store new data==[%s] for this record\n",data.dptr); + store_unlock(fd, db_id, key, data); + printf("\n"); + + /* send a fetch lock */ + printf("fetch the test key:[%s]\n",key.dptr); + fetch_lock(fd, db_id, key); + printf("\n"); + + + return 0; +} diff --git a/source4/cluster/ctdb/direct/nodes.txt b/source4/cluster/ctdb/direct/nodes.txt new file mode 100644 index 0000000000..e1198b59ac --- /dev/null +++ b/source4/cluster/ctdb/direct/nodes.txt @@ -0,0 +1,2 @@ +127.0.0.1:9001 +127.0.0.2:9001 |