summaryrefslogtreecommitdiff
path: root/source4/cluster/ctdb/direct
diff options
context:
space:
mode:
authorAndrew Tridgell <tridge@samba.org>2007-04-16 00:18:54 +0000
committerGerald (Jerry) Carter <jerry@samba.org>2007-10-10 14:50:40 -0500
commitc9f04d8648cfdd573d45d47467bc964ef01f754d (patch)
tree115acf98b7b136f07dd8b16bbd50c9f7cbcdd3bb /source4/cluster/ctdb/direct
parentbb36705c8d360a2ba865a3d8118c52afa1e46f4e (diff)
downloadsamba-c9f04d8648cfdd573d45d47467bc964ef01f754d.tar.gz
samba-c9f04d8648cfdd573d45d47467bc964ef01f754d.tar.bz2
samba-c9f04d8648cfdd573d45d47467bc964ef01f754d.zip
r22231: merge from bzr ctdb tree
(This used to be commit 807b959082d3b9a929c9f6597714e636638a940e)
Diffstat (limited to 'source4/cluster/ctdb/direct')
-rw-r--r--source4/cluster/ctdb/direct/README12
-rw-r--r--source4/cluster/ctdb/direct/ctdbd.c157
-rwxr-xr-xsource4/cluster/ctdb/direct/ctdbd.sh8
-rw-r--r--source4/cluster/ctdb/direct/ctdbd_test.c364
-rw-r--r--source4/cluster/ctdb/direct/nodes.txt2
5 files changed, 543 insertions, 0 deletions
diff --git a/source4/cluster/ctdb/direct/README b/source4/cluster/ctdb/direct/README
new file mode 100644
index 0000000000..bc87d060cb
--- /dev/null
+++ b/source4/cluster/ctdb/direct/README
@@ -0,0 +1,12 @@
+Run ./direct/ctdbd.sh to start a cluster with two ctdb nodes
+They will listen for clients on the unix domain sockets
+/tmp/ctdb.socket.127.0.0.1
+/tmp/ctdb.socket.127.0.0.2
+
+In order for this to work you must have an interface with the address 127.0.0.2 available.
+Just create this as an alias for loopback.
+
+
+Then run ./direct/ctdbd_test to connect a client to the ctdbd daemon on /tmp/ctdb.socket.127.0.0.1 and do some commands to it across the domain socket.
+
+
diff --git a/source4/cluster/ctdb/direct/ctdbd.c b/source4/cluster/ctdb/direct/ctdbd.c
new file mode 100644
index 0000000000..700416e5e9
--- /dev/null
+++ b/source4/cluster/ctdb/direct/ctdbd.c
@@ -0,0 +1,157 @@
+/*
+ standalone ctdb daemon
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "popt.h"
+#include "system/wait.h"
+
+static void block_signal(int signum)
+{
+ struct sigaction act;
+
+ memset(&act, 0, sizeof(act));
+
+ act.sa_handler = SIG_IGN;
+ sigemptyset(&act.sa_mask);
+ sigaddset(&act.sa_mask, signum);
+ sigaction(signum, &act, NULL);
+}
+
+
+/*
+ main program
+*/
+int main(int argc, const char *argv[])
+{
+ struct ctdb_context *ctdb;
+ const char *nlist = NULL;
+ const char *transport = "tcp";
+ const char *myaddress = NULL;
+ int self_connect=0;
+ int daemon_mode=0;
+ const char *db_list = "test.tdb";
+ char *s, *tok;
+
+ struct poptOption popt_options[] = {
+ POPT_AUTOHELP
+ { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" },
+ { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" },
+ { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL },
+ { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" },
+ { "daemon", 0, POPT_ARG_NONE, &daemon_mode, 0, "spawn a ctdb daemon", "boolean" },
+ { "dblist", 0, POPT_ARG_STRING, &db_list, 0, "list of databases", NULL },
+ POPT_TABLEEND
+ };
+ int opt;
+ const char **extra_argv;
+ int extra_argc = 0;
+ int ret;
+ poptContext pc;
+ struct event_context *ev;
+
+ pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
+
+ while ((opt = poptGetNextOpt(pc)) != -1) {
+ switch (opt) {
+ default:
+ fprintf(stderr, "Invalid option %s: %s\n",
+ poptBadOption(pc, 0), poptStrerror(opt));
+ exit(1);
+ }
+ }
+
+ /* setup the remaining options for the main program to use */
+ extra_argv = poptGetArgs(pc);
+ if (extra_argv) {
+ extra_argv++;
+ while (extra_argv[extra_argc]) extra_argc++;
+ }
+
+ if (nlist == NULL || myaddress == NULL) {
+ printf("You must provide a node list with --nlist and an address with --listen\n");
+ exit(1);
+ }
+
+ block_signal(SIGPIPE);
+
+ ev = event_context_init(NULL);
+
+ /* initialise ctdb */
+ ctdb = ctdb_init(ev);
+ if (ctdb == NULL) {
+ printf("Failed to init ctdb\n");
+ exit(1);
+ }
+
+ if (self_connect) {
+ ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT);
+ }
+ if (daemon_mode) {
+ ctdb_set_flags(ctdb, CTDB_FLAG_DAEMON_MODE);
+ }
+
+ ret = ctdb_set_transport(ctdb, transport);
+ if (ret == -1) {
+ printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ /* tell ctdb what address to listen on */
+ ret = ctdb_set_address(ctdb, myaddress);
+ if (ret == -1) {
+ printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ /* tell ctdb what nodes are available */
+ ret = ctdb_set_nlist(ctdb, nlist);
+ if (ret == -1) {
+ printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
+
+ /* attach to the list of databases */
+ s = talloc_strdup(ctdb, db_list);
+ for (tok=strtok(s, ", "); tok; tok=strtok(NULL, ", ")) {
+ struct ctdb_db_context *ctdb_db;
+ ctdb_db = ctdb_attach(ctdb, tok, TDB_DEFAULT,
+ O_RDWR|O_CREAT|O_TRUNC, 0666);
+ if (!ctdb_db) {
+ printf("ctdb_attach to '%s'failed - %s\n", tok,
+ ctdb_errstr(ctdb));
+ exit(1);
+ }
+ printf("Attached to database '%s'\n", tok);
+ }
+
+ /* start the protocol running */
+ ret = ctdb_start(ctdb);
+
+/* event_loop_wait(ev);*/
+ while (1) {
+ event_loop_once(ev);
+ }
+
+ /* shut it down */
+ talloc_free(ev);
+ return 0;
+}
diff --git a/source4/cluster/ctdb/direct/ctdbd.sh b/source4/cluster/ctdb/direct/ctdbd.sh
new file mode 100755
index 0000000000..366226260b
--- /dev/null
+++ b/source4/cluster/ctdb/direct/ctdbd.sh
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+killall -q ctdbd
+
+echo "Starting 2 ctdb daemons"
+bin/ctdbd --nlist direct/nodes.txt --listen 127.0.0.2:9001 --daemon &
+bin/ctdbd --nlist direct/nodes.txt --listen 127.0.0.1:9001 --daemon &
+
diff --git a/source4/cluster/ctdb/direct/ctdbd_test.c b/source4/cluster/ctdb/direct/ctdbd_test.c
new file mode 100644
index 0000000000..019cdad30d
--- /dev/null
+++ b/source4/cluster/ctdb/direct/ctdbd_test.c
@@ -0,0 +1,364 @@
+/*
+ test of messaging
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "system/network.h"
+#include "../include/ctdb.h"
+#include "../include/ctdb_private.h"
+
+#define CTDB_SOCKET "/tmp/ctdb.socket.127.0.0.1"
+
+
+/*
+ connect to the unix domain socket
+*/
+static int ux_socket_connect(const char *name)
+{
+ struct sockaddr_un addr;
+ int fd;
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sun_family = AF_UNIX;
+ strncpy(addr.sun_path, name, sizeof(addr.sun_path));
+
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (fd == -1) {
+ return -1;
+ }
+
+ if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
+ close(fd);
+ return -1;
+ }
+
+ return fd;
+}
+
+void register_pid_with_daemon(int fd, int pid)
+{
+ struct ctdb_req_register r;
+
+ bzero(&r, sizeof(r));
+ r.hdr.length = sizeof(r);
+ r.hdr.ctdb_magic = CTDB_MAGIC;
+ r.hdr.ctdb_version = CTDB_VERSION;
+ r.hdr.operation = CTDB_REQ_REGISTER;
+ r.srvid = pid;
+
+ /* XXX must deal with partial writes here */
+ write(fd, &r, sizeof(r));
+}
+
+/* send a command to the cluster to wait until all nodes are connected
+ and the cluster is fully operational
+ */
+int wait_for_cluster(int fd)
+{
+ struct ctdb_req_connect_wait req;
+ struct ctdb_reply_connect_wait rep;
+ int cnt, tot;
+
+ /* send a connect wait command to the local node */
+ bzero(&req, sizeof(req));
+ req.hdr.length = sizeof(req);
+ req.hdr.ctdb_magic = CTDB_MAGIC;
+ req.hdr.ctdb_version = CTDB_VERSION;
+ req.hdr.operation = CTDB_REQ_CONNECT_WAIT;
+
+ /* XXX must deal with partial writes here */
+ write(fd, &req, sizeof(req));
+
+
+ /* read the 4 bytes of length for the pdu */
+ cnt=0;
+ tot=4;
+ while(cnt!=tot){
+ int numread;
+ numread=read(fd, ((char *)&rep)+cnt, tot-cnt);
+ if(numread>0){
+ cnt+=numread;
+ }
+ }
+ /* read the rest of the pdu */
+ tot=rep.hdr.length;
+ while(cnt!=tot){
+ int numread;
+ numread=read(fd, ((char *)&rep)+cnt, tot-cnt);
+ if(numread>0){
+ cnt+=numread;
+ }
+ }
+
+ return rep.vnn;
+}
+
+
+int send_a_message(int fd, int ourvnn, int vnn, int pid, TDB_DATA data)
+{
+ struct ctdb_req_message r;
+ int len, cnt;
+
+ len = offsetof(struct ctdb_req_message, data) + data.dsize;
+ r.hdr.length = len;
+ r.hdr.ctdb_magic = CTDB_MAGIC;
+ r.hdr.ctdb_version = CTDB_VERSION;
+ r.hdr.operation = CTDB_REQ_MESSAGE;
+ r.hdr.destnode = vnn;
+ r.hdr.srcnode = ourvnn;
+ r.hdr.reqid = 0;
+ r.srvid = pid;
+ r.datalen = data.dsize;
+
+ /* write header */
+ cnt=write(fd, &r, offsetof(struct ctdb_req_message, data));
+ /* write data */
+ if(data.dsize){
+ cnt=write(fd, data.dptr, data.dsize);
+ }
+ return 0;
+}
+
+int receive_a_message(int fd, struct ctdb_req_message **preply)
+{
+ int cnt,tot;
+ struct ctdb_req_message *rep;
+ uint32_t length;
+
+ /* read the 4 bytes of length for the pdu */
+ cnt=0;
+ tot=4;
+ while(cnt!=tot){
+ int numread;
+ numread=read(fd, ((char *)&length)+cnt, tot-cnt);
+ if(numread>0){
+ cnt+=numread;
+ }
+ }
+
+ /* read the rest of the pdu */
+ rep = malloc(length);
+ rep->hdr.length = length;
+ cnt = 0;
+ tot = length-4;
+ while(cnt!=tot){
+ int numread;
+ numread=read(fd, ((char *)rep)+cnt, tot-cnt);
+ if(numread>0){
+ cnt+=numread;
+ }
+ }
+
+ *preply = rep;
+ return 0;
+}
+
+/*
+ hash function for mapping data to a VNN - taken from tdb
+*/
+uint32_t ctdb_hash(const TDB_DATA *key)
+{
+ uint32_t value; /* Used to compute the hash value. */
+ uint32_t i; /* Used to cycle through random values. */
+
+ /* Set the initial value from the key size. */
+ for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
+ value = (value + (key->dptr[i] << (i*5 % 24)));
+
+ return (1103515243 * value + 12345);
+}
+
+void fetch_lock(int fd, uint32_t db_id, TDB_DATA key)
+{
+ struct ctdb_req_fetch_lock *req;
+ struct ctdb_reply_fetch_lock *rep;
+ uint32_t length;
+ int len, cnt, tot;
+
+ len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize;
+ req = malloc(len);
+
+ req->hdr.length = len;
+ req->hdr.ctdb_magic = CTDB_MAGIC;
+ req->hdr.ctdb_version = CTDB_VERSION;
+ req->hdr.operation = CTDB_REQ_FETCH_LOCK;
+ req->hdr.reqid = 1;
+ req->db_id = db_id;
+ req->keylen = key.dsize;
+ memcpy(&req->key[0], key.dptr, key.dsize);
+
+ cnt=write(fd, req, len);
+
+
+ /* wait fot the reply */
+ /* read the 4 bytes of length for the pdu */
+ cnt=0;
+ tot=4;
+ while(cnt!=tot){
+ int numread;
+ numread=read(fd, ((char *)&length)+cnt, tot-cnt);
+ if(numread>0){
+ cnt+=numread;
+ }
+ }
+ /* read the rest of the pdu */
+ rep = malloc(length);
+ tot=length;
+ while(cnt!=tot){
+ int numread;
+ numread=read(fd, ((char *)rep)+cnt, tot-cnt);
+ if(numread>0){
+ cnt+=numread;
+ }
+ }
+ printf("fetch lock reply: state:%d datalen:%d\n",rep->state,rep->datalen);
+ if(!rep->datalen){
+ printf("no data\n");
+ } else {
+ printf("data:[%s]\n",rep->data);
+ }
+
+}
+
+void store_unlock(int fd, uint32_t db_id, TDB_DATA key, TDB_DATA data)
+{
+ struct ctdb_req_store_unlock *req;
+ struct ctdb_reply_store_unlock *rep;
+ uint32_t length;
+ int len, cnt, tot;
+
+ len = offsetof(struct ctdb_req_store_unlock, data) + key.dsize + data.dsize;
+ req = malloc(len);
+
+ req->hdr.length = len;
+ req->hdr.ctdb_magic = CTDB_MAGIC;
+ req->hdr.ctdb_version = CTDB_VERSION;
+ req->hdr.operation = CTDB_REQ_STORE_UNLOCK;
+ req->hdr.reqid = 1;
+ req->db_id = db_id;
+ req->keylen = key.dsize;
+ req->datalen = data.dsize;
+ memcpy(&req->data[0], key.dptr, key.dsize);
+ memcpy(&req->data[key.dsize], data.dptr, data.dsize);
+
+ cnt=write(fd, req, len);
+
+
+ /* wait fot the reply */
+ /* read the 4 bytes of length for the pdu */
+ cnt=0;
+ tot=4;
+ while(cnt!=tot){
+ int numread;
+ numread=read(fd, ((char *)&length)+cnt, tot-cnt);
+ if(numread>0){
+ cnt+=numread;
+ }
+ }
+ /* read the rest of the pdu */
+ rep = malloc(length);
+ tot=length;
+ while(cnt!=tot){
+ int numread;
+ numread=read(fd, ((char *)rep)+cnt, tot-cnt);
+ if(numread>0){
+ cnt+=numread;
+ }
+ }
+ printf("store unlock reply: state:%d\n",rep->state);
+}
+
+int main(int argc, const char *argv[])
+{
+ int fd, pid, vnn, dstvnn, dstpid;
+ TDB_DATA message;
+ struct ctdb_req_message *reply;
+ TDB_DATA dbname;
+ uint32_t db_id;
+ TDB_DATA key, data;
+ char str[256];
+
+ /* open the socket to talk to the local ctdb daemon */
+ fd=ux_socket_connect(CTDB_SOCKET);
+ if (fd==-1) {
+ printf("failed to open domain socket\n");
+ exit(10);
+ }
+
+
+ /* register our local server id with the daemon so that it knows
+ where to send messages addressed to our local pid.
+ */
+ pid=getpid();
+ register_pid_with_daemon(fd, pid);
+
+
+ /* do a connect wait to ensure that all nodes in the cluster are up
+ and operational.
+ this also tells us the vnn of the local cluster.
+ If someone wants to send us a emssage they should send it to
+ this vnn and our pid
+ */
+ vnn=wait_for_cluster(fd);
+ printf("our address is vnn:%d pid:%d if someone wants to send us a message!\n",vnn,pid);
+
+
+ /* send a message to ourself */
+ dstvnn=vnn;
+ dstpid=pid;
+ message.dptr=discard_const("Test message");
+ message.dsize=strlen((const char *)message.dptr)+1;
+ printf("sending test message [%s] to ourself\n", message.dptr);
+ send_a_message(fd, vnn, dstvnn, dstpid, message);
+
+ /* wait for the message to come back */
+ receive_a_message(fd, &reply);
+ printf("received message: [%s]\n",&reply->data[0]);
+
+ /* create the db id for "test.tdb" */
+ dbname.dptr = discard_const("test.tdb");
+ dbname.dsize = strlen((const char *)(dbname.dptr));
+ db_id = ctdb_hash(&dbname);
+ printf("the has for the database id is 0x%08x\n",db_id);
+ printf("\n");
+
+ /* send a fetch lock */
+ key.dptr=discard_const("TestKey");
+ key.dsize=strlen((const char *)(key.dptr));
+ printf("fetch the test key:[%s]\n",key.dptr);
+ fetch_lock(fd, db_id, key);
+ printf("\n");
+
+
+ /* send a store unlock */
+ sprintf(str,"TestData_%d",getpid());
+ data.dptr=discard_const(str);
+ data.dsize=strlen((const char *)(data.dptr));
+ printf("store new data==[%s] for this record\n",data.dptr);
+ store_unlock(fd, db_id, key, data);
+ printf("\n");
+
+ /* send a fetch lock */
+ printf("fetch the test key:[%s]\n",key.dptr);
+ fetch_lock(fd, db_id, key);
+ printf("\n");
+
+
+ return 0;
+}
diff --git a/source4/cluster/ctdb/direct/nodes.txt b/source4/cluster/ctdb/direct/nodes.txt
new file mode 100644
index 0000000000..e1198b59ac
--- /dev/null
+++ b/source4/cluster/ctdb/direct/nodes.txt
@@ -0,0 +1,2 @@
+127.0.0.1:9001
+127.0.0.2:9001