From c9f04d8648cfdd573d45d47467bc964ef01f754d Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Mon, 16 Apr 2007 00:18:54 +0000 Subject: r22231: merge from bzr ctdb tree (This used to be commit 807b959082d3b9a929c9f6597714e636638a940e) --- source4/cluster/ctdb/Makefile.in | 95 ++ source4/cluster/ctdb/README | 3 + source4/cluster/ctdb/autogen.sh | 17 + source4/cluster/ctdb/common/ctdb.c | 26 +- source4/cluster/ctdb/common/ctdb_call.c | 151 ++- source4/cluster/ctdb/common/ctdb_client.c | 662 ++++++++++++ source4/cluster/ctdb/common/ctdb_daemon.c | 631 +++++++++++ source4/cluster/ctdb/common/ctdb_io.c | 303 ++++++ source4/cluster/ctdb/common/ctdb_ltdb.c | 18 + source4/cluster/ctdb/common/ctdb_message.c | 132 ++- source4/cluster/ctdb/config.guess | 1466 ++++++++++++++++++++++++++ source4/cluster/ctdb/config.mk | 5 +- source4/cluster/ctdb/configure.ac | 33 + source4/cluster/ctdb/ctdb_cluster.c | 19 +- source4/cluster/ctdb/direct/README | 12 + source4/cluster/ctdb/direct/ctdbd.c | 157 +++ source4/cluster/ctdb/direct/ctdbd.sh | 8 + source4/cluster/ctdb/direct/ctdbd_test.c | 364 +++++++ source4/cluster/ctdb/direct/nodes.txt | 2 + source4/cluster/ctdb/ib/README.txt | 20 + source4/cluster/ctdb/ib/config.m4 | 31 + source4/cluster/ctdb/ib/ibw_ctdb.c | 157 +++ source4/cluster/ctdb/ib/ibw_ctdb.h | 46 + source4/cluster/ctdb/ib/ibw_ctdb_init.c | 214 ++++ source4/cluster/ctdb/ib/ibwrapper.c | 1344 +++++++++++++++++++++++ source4/cluster/ctdb/ib/ibwrapper.h | 219 ++++ source4/cluster/ctdb/ib/ibwrapper_internal.h | 127 +++ source4/cluster/ctdb/ib/ibwrapper_test.c | 641 +++++++++++ source4/cluster/ctdb/include/ctdb.h | 28 +- source4/cluster/ctdb/include/ctdb_private.h | 224 +++- source4/cluster/ctdb/include/idtree.h | 7 + source4/cluster/ctdb/include/includes.h | 36 + source4/cluster/ctdb/install-sh | 238 +++++ source4/cluster/ctdb/opendb_ctdb.c | 5 +- source4/cluster/ctdb/tcp/ctdb_tcp.h | 33 +- source4/cluster/ctdb/tcp/tcp_connect.c | 57 +- source4/cluster/ctdb/tcp/tcp_init.c | 20 +- source4/cluster/ctdb/tcp/tcp_io.c | 205 +--- source4/cluster/ctdb/tests/1node.txt | 1 + source4/cluster/ctdb/tests/4nodes.txt | 4 + source4/cluster/ctdb/tests/bench-ssh.sh | 43 + source4/cluster/ctdb/tests/bench.sh | 9 + source4/cluster/ctdb/tests/bench1.sh | 8 + source4/cluster/ctdb/tests/ctdb_bench.c | 13 +- source4/cluster/ctdb/tests/ctdb_fetch.c | 13 +- source4/cluster/ctdb/tests/ctdb_fetch1.c | 274 +++++ source4/cluster/ctdb/tests/ctdb_messaging.c | 187 ++++ source4/cluster/ctdb/tests/ctdb_test.c | 22 +- source4/cluster/ctdb/tests/fetch.sh | 9 + source4/cluster/ctdb/tests/fetch1.sh | 8 + source4/cluster/ctdb/tests/messaging.sh | 9 + source4/cluster/ctdb/tests/nodes.txt | 2 + source4/cluster/ctdb/tests/test.sh | 35 + source4/cluster/ctdb/tests/test1.sh | 8 + 54 files changed, 8058 insertions(+), 343 deletions(-) create mode 100644 source4/cluster/ctdb/Makefile.in create mode 100644 source4/cluster/ctdb/README create mode 100755 source4/cluster/ctdb/autogen.sh create mode 100644 source4/cluster/ctdb/common/ctdb_client.c create mode 100644 source4/cluster/ctdb/common/ctdb_daemon.c create mode 100644 source4/cluster/ctdb/common/ctdb_io.c create mode 100755 source4/cluster/ctdb/config.guess create mode 100644 source4/cluster/ctdb/configure.ac create mode 100644 source4/cluster/ctdb/direct/README create mode 100644 source4/cluster/ctdb/direct/ctdbd.c create mode 100755 source4/cluster/ctdb/direct/ctdbd.sh create mode 100644 source4/cluster/ctdb/direct/ctdbd_test.c create mode 100644 source4/cluster/ctdb/direct/nodes.txt create mode 100644 source4/cluster/ctdb/ib/README.txt create mode 100644 source4/cluster/ctdb/ib/config.m4 create mode 100644 source4/cluster/ctdb/ib/ibw_ctdb.c create mode 100644 source4/cluster/ctdb/ib/ibw_ctdb.h create mode 100644 source4/cluster/ctdb/ib/ibw_ctdb_init.c create mode 100644 source4/cluster/ctdb/ib/ibwrapper.c create mode 100644 source4/cluster/ctdb/ib/ibwrapper.h create mode 100644 source4/cluster/ctdb/ib/ibwrapper_internal.h create mode 100644 source4/cluster/ctdb/ib/ibwrapper_test.c create mode 100644 source4/cluster/ctdb/include/idtree.h create mode 100644 source4/cluster/ctdb/include/includes.h create mode 100755 source4/cluster/ctdb/install-sh create mode 100644 source4/cluster/ctdb/tests/1node.txt create mode 100644 source4/cluster/ctdb/tests/4nodes.txt create mode 100755 source4/cluster/ctdb/tests/bench-ssh.sh create mode 100755 source4/cluster/ctdb/tests/bench.sh create mode 100755 source4/cluster/ctdb/tests/bench1.sh create mode 100644 source4/cluster/ctdb/tests/ctdb_fetch1.c create mode 100644 source4/cluster/ctdb/tests/ctdb_messaging.c create mode 100755 source4/cluster/ctdb/tests/fetch.sh create mode 100755 source4/cluster/ctdb/tests/fetch1.sh create mode 100755 source4/cluster/ctdb/tests/messaging.sh create mode 100644 source4/cluster/ctdb/tests/nodes.txt create mode 100755 source4/cluster/ctdb/tests/test.sh create mode 100755 source4/cluster/ctdb/tests/test1.sh (limited to 'source4/cluster') diff --git a/source4/cluster/ctdb/Makefile.in b/source4/cluster/ctdb/Makefile.in new file mode 100644 index 0000000000..e0baf53045 --- /dev/null +++ b/source4/cluster/ctdb/Makefile.in @@ -0,0 +1,95 @@ +#!gmake +# +CC = @CC@ +prefix = @prefix@ +exec_prefix = @exec_prefix@ +datarootdir = @datarootdir@ +includedir = @includedir@ +libdir = @libdir@ +bindir = @bindir@ +VPATH = @srcdir@:@tdbdir@:@tallocdir@:@libreplacedir@ +srcdir = @srcdir@ +builddir = @builddir@ +EXTRA_OBJ=@EXTRA_OBJ@ + +CFLAGS=-g -I$(srcdir)/include -Iinclude -Ilib/util -I$(srcdir) \ + -I@tallocdir@ -I@tdbdir@/include -I@libreplacedir@ \ + -DLIBDIR=\"$(libdir)\" -DSHLIBEXT=\"@SHLIBEXT@\" -DUSE_MMAP=1 @CFLAGS@ + +LIB_FLAGS=@LDFLAGS@ -Llib @LIBS@ -lpopt @INFINIBAND_LIBS@ + +EVENTS_OBJ = lib/events/events.o lib/events/events_standard.o + +CTDB_COMMON_OBJ = common/ctdb.o common/ctdb_daemon.o common/ctdb_client.o common/ctdb_io.o common/util.o common/ctdb_util.o \ + common/ctdb_call.o common/ctdb_ltdb.o common/ctdb_message.o \ + lib/util/idtree.o lib/util/db_wrap.o + +CTDB_TCP_OBJ = tcp/tcp_connect.o tcp/tcp_io.o tcp/tcp_init.o + +CTDB_OBJ = $(CTDB_COMMON_OBJ) $(CTDB_TCP_OBJ) + +OBJS = @TDBOBJ@ @TALLOCOBJ@ @LIBREPLACEOBJ@ @INFINIBAND_WRAPPER_OBJ@ $(EXTRA_OBJ) $(EVENTS_OBJ) $(CTDB_OBJ) + +BINS = bin/ctdbd bin/ctdbd_test bin/ctdb_test bin/ctdb_bench bin/ctdb_messaging bin/ctdb_fetch bin/ctdb_fetch1 @INFINIBAND_BINS@ + +DIRS = lib bin + +all: showflags dirs $(OBJS) $(BINS) + +showflags: + @echo 'ctdb will be compiled with flags:' + @echo ' CFLAGS = $(CFLAGS)' + @echo ' LIBS = $(LIBS)' + +.c.o: + @echo Compiling $*.c + @mkdir -p `dirname $@` + @$(CC) $(CFLAGS) -c $< -o $@ + +dirs: + @mkdir -p $(DIRS) + +bin/ctdb_test: $(OBJS) tests/ctdb_test.o + @echo Linking $@ + @$(CC) $(CFLAGS) -o $@ tests/ctdb_test.o $(OBJS) $(LIB_FLAGS) + +bin/ctdbd: $(OBJS) direct/ctdbd.o + @echo Linking $@ + @$(CC) $(CFLAGS) -o $@ direct/ctdbd.o $(OBJS) $(LIB_FLAGS) + +bin/ctdbd_test: $(OBJS) direct/ctdbd_test.o + @echo Linking $@ + @$(CC) $(CFLAGS) -o $@ direct/ctdbd_test.o + +bin/ctdb_bench: $(OBJS) tests/ctdb_bench.o + @echo Linking $@ + @$(CC) $(CFLAGS) -o $@ tests/ctdb_bench.o $(OBJS) $(LIB_FLAGS) + +bin/ctdb_fetch: $(OBJS) tests/ctdb_fetch.o + @echo Linking $@ + @$(CC) $(CFLAGS) -o $@ tests/ctdb_fetch.o $(OBJS) $(LIB_FLAGS) + +bin/ctdb_fetch1: $(OBJS) tests/ctdb_fetch1.o + @echo Linking $@ + @$(CC) $(CFLAGS) -o $@ tests/ctdb_fetch1.o $(OBJS) $(LIB_FLAGS) + +bin/ctdb_messaging: $(OBJS) tests/ctdb_messaging.o + @echo Linking $@ + @$(CC) $(CFLAGS) -o $@ tests/ctdb_messaging.o $(OBJS) $(LIB_FLAGS) + +bin/ibwrapper_test: $(OBJS) ib/ibwrapper_test.o + @echo Linking $@ + @$(CC) $(CFLAGS) -o $@ ib/ibwrapper_test.o $(OBJS) $(LIB_FLAGS) + +clean: + rm -f *.o */*.o */*/*.o + rm -f $(BINS) + +distclean: clean + rm -f *~ */*~ + rm -rf bin + rm -f config.log config.status config.cache config.h + rm -f Makefile + +realdistclean: distclean + rm -f configure config.h.in diff --git a/source4/cluster/ctdb/README b/source4/cluster/ctdb/README new file mode 100644 index 0000000000..094d1d9958 --- /dev/null +++ b/source4/cluster/ctdb/README @@ -0,0 +1,3 @@ +To build this you need a recent copy of talloc, libreplace and tdb in +the directory above this directory. + diff --git a/source4/cluster/ctdb/autogen.sh b/source4/cluster/ctdb/autogen.sh new file mode 100755 index 0000000000..500cab87d5 --- /dev/null +++ b/source4/cluster/ctdb/autogen.sh @@ -0,0 +1,17 @@ +#!/bin/sh + +rm -rf autom4te.cache +rm -f configure config.h.in + +IPATHS="-I libreplace -I lib/replace -I ../libreplace -I ../replace" +IPATHS="$IPATHS -I lib/talloc -I talloc -I ../talloc" +IPATHS="$IPATHS -I lib/tdb -I tdb -I ../tdb" +IPATHS="$IPATHS -I lib/popt -I popt -I ../popt" +autoheader $IPATHS || exit 1 +autoconf $IPATHS || exit 1 + +rm -rf autom4te.cache + +echo "Now run ./configure and then make." +exit 0 + diff --git a/source4/cluster/ctdb/common/ctdb.c b/source4/cluster/ctdb/common/ctdb.c index b98c0a3d84..8a8d52f3f1 100644 --- a/source4/cluster/ctdb/common/ctdb.c +++ b/source4/cluster/ctdb/common/ctdb.c @@ -57,6 +57,14 @@ void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags) ctdb->flags |= flags; } +/* + clear some ctdb flags +*/ +void ctdb_clear_flags(struct ctdb_context *ctdb, unsigned flags) +{ + ctdb->flags &= ~flags; +} + /* set max acess count before a dmaster migration */ @@ -179,14 +187,6 @@ uint32_t ctdb_get_num_nodes(struct ctdb_context *ctdb) } -/* - start the protocol going -*/ -int ctdb_start(struct ctdb_context *ctdb) -{ - return ctdb->methods->start(ctdb); -} - /* called by the transport layer when a packet comes in */ @@ -274,7 +274,7 @@ static void ctdb_node_connected(struct ctdb_node *node) /* wait for all nodes to be connected */ -void ctdb_connect_wait(struct ctdb_context *ctdb) +void ctdb_daemon_connect_wait(struct ctdb_context *ctdb) { int expected = ctdb->num_nodes - 1; if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) { @@ -338,3 +338,11 @@ struct ctdb_context *ctdb_init(struct event_context *ev) return ctdb; } +int ctdb_start(struct ctdb_context *ctdb) +{ + if (ctdb->flags&CTDB_FLAG_DAEMON_MODE) { + return ctdbd_start(ctdb); + } + + return ctdb->methods->start(ctdb); +} diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c index 77ec872852..ab5c2cce3b 100644 --- a/source4/cluster/ctdb/common/ctdb_call.c +++ b/source4/cluster/ctdb/common/ctdb_call.c @@ -28,6 +28,22 @@ #include "system/filesys.h" #include "../include/ctdb_private.h" +/* + find the ctdb_db from a db index + */ + struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id) +{ + struct ctdb_db_context *ctdb_db; + + for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) { + if (ctdb_db->db_id == id) { + break; + } + } + return ctdb_db; +} + + /* local version of ctdb_call */ @@ -38,7 +54,7 @@ static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *ca struct ctdb_call_info *c; struct ctdb_registered_call *fn; struct ctdb_context *ctdb = ctdb_db->ctdb; - + c = talloc(ctdb, struct ctdb_call_info); CTDB_NO_MEMORY(ctdb, c); @@ -242,13 +258,11 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr data.dptr = c->data + c->keylen; data.dsize = c->datalen; - for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) { - if (ctdb_db->db_id == c->db_id) { - break; - } - } + ctdb_db = find_ctdb_db(ctdb, c->db_id); if (!ctdb_db) { - ctdb_send_error(ctdb, hdr, ret, "Unknown database in request. db_id==0x%08x",c->db_id); + ctdb_send_error(ctdb, hdr, -1, + "Unknown database in request. db_id==0x%08x", + c->db_id); return; } @@ -309,13 +323,11 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_call call; struct ctdb_db_context *ctdb_db; - for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) { - if (ctdb_db->db_id == c->db_id) { - break; - } - } + ctdb_db = find_ctdb_db(ctdb, c->db_id); if (!ctdb_db) { - ctdb_send_error(ctdb, hdr, ret, "Unknown database in request. db_id==0x%08x",c->db_id); + ctdb_send_error(ctdb, hdr, -1, + "Unknown database in request. db_id==0x%08x", + c->db_id); return; } @@ -380,24 +392,6 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) talloc_free(r); } -enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR}; - -/* - state of a in-progress ctdb call -*/ -struct ctdb_call_state { - enum call_state state; - struct ctdb_req_call *c; - struct ctdb_db_context *ctdb_db; - struct ctdb_node *node; - const char *errmsg; - struct ctdb_call call; - int redirect_count; - struct ctdb_ltdb_header header; - void *fetch_private; -}; - - /* called when a CTDB_REPLY_CALL packet comes in @@ -418,7 +412,14 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) talloc_steal(state, c); + /* get an extra reference here - this prevents the free in ctdb_recv_pkt() + from freeing the data */ + (void)talloc_reference(state, c); + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } } /* @@ -458,6 +459,9 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn); state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } } @@ -476,6 +480,9 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) state->state = CTDB_CALL_ERROR; state->errmsg = (char *)c->msg; + if (state->async.fn) { + state->async.fn(state); + } } @@ -521,14 +528,30 @@ static int ctdb_call_destructor(struct ctdb_call_state *state) called when a ctdb_call times out */ void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private) + struct timeval t, void *private_data) { - struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state); + struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state); state->state = CTDB_CALL_ERROR; ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out", state->c->hdr.reqid); + if (state->async.fn) { + state->async.fn(state); + } } +/* + this allows the caller to setup a async.fn +*/ +static void call_local_trigger(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state); + if (state->async.fn) { + state->async.fn(state); + } +} + + /* construct an event driven local ctdb_call @@ -556,17 +579,20 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn); + event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state); + return state; } /* - make a remote ctdb call - async send + make a remote ctdb call - async send. Called in daemon context. This constructs a ctdb_call request and queues it for processing. This call never blocks. */ -struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) +static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call) { uint32_t len; struct ctdb_call_state *state; @@ -633,21 +659,27 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct c return state; } +/* + make a remote ctdb call - async send - -struct ctdb_record_handle { - struct ctdb_db_context *ctdb_db; - TDB_DATA key; - TDB_DATA *data; -}; + This constructs a ctdb_call request and queues it for processing. + This call never blocks. +*/ +struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) +{ + if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_call_send(ctdb_db, call); + } + return ctdb_daemon_call_send(ctdb_db, call); +} /* - make a remote ctdb call - async recv. + make a remote ctdb call - async recv - called in daemon context This is called when the program wants to wait for a ctdb_call to complete and get the results. This call will block unless the call has already completed. */ -int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) { struct ctdb_record_handle *rec; @@ -684,21 +716,34 @@ int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) return 0; } + +/* + make a remote ctdb call - async recv. + + This is called when the program wants to wait for a ctdb_call to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +{ + if (state->ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_call_recv(state, call); + } + return ctdb_daemon_call_recv(state, call); +} + /* full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() */ int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) { struct ctdb_call_state *state; + state = ctdb_call_send(ctdb_db, call); return ctdb_call_recv(state, call); } - - - struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data) { @@ -707,6 +752,10 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL struct ctdb_call_state *state; int ret; + if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_fetch_lock(ctdb_db, mem_ctx, key, data); + } + ZERO_STRUCT(call); call.call_id = CTDB_FETCH_FUNC; call.key = key; @@ -733,19 +782,27 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL } -int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data) +int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) { int ret; struct ctdb_ltdb_header header; + struct ctdb_db_context *ctdb_db = talloc_get_type(rec->ctdb_db, struct ctdb_db_context); + + if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_store_unlock(rec, data); + } /* should be avoided if possible hang header off rec ? */ ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL, NULL); if (ret) { ctdb_set_error(rec->ctdb_db->ctdb, "Fetch of locally held record failed"); + talloc_free(rec); return ret; } ret = ctdb_ltdb_store(rec->ctdb_db, rec->key, &header, data); + talloc_free(rec); + return ret; } diff --git a/source4/cluster/ctdb/common/ctdb_client.c b/source4/cluster/ctdb/common/ctdb_client.c new file mode 100644 index 0000000000..3cb27a1165 --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_client.c @@ -0,0 +1,662 @@ +/* + ctdb daemon code + + Copyright (C) Andrew Tridgell 2007 + Copyright (C) Ronnie Sahlberg 2007 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "db_wrap.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb.h" +#include "../include/ctdb_private.h" + +/* + queue a packet for sending from client to daemon +*/ +static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length); +} + + +/* + handle a connect wait reply packet + */ +static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, + struct ctdb_req_header *hdr) +{ + struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr; + ctdb->num_connected = r->num_connected; +} + +/* + called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon + + This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It + contains any reply data from the call +*/ +void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_fetch_lock *c = (struct ctdb_reply_fetch_lock *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + if (state == NULL) return; + + state->call.reply_data.dptr = c->data; + state->call.reply_data.dsize = c->datalen; + state->call.status = c->state; + + talloc_steal(state, c); + + /* get an extra reference here - this prevents the free in ctdb_recv_pkt() + from freeing the data */ + (void)talloc_reference(state, c); + + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } +} + +/* + called in the client when we receive a CTDB_REPLY_STORE_UNLOCK from the daemon + + This packet comes in response to a CTDB_REQ_STORE_UNLOCK request packet. It + contains any reply data from the call +*/ +void ctdb_reply_store_unlock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_store_unlock *c = (struct ctdb_reply_store_unlock *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + if (state == NULL) return; + + state->call.status = c->state; + + talloc_steal(state, c); + + /* get an extra reference here - this prevents the free in ctdb_recv_pkt() + from freeing the data */ + (void)talloc_reference(state, c); + + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } +} +/* + this is called in the client, when data comes in from the daemon + */ +static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) +{ + struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context); + struct ctdb_req_header *hdr; + + if (cnt < sizeof(*hdr)) { + ctdb_set_error(ctdb, "Bad packet length %d\n", cnt); + return; + } + hdr = (struct ctdb_req_header *)data; + if (cnt != hdr->length) { + ctdb_set_error(ctdb, "Bad header length %d expected %d\n", + hdr->length, cnt); + return; + } + + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(ctdb, "Non CTDB packet rejected\n"); + return; + } + + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + return; + } + + switch (hdr->operation) { + case CTDB_REPLY_CALL: + ctdb_reply_call(ctdb, hdr); + break; + + case CTDB_REQ_MESSAGE: + ctdb_request_message(ctdb, hdr); + break; + + case CTDB_REPLY_CONNECT_WAIT: + ctdb_reply_connect_wait(ctdb, hdr); + break; + + case CTDB_REPLY_FETCH_LOCK: + ctdb_reply_fetch_lock(ctdb, hdr); + break; + + case CTDB_REPLY_STORE_UNLOCK: + ctdb_reply_store_unlock(ctdb, hdr); + break; + + default: + printf("bogus operation code:%d\n",hdr->operation); + } +} + +/* + connect to a unix domain socket +*/ +static int ux_socket_connect(struct ctdb_context *ctdb) +{ + struct sockaddr_un addr; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)); + + ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0); + if (ctdb->daemon.sd == -1) { + return -1; + } + + if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + close(ctdb->daemon.sd); + ctdb->daemon.sd = -1; + return -1; + } + + ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, + CTDB_DS_ALIGNMENT, + ctdb_client_read_cb, ctdb); + return 0; +} + + + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_call to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +{ + struct ctdb_record_handle *rec; + + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->node->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + talloc_free(state); + return -1; + } + + rec = state->fetch_private; + + /* ugly hack to manage forced migration */ + if (rec != NULL) { + rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr); + rec->data->dsize = state->call.reply_data.dsize; + talloc_free(state); + return 0; + } + + if (state->call.reply_data.dsize) { + call->reply_data.dptr = talloc_memdup(state->node->ctdb, + state->call.reply_data.dptr, + state->call.reply_data.dsize); + call->reply_data.dsize = state->call.reply_data.dsize; + } else { + call->reply_data.dptr = NULL; + call->reply_data.dsize = 0; + } + call->status = state->call.status; + talloc_free(state); + + return 0; +} + + + + +/* + destroy a ctdb_call in client +*/ +static int ctdb_client_call_destructor(struct ctdb_call_state *state) +{ + idr_remove(state->node->ctdb->idr, state->c->hdr.reqid); + return 0; +} + + + +/* + make a ctdb call to the local daemon - async send. Called from client context. + + This constructs a ctdb_call request and queues it for processing. + This call never blocks. +*/ +struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call) +{ + struct ctdb_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_ltdb_header header; + TDB_DATA data; + int ret; + size_t len; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + ret = ctdb_ltdb_lock(ctdb_db, call->key); + if (ret != 0) { + printf("failed to lock ltdb record\n"); + return NULL; + } + + ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + +#if 0 + if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + state = ctdb_call_local_send(ctdb_db, call, &header, &data); + ctdb_ltdb_unlock(ctdb_db, call->key); + return state; + } +#endif + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + + talloc_steal(state, data.dptr); + + len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + talloc_set_name_const(state->c, "ctdbd req_call packet"); + talloc_steal(state, state->c); + + state->c->hdr.length = len; + state->c->hdr.ctdb_magic = CTDB_MAGIC; + state->c->hdr.ctdb_version = CTDB_VERSION; + state->c->hdr.operation = CTDB_REQ_CALL; + state->c->hdr.destnode = header.dmaster; + state->c->hdr.srcnode = ctdb->vnn; + /* this limits us to 16k outstanding messages - not unreasonable */ + state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + state->c->flags = call->flags; + state->c->db_id = ctdb_db->db_id; + state->c->callid = call->call_id; + state->c->keylen = call->key.dsize; + state->c->calldatalen = call->call_data.dsize; + memcpy(&state->c->data[0], call->key.dptr, call->key.dsize); + memcpy(&state->c->data[call->key.dsize], + call->call_data.dptr, call->call_data.dsize); + state->call = *call; + state->call.call_data.dptr = &state->c->data[call->key.dsize]; + state->call.key.dptr = &state->c->data[0]; + + state->node = ctdb->nodes[header.dmaster]; + state->state = CTDB_CALL_WAIT; + state->header = header; + state->ctdb_db = ctdb_db; + + talloc_set_destructor(state, ctdb_client_call_destructor); + + ctdb_client_queue_pkt(ctdb, &state->c->hdr); + +/*XXX set up timeout to cleanup if server doesnt respond + event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), + ctdb_call_timeout, state); +*/ + + ctdb_ltdb_unlock(ctdb_db, call->key); + return state; +} + + + +/* + tell the daemon what messaging srvid we will use, and register the message + handler function in the client +*/ +int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) + +{ + struct ctdb_req_register c; + int res; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + ZERO_STRUCT(c); + + c.hdr.length = sizeof(c); + c.hdr.ctdb_magic = CTDB_MAGIC; + c.hdr.ctdb_version = CTDB_VERSION; + c.hdr.operation = CTDB_REQ_REGISTER; + c.srvid = srvid; + + res = ctdb_client_queue_pkt(ctdb, &c.hdr); + if (res != 0) { + return res; + } + + /* also need to register the handler with our ctdb structure */ + return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data); +} + + + +/* + setup handler for receipt of ctdb messages from ctdb_send_message() +*/ +int ctdb_set_message_handler(struct ctdb_context *ctdb, + uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) +{ + if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_set_message_handler(ctdb, srvid, handler, private_data); + } + return ctdb_daemon_set_message_handler(ctdb, srvid, handler, private_data); +} + + +/* + send a message - from client context + */ +int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, + uint32_t srvid, TDB_DATA data) +{ + struct ctdb_req_message *r; + int len, res; + + len = offsetof(struct ctdb_req_message, data) + data.dsize; + r = ctdb->methods->allocate_pkt(ctdb, len); + CTDB_NO_MEMORY(ctdb, r); + talloc_set_name_const(r, "req_message packet"); + + r->hdr.length = len; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REQ_MESSAGE; + r->hdr.destnode = vnn; + r->hdr.srcnode = ctdb->vnn; + r->hdr.reqid = 0; + r->srvid = srvid; + r->datalen = data.dsize; + memcpy(&r->data[0], data.dptr, data.dsize); + + res = ctdb_client_queue_pkt(ctdb, &r->hdr); + if (res != 0) { + return res; + } + + talloc_free(r); + return 0; +} + +/* + wait for all nodes to be connected - from client + */ +static void ctdb_client_connect_wait(struct ctdb_context *ctdb) +{ + struct ctdb_req_connect_wait r; + int res; + + ZERO_STRUCT(r); + + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_CONNECT_WAIT; + + res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length); + if (res != 0) { + printf("Failed to queue a connect wait request\n"); + return; + } + + /* now we can go into the normal wait routine, as the reply packet + will update the ctdb->num_connected variable */ + ctdb_daemon_connect_wait(ctdb); +} + +/* + wait for all nodes to be connected +*/ +void ctdb_connect_wait(struct ctdb_context *ctdb) +{ + if (!(ctdb->flags & CTDB_FLAG_DAEMON_MODE)) { + ctdb_daemon_connect_wait(ctdb); + return; + } + + ctdb_client_connect_wait(ctdb); +} + + +struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key) +{ + struct ctdb_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_req_fetch_lock *req; + int len, res; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + return NULL; + } + state->state = CTDB_CALL_WAIT; + state->ctdb_db = ctdb_db; + len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + return NULL; + } + ZERO_STRUCT(*state->c); + talloc_set_name_const(state->c, "ctdbd req_fetch_lock packet"); + talloc_steal(state, state->c); + + req = (struct ctdb_req_fetch_lock *)state->c; + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_FETCH_LOCK; + req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + req->db_id = ctdb_db->db_id; + req->keylen = key.dsize; + memcpy(&req->key[0], key.dptr, key.dsize); + + res = ctdb_client_queue_pkt(ctdb, &req->hdr); + if (res != 0) { + return NULL; + } + + talloc_free(req); + + return state; +} + + +struct ctdb_call_state *ctdb_client_store_unlock_send( + struct ctdb_record_handle *rh, + TALLOC_CTX *mem_ctx, + TDB_DATA data) +{ + struct ctdb_call_state *state; + struct ctdb_db_context *ctdb_db = talloc_get_type(rh->ctdb_db, struct ctdb_db_context); + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_req_store_unlock *req; + int len, res; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + return NULL; + } + state->state = CTDB_CALL_WAIT; + state->ctdb_db = ctdb_db; + len = offsetof(struct ctdb_req_store_unlock, data) + rh->key.dsize + data.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + return NULL; + } + ZERO_STRUCT(*state->c); + talloc_set_name_const(state->c, "ctdbd req_store_unlock packet"); + talloc_steal(state, state->c); + + req = (struct ctdb_req_store_unlock *)state->c; + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_STORE_UNLOCK; + req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + req->db_id = ctdb_db->db_id; + req->keylen = rh->key.dsize; + req->datalen = data.dsize; + memcpy(&req->data[0], rh->key.dptr, rh->key.dsize); + memcpy(&req->data[req->keylen], data.dptr, data.dsize); + + res = ctdb_client_queue_pkt(ctdb, &req->hdr); + if (res != 0) { + return NULL; + } + + talloc_free(req); + + return state; +} + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the + results. This call will block unless the call has already completed. +*/ +struct ctdb_record_handle *ctdb_client_fetch_lock_recv(struct ctdb_call_state *state, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data) +{ + struct ctdb_record_handle *rec; + + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->ctdb_db->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + talloc_free(state); + return NULL; + } + + rec = talloc(mem_ctx, struct ctdb_record_handle); + CTDB_NO_MEMORY_NULL(state->ctdb_db->ctdb, rec); + + rec->ctdb_db = state->ctdb_db; + rec->key = key; + rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); + rec->data = talloc(rec, TDB_DATA); + rec->data->dsize = state->call.reply_data.dsize; + rec->data->dptr = talloc_memdup(rec, state->call.reply_data.dptr, rec->data->dsize); + + if (data) { + *data = *rec->data; + } + return rec; +} + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_store_unlock to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_client_store_unlock_recv(struct ctdb_call_state *state, struct ctdb_record_handle *rec) +{ + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->ctdb_db->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + } + + talloc_free(state); + return state->state; +} + +struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key, + TDB_DATA *data) +{ + struct ctdb_call_state *state; + struct ctdb_record_handle *rec; + + state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key); + rec = ctdb_client_fetch_lock_recv(state, mem_ctx, key, data); + + return rec; +} + +int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) +{ + struct ctdb_call_state *state; + int res; + + state = ctdb_client_store_unlock_send(rec, rec, data); + res = ctdb_client_store_unlock_recv(state, rec); + + talloc_free(rec); + + return res; +} diff --git a/source4/cluster/ctdb/common/ctdb_daemon.c b/source4/cluster/ctdb/common/ctdb_daemon.c new file mode 100644 index 0000000000..945030d77e --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_daemon.c @@ -0,0 +1,631 @@ +/* + ctdb daemon code + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "db_wrap.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb.h" +#include "../include/ctdb_private.h" + +static void ctdb_main_loop(struct ctdb_context *ctdb) +{ + ctdb->methods->start(ctdb); + + /* go into a wait loop to allow other nodes to complete */ + event_loop_wait(ctdb->ev); + + printf("event_loop_wait() returned. this should not happen\n"); + exit(1); +} + + +static void set_non_blocking(int fd) +{ + unsigned v; + v = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, v | O_NONBLOCK); +} + + +/* + structure describing a connected client in the daemon + */ +struct ctdb_client { + struct ctdb_context *ctdb; + int fd; + struct ctdb_queue *queue; +}; + + +/* + message handler for when we are in daemon mode. This redirects the message + to the right client + */ +static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + TDB_DATA data, void *private_data) +{ + struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client); + struct ctdb_req_message *r; + int len; + + /* construct a message to send to the client containing the data */ + len = offsetof(struct ctdb_req_message, data) + data.dsize; + r = ctdbd_allocate_pkt(ctdb, len); + +/*XXX cant use this since it returns an int CTDB_NO_MEMORY(ctdb, r);*/ + talloc_set_name_const(r, "req_message packet"); + + ZERO_STRUCT(*r); + + r->hdr.length = len; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REQ_MESSAGE; + r->srvid = srvid; + r->datalen = data.dsize; + memcpy(&r->data[0], data.dptr, data.dsize); + + ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len); + + talloc_free(r); + return; +} + + +/* + this is called when the ctdb daemon received a ctdb request to + set the srvid from the client + */ +static void daemon_request_register_message_handler(struct ctdb_client *client, + struct ctdb_req_register *c) +{ + int res; + res = ctdb_register_message_handler(client->ctdb, client, + c->srvid, daemon_message_handler, + client); + if (res != 0) { + printf("Failed to register handler %u in daemon\n", c->srvid); + } +} + + +static struct ctdb_call_state *ctdb_fetch_lock_send(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) +{ + struct ctdb_call *call; + struct ctdb_record_handle *rec; + struct ctdb_call_state *state; + + rec = talloc(mem_ctx, struct ctdb_record_handle); + CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec); + + + call = talloc(rec, struct ctdb_call); + ZERO_STRUCT(*call); + call->call_id = CTDB_FETCH_FUNC; + call->key = key; + call->flags = CTDB_IMMEDIATE_MIGRATION; + + + rec->ctdb_db = ctdb_db; + rec->key = key; + rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); + rec->data = data; + + state = ctdb_call_send(ctdb_db, call); + state->fetch_private = rec; + + return state; +} + +struct client_fetch_lock_data { + struct ctdb_client *client; + uint32_t reqid; +}; +static void daemon_fetch_lock_complete(struct ctdb_call_state *state) +{ + struct ctdb_reply_fetch_lock *r; + struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data, struct client_fetch_lock_data); + struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client); + int length, res; + + length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize; + r = ctdbd_allocate_pkt(client->ctdb, length); + if (r == NULL) { + printf("Failed to allocate reply_call in ctdb daemon\n"); + return; + } + ZERO_STRUCT(*r); + r->hdr.length = length; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REPLY_FETCH_LOCK; + r->hdr.reqid = data->reqid; + r->state = state->state; + r->datalen = state->call.reply_data.dsize; + memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen); + + res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length); + if (res != 0) { + printf("Failed to queue packet from daemon to client\n"); + } + talloc_free(r); +} + +/* + called when the daemon gets a fetch lock request from a client + */ +static void daemon_request_fetch_lock(struct ctdb_client *client, + struct ctdb_req_fetch_lock *f) +{ + struct ctdb_call_state *state; + TDB_DATA key, *data; + struct ctdb_db_context *ctdb_db; + struct client_fetch_lock_data *fl_data; + + ctdb_db = find_ctdb_db(client->ctdb, f->db_id); + + key.dsize = f->keylen; + key.dptr = &f->key[0]; + + data = talloc(client, TDB_DATA); + data->dptr = NULL; + data->dsize = 0; + + state = ctdb_fetch_lock_send(ctdb_db, client, key, data); + talloc_steal(state, data); + + fl_data = talloc(state, struct client_fetch_lock_data); + fl_data->client = client; + fl_data->reqid = f->hdr.reqid; + state->async.fn = daemon_fetch_lock_complete; + state->async.private_data = fl_data; +} + +/* + called when the daemon gets a store unlock request from a client + + this would never block? + */ +static void daemon_request_store_unlock(struct ctdb_client *client, + struct ctdb_req_store_unlock *f) +{ + struct ctdb_db_context *ctdb_db; + struct ctdb_reply_store_unlock r; + uint32_t caller = ctdb_get_vnn(client->ctdb); + struct ctdb_ltdb_header header; + TDB_DATA key, data; + int res; + + ctdb_db = find_ctdb_db(client->ctdb, f->db_id); + + /* write the data to ltdb */ + key.dsize = f->keylen; + key.dptr = &f->data[0]; + res = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL); + if (res) { + ctdb_set_error(ctdb_db->ctdb, "Fetch of locally held record failed"); + res = -1; + goto done; + } + if (header.laccessor != caller) { + header.lacount = 0; + } + header.laccessor = caller; + header.lacount++; + data.dsize = f->datalen; + data.dptr = &f->data[f->keylen]; + res = ctdb_ltdb_store(ctdb_db, key, &header, data); + if ( res != 0) { + ctdb_set_error(ctdb_db->ctdb, "ctdb_call tdb_store failed\n"); + } + + +done: + /* now send the reply */ + ZERO_STRUCT(r); + + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REPLY_STORE_UNLOCK; + r.hdr.reqid = f->hdr.reqid; + r.state = res; + + res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length); + if (res != 0) { + printf("Failed to queue a store unlock response\n"); + return; + } +} + +/* + called when the daemon gets a connect wait request from a client + */ +static void daemon_request_connect_wait(struct ctdb_client *client, + struct ctdb_req_connect_wait *c) +{ + struct ctdb_reply_connect_wait r; + int res; + + /* first wait - in the daemon */ + ctdb_daemon_connect_wait(client->ctdb); + + /* now send the reply */ + ZERO_STRUCT(r); + + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REPLY_CONNECT_WAIT; + r.vnn = ctdb_get_vnn(client->ctdb); + r.num_connected = client->ctdb->num_connected; + + res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length); + if (res != 0) { + printf("Failed to queue a connect wait response\n"); + return; + } +} + +/* + destroy a ctdb_client +*/ +static int ctdb_client_destructor(struct ctdb_client *client) +{ + close(client->fd); + client->fd = -1; + return 0; +} + + +/* + this is called when the ctdb daemon received a ctdb request message + from a local client over the unix domain socket + */ +static void daemon_request_message_from_client(struct ctdb_client *client, + struct ctdb_req_message *c) +{ + TDB_DATA data; + int res; + + /* maybe the message is for another client on this node */ + if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) { + ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c); + return; + } + + /* its for a remote node */ + data.dptr = &c->data[0]; + data.dsize = c->datalen; + res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode, + c->srvid, data); + if (res != 0) { + printf("Failed to send message to remote node %u\n", + c->hdr.destnode); + } +} + +/* + this is called when the ctdb daemon received a ctdb request call + from a local client over the unix domain socket + */ +static void daemon_request_call_from_client(struct ctdb_client *client, + struct ctdb_req_call *c) +{ + struct ctdb_call_state *state; + struct ctdb_db_context *ctdb_db; + struct ctdb_call call; + struct ctdb_reply_call *r; + int res; + uint32_t length; + + ctdb_db = find_ctdb_db(client->ctdb, c->db_id); + if (!ctdb_db) { + printf("Unknown database in request. db_id==0x%08x",c->db_id); + return; + } + + ZERO_STRUCT(call); + call.call_id = c->callid; + call.key.dptr = c->data; + call.key.dsize = c->keylen; + call.call_data.dptr = c->data + c->keylen; + call.call_data.dsize = c->calldatalen; + + state = ctdb_call_send(ctdb_db, &call); + +/* XXX this must be converted to fully async */ + res = ctdb_call_recv(state, &call); + if (res != 0) { + printf("ctdbd_call_recv() returned error\n"); + exit(1); + } + + length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize; + r = ctdbd_allocate_pkt(client->ctdb, length); + if (r == NULL) { + printf("Failed to allocate reply_call in ctdb daemon\n"); + return; + } + ZERO_STRUCT(*r); + r->hdr.length = length; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REPLY_CALL; + r->hdr.reqid = c->hdr.reqid; + r->datalen = call.reply_data.dsize; + memcpy(&r->data[0], call.reply_data.dptr, r->datalen); + + res = ctdb_queue_send(client->queue, (uint8_t *)&r, r->hdr.length); + if (res != 0) { + printf("Failed to queue packet from daemon to client\n"); + } + talloc_free(r); +} + + +/* data contains a packet from the client */ +static void client_incoming_packet(struct ctdb_client *client, void *data, size_t nread) +{ + struct ctdb_req_header *hdr = data; + + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n"); + goto done; + } + + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + goto done; + } + + switch (hdr->operation) { + case CTDB_REQ_CALL: + daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr); + break; + + case CTDB_REQ_REGISTER: + daemon_request_register_message_handler(client, + (struct ctdb_req_register *)hdr); + break; + case CTDB_REQ_MESSAGE: + daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr); + break; + + case CTDB_REQ_CONNECT_WAIT: + daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr); + break; + case CTDB_REQ_FETCH_LOCK: + daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr); + break; + case CTDB_REQ_STORE_UNLOCK: + daemon_request_store_unlock(client, (struct ctdb_req_store_unlock *)hdr); + break; + default: + printf("daemon: unrecognized operation:%d\n",hdr->operation); + } + +done: + talloc_free(data); +} + + +static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) +{ + struct ctdb_client *client = talloc_get_type(args, struct ctdb_client); + struct ctdb_req_header *hdr; + + if (cnt == 0) { + talloc_free(client); + return; + } + + if (cnt < sizeof(*hdr)) { + ctdb_set_error(client->ctdb, "Bad packet length %d\n", cnt); + return; + } + hdr = (struct ctdb_req_header *)data; + if (cnt != hdr->length) { + ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n", + hdr->length, cnt); + return; + } + + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n"); + return; + } + + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + return; + } + + /* it is the responsibility of the incoming packet function to free 'data' */ + client_incoming_packet(client, data, cnt); +} + +static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + struct sockaddr_in addr; + socklen_t len; + int fd; + struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context); + struct ctdb_client *client; + + memset(&addr, 0, sizeof(addr)); + len = sizeof(addr); + fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len); + if (fd == -1) { + return; + } + set_non_blocking(fd); + + client = talloc_zero(ctdb, struct ctdb_client); + client->ctdb = ctdb; + client->fd = fd; + + client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, + ctdb_client_read_cb, client); + + talloc_set_destructor(client, ctdb_client_destructor); +} + + + +static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + int *fd = private_data; + int cnt; + char buf; + + /* XXX this is a good place to try doing some cleaning up before exiting */ + cnt = read(*fd, &buf, 1); + if (cnt==0) { + printf("parent process exited. filedescriptor dissappeared\n"); + exit(1); + } else { + printf("ctdb: did not expect data from parent process\n"); + exit(1); + } +} + + + +/* + create a unix domain socket and bind it + return a file descriptor open on the socket +*/ +static int ux_socket_bind(struct ctdb_context *ctdb) +{ + struct sockaddr_un addr; + + ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0); + if (ctdb->daemon.sd == -1) { + ctdb->daemon.sd = -1; + return -1; + } + + set_non_blocking(ctdb->daemon.sd); + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)); + + if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + close(ctdb->daemon.sd); + ctdb->daemon.sd = -1; + return -1; + } + listen(ctdb->daemon.sd, 1); + + return 0; +} + +/* + delete the socket on exit - called on destruction of autofree context + */ +static int unlink_destructor(const char *name) +{ + unlink(name); + return 0; +} + +/* + start the protocol going +*/ +int ctdbd_start(struct ctdb_context *ctdb) +{ + pid_t pid; + static int fd[2]; + int res; + struct fd_event *fde; + const char *domain_socket_name; + + /* generate a name to use for our local socket */ + ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address); + /* get rid of any old sockets */ + unlink(ctdb->daemon.name); + + /* create a unix domain stream socket to listen to */ + res = ux_socket_bind(ctdb); + if (res!=0) { + printf("Failed to open CTDB unix domain socket\n"); + exit(10); + } + + res = pipe(&fd[0]); + if (res) { + printf("Failed to open pipe for CTDB\n"); + exit(1); + } + pid = fork(); + if (pid==-1) { + printf("Failed to fork CTDB daemon\n"); + exit(1); + } + + if (pid) { + close(fd[0]); + close(ctdb->daemon.sd); + ctdb->daemon.sd = -1; + return 0; + } + + /* ensure the socket is deleted on exit of the daemon */ + domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name); + talloc_set_destructor(domain_socket_name, unlink_destructor); + + close(fd[1]); + ctdb_clear_flags(ctdb, CTDB_FLAG_DAEMON_MODE); + ctdb->ev = event_context_init(NULL); + fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]); + fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb); + ctdb_main_loop(ctdb); + + return 0; +} + +/* + allocate a packet for use in client<->daemon communication + */ +void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len) +{ + int size; + + size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); + return talloc_size(ctdb, size); +} + +int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) +{ + return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data); +} + diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c new file mode 100644 index 0000000000..238f1701cf --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_io.c @@ -0,0 +1,303 @@ +/* + ctdb database library + Utility functions to read/write blobs of data from a file descriptor + and handle the case where we might need multiple read/writes to get all the + data. + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb_private.h" +#include "../include/ctdb.h" + +/* structures for packet queueing - see common/ctdb_io.c */ +struct ctdb_partial { + uint8_t *data; + uint32_t length; +}; + +struct ctdb_queue_pkt { + struct ctdb_queue_pkt *next, *prev; + uint8_t *data; + uint32_t length; +}; + +struct ctdb_queue { + struct ctdb_context *ctdb; + struct ctdb_partial partial; /* partial input packet */ + struct ctdb_queue_pkt *out_queue; + struct fd_event *fde; + int fd; + size_t alignment; + void *private_data; + ctdb_queue_cb_fn_t callback; +}; + + + +/* + called when an incoming connection is readable +*/ +static void queue_io_read(struct ctdb_queue *queue) +{ + int num_ready = 0; + ssize_t nread; + uint8_t *data, *data_base; + + if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 || + num_ready == 0) { + /* the descriptor has been closed */ + goto failed; + } + + + queue->partial.data = talloc_realloc_size(queue, queue->partial.data, + num_ready + queue->partial.length); + + if (queue->partial.data == NULL) { + goto failed; + } + + nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready); + if (nread <= 0) { + goto failed; + } + + + data = queue->partial.data; + nread += queue->partial.length; + + queue->partial.data = NULL; + queue->partial.length = 0; + + if (nread >= 4 && *(uint32_t *)data == nread) { + /* it is the responsibility of the incoming packet + function to free 'data' */ + queue->callback(data, nread, queue->private_data); + return; + } + + data_base = data; + + while (nread >= 4 && *(uint32_t *)data <= nread) { + /* we have at least one packet */ + uint8_t *d2; + uint32_t len; + len = *(uint32_t *)data; + d2 = talloc_memdup(queue, data, len); + if (d2 == NULL) { + /* sigh */ + goto failed; + } + queue->callback(d2, len, queue->private_data); + data += len; + nread -= len; + } + + if (nread > 0) { + /* we have only part of a packet */ + if (data_base == data) { + queue->partial.data = data; + queue->partial.length = nread; + } else { + queue->partial.data = talloc_memdup(queue, data, nread); + if (queue->partial.data == NULL) { + goto failed; + } + queue->partial.length = nread; + talloc_free(data_base); + } + return; + } + + talloc_free(data_base); + return; + +failed: + queue->callback(NULL, 0, queue->private_data); +} + + +/* used when an event triggers a dead queue */ +static void queue_dead(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue); + queue->callback(NULL, 0, queue->private_data); +} + + +/* + called when an incoming connection is writeable +*/ +static void queue_io_write(struct ctdb_queue *queue) +{ + while (queue->out_queue) { + struct ctdb_queue_pkt *pkt = queue->out_queue; + ssize_t n; + + n = write(queue->fd, pkt->data, pkt->length); + + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(queue->ctdb->ev, queue, timeval_zero(), + queue_dead, queue); + EVENT_FD_NOT_WRITEABLE(queue->fde); + return; + } + if (n <= 0) return; + + if (n != pkt->length) { + pkt->length -= n; + pkt->data += n; + return; + } + + DLIST_REMOVE(queue->out_queue, pkt); + talloc_free(pkt); + } + + EVENT_FD_NOT_WRITEABLE(queue->fde); +} + +/* + called when an incoming connection is readable or writeable +*/ +static void queue_io_handler(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue); + + if (flags & EVENT_FD_READ) { + queue_io_read(queue); + } else { + queue_io_write(queue); + } +} + + +/* + queue a packet for sending +*/ +int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) +{ + struct ctdb_queue_pkt *pkt; + uint32_t length2; + + /* enforce the length and alignment rules from the tcp packet allocator */ + length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1); + *(uint32_t *)data = length2; + + if (length2 != length) { + memset(data+length, 0, length2-length); + } + + /* if the queue is empty then try an immediate write, avoiding + queue overhead. This relies on non-blocking sockets */ + if (queue->out_queue == NULL && queue->fd != -1) { + ssize_t n = write(queue->fd, data, length2); + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(queue->ctdb->ev, queue, timeval_zero(), + queue_dead, queue); + /* yes, we report success, as the dead node is + handled via a separate event */ + return 0; + } + if (n > 0) { + data += n; + length2 -= n; + } + if (length2 == 0) return 0; + } + + pkt = talloc(queue, struct ctdb_queue_pkt); + CTDB_NO_MEMORY(queue->ctdb, pkt); + + pkt->data = talloc_memdup(pkt, data, length2); + CTDB_NO_MEMORY(queue->ctdb, pkt->data); + + pkt->length = length2; + + if (queue->out_queue == NULL && queue->fd != -1) { + EVENT_FD_WRITEABLE(queue->fde); + } + + DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *); + + return 0; +} + + +/* + setup the fd used by the queue + */ +int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd) +{ + queue->fd = fd; + talloc_free(queue->fde); + queue->fde = NULL; + + if (fd != -1) { + queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, + queue_io_handler, queue); + if (queue->fde == NULL) { + return -1; + } + + if (queue->out_queue) { + EVENT_FD_WRITEABLE(queue->fde); + } + } + + return 0; +} + + + +/* + setup a packet queue on a socket + */ +struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, int fd, int alignment, + + ctdb_queue_cb_fn_t callback, + void *private_data) +{ + struct ctdb_queue *queue; + + queue = talloc_zero(mem_ctx, struct ctdb_queue); + CTDB_NO_MEMORY_NULL(ctdb, queue); + + queue->ctdb = ctdb; + queue->fd = fd; + queue->alignment = alignment; + queue->private_data = private_data; + queue->callback = callback; + if (fd != -1) { + if (ctdb_queue_set_fd(queue, fd) != 0) { + talloc_free(queue); + return NULL; + } + } + + return queue; +} diff --git a/source4/cluster/ctdb/common/ctdb_ltdb.c b/source4/cluster/ctdb/common/ctdb_ltdb.c index 84c3bd49da..785ccad9b3 100644 --- a/source4/cluster/ctdb/common/ctdb_ltdb.c +++ b/source4/cluster/ctdb/common/ctdb_ltdb.c @@ -197,3 +197,21 @@ int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, return ret; } + + +/* + lock a record in the ltdb, given a key + */ +int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key) +{ + return tdb_chainlock(ctdb_db->ltdb->tdb, key); +} + +/* + unlock a record in the ltdb, given a key + */ +int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key) +{ + return tdb_chainunlock(ctdb_db->ltdb->tdb, key); +} + diff --git a/source4/cluster/ctdb/common/ctdb_message.c b/source4/cluster/ctdb/common/ctdb_message.c index 300bee8339..dba15aecb9 100644 --- a/source4/cluster/ctdb/common/ctdb_message.c +++ b/source4/cluster/ctdb/common/ctdb_message.c @@ -27,36 +27,104 @@ #include "system/network.h" #include "system/filesys.h" #include "../include/ctdb_private.h" +#include "lib/util/dlinklist.h" + +/* + this dispatches the messages to the registered ctdb message handler +*/ +static int ctdb_dispatch_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_DATA data) +{ + struct ctdb_message_list *ml; + + /* XXX we need a must faster way of finding the matching srvid + - maybe a tree? */ + for (ml=ctdb->message_list;ml;ml=ml->next) { + if (ml->srvid == srvid) break; + } + if (ml == NULL) { + printf("daemon vnn:%d no msg handler for srvid=%u\n", ctdb_get_vnn(ctdb), srvid); + /* no registered message handler */ + return -1; + } + + ml->message_handler(ctdb, srvid, data, ml->message_private); + return 0; +} /* called when a CTDB_REQ_MESSAGE packet comes in - - this dispatches the messages to the registered ctdb message handler */ void ctdb_request_message(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { struct ctdb_req_message *c = (struct ctdb_req_message *)hdr; TDB_DATA data; - if (ctdb->message_handler == NULL) { - /* no registered message handler */ - return; - } + data.dptr = &c->data[0]; data.dsize = c->datalen; - ctdb->message_handler(ctdb, c->srvid, data, ctdb->message_private); + + ctdb_dispatch_message(ctdb, c->srvid, data); +} + +/* + this local messaging handler is ugly, but is needed to prevent + recursion in ctdb_send_message() when the destination node is the + same as the source node + */ +struct ctdb_local_message { + struct ctdb_context *ctdb; + uint32_t srvid; + TDB_DATA data; +}; + +static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_local_message *m = talloc_get_type(private_data, + struct ctdb_local_message); + int res; + + res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data); + if (res != 0) { + printf("Failed to dispatch message for srvid=%u\n", m->srvid); + } + talloc_free(m); } +static int ctdb_local_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_DATA data) +{ + struct ctdb_local_message *m; + m = talloc(ctdb, struct ctdb_local_message); + CTDB_NO_MEMORY(ctdb, m); + + m->ctdb = ctdb; + m->srvid = srvid; + m->data = data; + m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize); + if (m->data.dptr == NULL) { + talloc_free(m); + return -1; + } + + /* this needs to be done as an event to prevent recursion */ + event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m); + return 0; +} /* send a ctdb message */ -int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, - uint32_t srvid, TDB_DATA data) +int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn, + uint32_t srvid, TDB_DATA data) { struct ctdb_req_message *r; int len; + /* see if this is a message to ourselves */ + if (vnn == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + return ctdb_local_message(ctdb, srvid, data); + } + len = offsetof(struct ctdb_req_message, data) + data.dsize; r = ctdb->methods->allocate_pkt(ctdb, len); CTDB_NO_MEMORY(ctdb, r); @@ -80,13 +148,49 @@ int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, } /* - setup handler for receipt of ctdb messages from ctdb_send_message() + send a ctdb message */ -int ctdb_set_message_handler(struct ctdb_context *ctdb, ctdb_message_fn_t handler, - void *private) +int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, + uint32_t srvid, TDB_DATA data) +{ + if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_send_message(ctdb, vnn, srvid, data); + } + return ctdb_daemon_send_message(ctdb, vnn, srvid, data); +} + + +/* + when a client goes away, we need to remove its srvid handler from the list + */ +static int message_handler_destructor(struct ctdb_message_list *m) { - ctdb->message_handler = handler; - ctdb->message_private = private; + DLIST_REMOVE(m->ctdb->message_list, m); return 0; } +/* + setup handler for receipt of ctdb messages from ctdb_send_message() +*/ +int ctdb_register_message_handler(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, + uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) +{ + struct ctdb_message_list *m; + + m = talloc(mem_ctx, struct ctdb_message_list); + CTDB_NO_MEMORY(ctdb, m); + + m->ctdb = ctdb; + m->srvid = srvid; + m->message_handler = handler; + m->message_private = private_data; + + DLIST_ADD(ctdb->message_list, m); + + talloc_set_destructor(m, message_handler_destructor); + + return 0; +} diff --git a/source4/cluster/ctdb/config.guess b/source4/cluster/ctdb/config.guess new file mode 100755 index 0000000000..ad5281e66e --- /dev/null +++ b/source4/cluster/ctdb/config.guess @@ -0,0 +1,1466 @@ +#! /bin/sh +# Attempt to guess a canonical system name. +# Copyright (C) 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, +# 2000, 2001, 2002, 2003, 2004, 2005 Free Software Foundation, Inc. + +timestamp='2005-08-03' + +# This file is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA +# 02110-1301, USA. +# +# As a special exception to the GNU General Public License, if you +# distribute this file as part of a program that contains a +# configuration script generated by Autoconf, you may include it under +# the same distribution terms that you use for the rest of that program. + + +# Originally written by Per Bothner . +# Please send patches to . Submit a context +# diff and a properly formatted ChangeLog entry. +# +# This script attempts to guess a canonical system name similar to +# config.sub. If it succeeds, it prints the system name on stdout, and +# exits with 0. Otherwise, it exits with 1. +# +# The plan is that this can be called by configure scripts if you +# don't specify an explicit build system type. + +me=`echo "$0" | sed -e 's,.*/,,'` + +usage="\ +Usage: $0 [OPTION] + +Output the configuration name of the system \`$me' is run on. + +Operation modes: + -h, --help print this help, then exit + -t, --time-stamp print date of last modification, then exit + -v, --version print version number, then exit + +Report bugs and patches to ." + +version="\ +GNU config.guess ($timestamp) + +Originally written by Per Bothner. +Copyright (C) 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005 +Free Software Foundation, Inc. + +This is free software; see the source for copying conditions. There is NO +warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE." + +help=" +Try \`$me --help' for more information." + +# Parse command line +while test $# -gt 0 ; do + case $1 in + --time-stamp | --time* | -t ) + echo "$timestamp" ; exit ;; + --version | -v ) + echo "$version" ; exit ;; + --help | --h* | -h ) + echo "$usage"; exit ;; + -- ) # Stop option processing + shift; break ;; + - ) # Use stdin as input. + break ;; + -* ) + echo "$me: invalid option $1$help" >&2 + exit 1 ;; + * ) + break ;; + esac +done + +if test $# != 0; then + echo "$me: too many arguments$help" >&2 + exit 1 +fi + +trap 'exit 1' 1 2 15 + +# CC_FOR_BUILD -- compiler used by this script. Note that the use of a +# compiler to aid in system detection is discouraged as it requires +# temporary files to be created and, as you can see below, it is a +# headache to deal with in a portable fashion. + +# Historically, `CC_FOR_BUILD' used to be named `HOST_CC'. We still +# use `HOST_CC' if defined, but it is deprecated. + +# Portable tmp directory creation inspired by the Autoconf team. + +set_cc_for_build=' +trap "exitcode=\$?; (rm -f \$tmpfiles 2>/dev/null; rmdir \$tmp 2>/dev/null) && exit \$exitcode" 0 ; +trap "rm -f \$tmpfiles 2>/dev/null; rmdir \$tmp 2>/dev/null; exit 1" 1 2 13 15 ; +: ${TMPDIR=/tmp} ; + { tmp=`(umask 077 && mktemp -d -q "$TMPDIR/cgXXXXXX") 2>/dev/null` && test -n "$tmp" && test -d "$tmp" ; } || + { test -n "$RANDOM" && tmp=$TMPDIR/cg$$-$RANDOM && (umask 077 && mkdir $tmp) ; } || + { tmp=$TMPDIR/cg-$$ && (umask 077 && mkdir $tmp) && echo "Warning: creating insecure temp directory" >&2 ; } || + { echo "$me: cannot create a temporary directory in $TMPDIR" >&2 ; exit 1 ; } ; +dummy=$tmp/dummy ; +tmpfiles="$dummy.c $dummy.o $dummy.rel $dummy" ; +case $CC_FOR_BUILD,$HOST_CC,$CC in + ,,) echo "int x;" > $dummy.c ; + for c in cc gcc c89 c99 ; do + if ($c -c -o $dummy.o $dummy.c) >/dev/null 2>&1 ; then + CC_FOR_BUILD="$c"; break ; + fi ; + done ; + if test x"$CC_FOR_BUILD" = x ; then + CC_FOR_BUILD=no_compiler_found ; + fi + ;; + ,,*) CC_FOR_BUILD=$CC ;; + ,*,*) CC_FOR_BUILD=$HOST_CC ;; +esac ; set_cc_for_build= ;' + +# This is needed to find uname on a Pyramid OSx when run in the BSD universe. +# (ghazi@noc.rutgers.edu 1994-08-24) +if (test -f /.attbin/uname) >/dev/null 2>&1 ; then + PATH=$PATH:/.attbin ; export PATH +fi + +UNAME_MACHINE=`(uname -m) 2>/dev/null` || UNAME_MACHINE=unknown +UNAME_RELEASE=`(uname -r) 2>/dev/null` || UNAME_RELEASE=unknown +UNAME_SYSTEM=`(uname -s) 2>/dev/null` || UNAME_SYSTEM=unknown +UNAME_VERSION=`(uname -v) 2>/dev/null` || UNAME_VERSION=unknown + +# Note: order is significant - the case branches are not exclusive. + +case "${UNAME_MACHINE}:${UNAME_SYSTEM}:${UNAME_RELEASE}:${UNAME_VERSION}" in + *:NetBSD:*:*) + # NetBSD (nbsd) targets should (where applicable) match one or + # more of the tupples: *-*-netbsdelf*, *-*-netbsdaout*, + # *-*-netbsdecoff* and *-*-netbsd*. For targets that recently + # switched to ELF, *-*-netbsd* would select the old + # object file format. This provides both forward + # compatibility and a consistent mechanism for selecting the + # object file format. + # + # Note: NetBSD doesn't particularly care about the vendor + # portion of the name. We always set it to "unknown". + sysctl="sysctl -n hw.machine_arch" + UNAME_MACHINE_ARCH=`(/sbin/$sysctl 2>/dev/null || \ + /usr/sbin/$sysctl 2>/dev/null || echo unknown)` + case "${UNAME_MACHINE_ARCH}" in + armeb) machine=armeb-unknown ;; + arm*) machine=arm-unknown ;; + sh3el) machine=shl-unknown ;; + sh3eb) machine=sh-unknown ;; + *) machine=${UNAME_MACHINE_ARCH}-unknown ;; + esac + # The Operating System including object format, if it has switched + # to ELF recently, or will in the future. + case "${UNAME_MACHINE_ARCH}" in + arm*|i386|m68k|ns32k|sh3*|sparc|vax) + eval $set_cc_for_build + if echo __ELF__ | $CC_FOR_BUILD -E - 2>/dev/null \ + | grep __ELF__ >/dev/null + then + # Once all utilities can be ECOFF (netbsdecoff) or a.out (netbsdaout). + # Return netbsd for either. FIX? + os=netbsd + else + os=netbsdelf + fi + ;; + *) + os=netbsd + ;; + esac + # The OS release + # Debian GNU/NetBSD machines have a different userland, and + # thus, need a distinct triplet. However, they do not need + # kernel version information, so it can be replaced with a + # suitable tag, in the style of linux-gnu. + case "${UNAME_VERSION}" in + Debian*) + release='-gnu' + ;; + *) + release=`echo ${UNAME_RELEASE}|sed -e 's/[-_].*/\./'` + ;; + esac + # Since CPU_TYPE-MANUFACTURER-KERNEL-OPERATING_SYSTEM: + # contains redundant information, the shorter form: + # CPU_TYPE-MANUFACTURER-OPERATING_SYSTEM is used. + echo "${machine}-${os}${release}" + exit ;; + *:OpenBSD:*:*) + UNAME_MACHINE_ARCH=`arch | sed 's/OpenBSD.//'` + echo ${UNAME_MACHINE_ARCH}-unknown-openbsd${UNAME_RELEASE} + exit ;; + *:ekkoBSD:*:*) + echo ${UNAME_MACHINE}-unknown-ekkobsd${UNAME_RELEASE} + exit ;; + macppc:MirBSD:*:*) + echo powerppc-unknown-mirbsd${UNAME_RELEASE} + exit ;; + *:MirBSD:*:*) + echo ${UNAME_MACHINE}-unknown-mirbsd${UNAME_RELEASE} + exit ;; + alpha:OSF1:*:*) + case $UNAME_RELEASE in + *4.0) + UNAME_RELEASE=`/usr/sbin/sizer -v | awk '{print $3}'` + ;; + *5.*) + UNAME_RELEASE=`/usr/sbin/sizer -v | awk '{print $4}'` + ;; + esac + # According to Compaq, /usr/sbin/psrinfo has been available on + # OSF/1 and Tru64 systems produced since 1995. I hope that + # covers most systems running today. This code pipes the CPU + # types through head -n 1, so we only detect the type of CPU 0. + ALPHA_CPU_TYPE=`/usr/sbin/psrinfo -v | sed -n -e 's/^ The alpha \(.*\) processor.*$/\1/p' | head -n 1` + case "$ALPHA_CPU_TYPE" in + "EV4 (21064)") + UNAME_MACHINE="alpha" ;; + "EV4.5 (21064)") + UNAME_MACHINE="alpha" ;; + "LCA4 (21066/21068)") + UNAME_MACHINE="alpha" ;; + "EV5 (21164)") + UNAME_MACHINE="alphaev5" ;; + "EV5.6 (21164A)") + UNAME_MACHINE="alphaev56" ;; + "EV5.6 (21164PC)") + UNAME_MACHINE="alphapca56" ;; + "EV5.7 (21164PC)") + UNAME_MACHINE="alphapca57" ;; + "EV6 (21264)") + UNAME_MACHINE="alphaev6" ;; + "EV6.7 (21264A)") + UNAME_MACHINE="alphaev67" ;; + "EV6.8CB (21264C)") + UNAME_MACHINE="alphaev68" ;; + "EV6.8AL (21264B)") + UNAME_MACHINE="alphaev68" ;; + "EV6.8CX (21264D)") + UNAME_MACHINE="alphaev68" ;; + "EV6.9A (21264/EV69A)") + UNAME_MACHINE="alphaev69" ;; + "EV7 (21364)") + UNAME_MACHINE="alphaev7" ;; + "EV7.9 (21364A)") + UNAME_MACHINE="alphaev79" ;; + esac + # A Pn.n version is a patched version. + # A Vn.n version is a released version. + # A Tn.n version is a released field test version. + # A Xn.n version is an unreleased experimental baselevel. + # 1.2 uses "1.2" for uname -r. + echo ${UNAME_MACHINE}-dec-osf`echo ${UNAME_RELEASE} | sed -e 's/^[PVTX]//' | tr 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' 'abcdefghijklmnopqrstuvwxyz'` + exit ;; + Alpha\ *:Windows_NT*:*) + # How do we know it's Interix rather than the generic POSIX subsystem? + # Should we change UNAME_MACHINE based on the output of uname instead + # of the specific Alpha model? + echo alpha-pc-interix + exit ;; + 21064:Windows_NT:50:3) + echo alpha-dec-winnt3.5 + exit ;; + Amiga*:UNIX_System_V:4.0:*) + echo m68k-unknown-sysv4 + exit ;; + *:[Aa]miga[Oo][Ss]:*:*) + echo ${UNAME_MACHINE}-unknown-amigaos + exit ;; + *:[Mm]orph[Oo][Ss]:*:*) + echo ${UNAME_MACHINE}-unknown-morphos + exit ;; + *:OS/390:*:*) + echo i370-ibm-openedition + exit ;; + *:z/VM:*:*) + echo s390-ibm-zvmoe + exit ;; + *:OS400:*:*) + echo powerpc-ibm-os400 + exit ;; + arm:RISC*:1.[012]*:*|arm:riscix:1.[012]*:*) + echo arm-acorn-riscix${UNAME_RELEASE} + exit ;; + arm:riscos:*:*|arm:RISCOS:*:*) + echo arm-unknown-riscos + exit ;; + SR2?01:HI-UX/MPP:*:* | SR8000:HI-UX/MPP:*:*) + echo hppa1.1-hitachi-hiuxmpp + exit ;; + Pyramid*:OSx*:*:* | MIS*:OSx*:*:* | MIS*:SMP_DC-OSx*:*:*) + # akee@wpdis03.wpafb.af.mil (Earle F. Ake) contributed MIS and NILE. + if test "`(/bin/universe) 2>/dev/null`" = att ; then + echo pyramid-pyramid-sysv3 + else + echo pyramid-pyramid-bsd + fi + exit ;; + NILE*:*:*:dcosx) + echo pyramid-pyramid-svr4 + exit ;; + DRS?6000:unix:4.0:6*) + echo sparc-icl-nx6 + exit ;; + DRS?6000:UNIX_SV:4.2*:7* | DRS?6000:isis:4.2*:7*) + case `/usr/bin/uname -p` in + sparc) echo sparc-icl-nx7; exit ;; + esac ;; + sun4H:SunOS:5.*:*) + echo sparc-hal-solaris2`echo ${UNAME_RELEASE}|sed -e 's/[^.]*//'` + exit ;; + sun4*:SunOS:5.*:* | tadpole*:SunOS:5.*:*) + echo sparc-sun-solaris2`echo ${UNAME_RELEASE}|sed -e 's/[^.]*//'` + exit ;; + i86pc:SunOS:5.*:*) + echo i386-pc-solaris2`echo ${UNAME_RELEASE}|sed -e 's/[^.]*//'` + exit ;; + sun4*:SunOS:6*:*) + # According to config.sub, this is the proper way to canonicalize + # SunOS6. Hard to guess exactly what SunOS6 will be like, but + # it's likely to be more like Solaris than SunOS4. + echo sparc-sun-solaris3`echo ${UNAME_RELEASE}|sed -e 's/[^.]*//'` + exit ;; + sun4*:SunOS:*:*) + case "`/usr/bin/arch -k`" in + Series*|S4*) + UNAME_RELEASE=`uname -v` + ;; + esac + # Japanese Language versions have a version number like `4.1.3-JL'. + echo sparc-sun-sunos`echo ${UNAME_RELEASE}|sed -e 's/-/_/'` + exit ;; + sun3*:SunOS:*:*) + echo m68k-sun-sunos${UNAME_RELEASE} + exit ;; + sun*:*:4.2BSD:*) + UNAME_RELEASE=`(sed 1q /etc/motd | awk '{print substr($5,1,3)}') 2>/dev/null` + test "x${UNAME_RELEASE}" = "x" && UNAME_RELEASE=3 + case "`/bin/arch`" in + sun3) + echo m68k-sun-sunos${UNAME_RELEASE} + ;; + sun4) + echo sparc-sun-sunos${UNAME_RELEASE} + ;; + esac + exit ;; + aushp:SunOS:*:*) + echo sparc-auspex-sunos${UNAME_RELEASE} + exit ;; + # The situation for MiNT is a little confusing. The machine name + # can be virtually everything (everything which is not + # "atarist" or "atariste" at least should have a processor + # > m68000). The system name ranges from "MiNT" over "FreeMiNT" + # to the lowercase version "mint" (or "freemint"). Finally + # the system name "TOS" denotes a system which is actually not + # MiNT. But MiNT is downward compatible to TOS, so this should + # be no problem. + atarist[e]:*MiNT:*:* | atarist[e]:*mint:*:* | atarist[e]:*TOS:*:*) + echo m68k-atari-mint${UNAME_RELEASE} + exit ;; + atari*:*MiNT:*:* | atari*:*mint:*:* | atarist[e]:*TOS:*:*) + echo m68k-atari-mint${UNAME_RELEASE} + exit ;; + *falcon*:*MiNT:*:* | *falcon*:*mint:*:* | *falcon*:*TOS:*:*) + echo m68k-atari-mint${UNAME_RELEASE} + exit ;; + milan*:*MiNT:*:* | milan*:*mint:*:* | *milan*:*TOS:*:*) + echo m68k-milan-mint${UNAME_RELEASE} + exit ;; + hades*:*MiNT:*:* | hades*:*mint:*:* | *hades*:*TOS:*:*) + echo m68k-hades-mint${UNAME_RELEASE} + exit ;; + *:*MiNT:*:* | *:*mint:*:* | *:*TOS:*:*) + echo m68k-unknown-mint${UNAME_RELEASE} + exit ;; + m68k:machten:*:*) + echo m68k-apple-machten${UNAME_RELEASE} + exit ;; + powerpc:machten:*:*) + echo powerpc-apple-machten${UNAME_RELEASE} + exit ;; + RISC*:Mach:*:*) + echo mips-dec-mach_bsd4.3 + exit ;; + RISC*:ULTRIX:*:*) + echo mips-dec-ultrix${UNAME_RELEASE} + exit ;; + VAX*:ULTRIX*:*:*) + echo vax-dec-ultrix${UNAME_RELEASE} + exit ;; + 2020:CLIX:*:* | 2430:CLIX:*:*) + echo clipper-intergraph-clix${UNAME_RELEASE} + exit ;; + mips:*:*:UMIPS | mips:*:*:RISCos) + eval $set_cc_for_build + sed 's/^ //' << EOF >$dummy.c +#ifdef __cplusplus +#include /* for printf() prototype */ + int main (int argc, char *argv[]) { +#else + int main (argc, argv) int argc; char *argv[]; { +#endif + #if defined (host_mips) && defined (MIPSEB) + #if defined (SYSTYPE_SYSV) + printf ("mips-mips-riscos%ssysv\n", argv[1]); exit (0); + #endif + #if defined (SYSTYPE_SVR4) + printf ("mips-mips-riscos%ssvr4\n", argv[1]); exit (0); + #endif + #if defined (SYSTYPE_BSD43) || defined(SYSTYPE_BSD) + printf ("mips-mips-riscos%sbsd\n", argv[1]); exit (0); + #endif + #endif + exit (-1); + } +EOF + $CC_FOR_BUILD -o $dummy $dummy.c && + dummyarg=`echo "${UNAME_RELEASE}" | sed -n 's/\([0-9]*\).*/\1/p'` && + SYSTEM_NAME=`$dummy $dummyarg` && + { echo "$SYSTEM_NAME"; exit; } + echo mips-mips-riscos${UNAME_RELEASE} + exit ;; + Motorola:PowerMAX_OS:*:*) + echo powerpc-motorola-powermax + exit ;; + Motorola:*:4.3:PL8-*) + echo powerpc-harris-powermax + exit ;; + Night_Hawk:*:*:PowerMAX_OS | Synergy:PowerMAX_OS:*:*) + echo powerpc-harris-powermax + exit ;; + Night_Hawk:Power_UNIX:*:*) + echo powerpc-harris-powerunix + exit ;; + m88k:CX/UX:7*:*) + echo m88k-harris-cxux7 + exit ;; + m88k:*:4*:R4*) + echo m88k-motorola-sysv4 + exit ;; + m88k:*:3*:R3*) + echo m88k-motorola-sysv3 + exit ;; + AViiON:dgux:*:*) + # DG/UX returns AViiON for all architectures + UNAME_PROCESSOR=`/usr/bin/uname -p` + if [ $UNAME_PROCESSOR = mc88100 ] || [ $UNAME_PROCESSOR = mc88110 ] + then + if [ ${TARGET_BINARY_INTERFACE}x = m88kdguxelfx ] || \ + [ ${TARGET_BINARY_INTERFACE}x = x ] + then + echo m88k-dg-dgux${UNAME_RELEASE} + else + echo m88k-dg-dguxbcs${UNAME_RELEASE} + fi + else + echo i586-dg-dgux${UNAME_RELEASE} + fi + exit ;; + M88*:DolphinOS:*:*) # DolphinOS (SVR3) + echo m88k-dolphin-sysv3 + exit ;; + M88*:*:R3*:*) + # Delta 88k system running SVR3 + echo m88k-motorola-sysv3 + exit ;; + XD88*:*:*:*) # Tektronix XD88 system running UTekV (SVR3) + echo m88k-tektronix-sysv3 + exit ;; + Tek43[0-9][0-9]:UTek:*:*) # Tektronix 4300 system running UTek (BSD) + echo m68k-tektronix-bsd + exit ;; + *:IRIX*:*:*) + echo mips-sgi-irix`echo ${UNAME_RELEASE}|sed -e 's/-/_/g'` + exit ;; + ????????:AIX?:[12].1:2) # AIX 2.2.1 or AIX 2.1.1 is RT/PC AIX. + echo romp-ibm-aix # uname -m gives an 8 hex-code CPU id + exit ;; # Note that: echo "'`uname -s`'" gives 'AIX ' + i*86:AIX:*:*) + echo i386-ibm-aix + exit ;; + ia64:AIX:*:*) + if [ -x /usr/bin/oslevel ] ; then + IBM_REV=`/usr/bin/oslevel` + else + IBM_REV=${UNAME_VERSION}.${UNAME_RELEASE} + fi + echo ${UNAME_MACHINE}-ibm-aix${IBM_REV} + exit ;; + *:AIX:2:3) + if grep bos325 /usr/include/stdio.h >/dev/null 2>&1; then + eval $set_cc_for_build + sed 's/^ //' << EOF >$dummy.c + #include + + main() + { + if (!__power_pc()) + exit(1); + puts("powerpc-ibm-aix3.2.5"); + exit(0); + } +EOF + if $CC_FOR_BUILD -o $dummy $dummy.c && SYSTEM_NAME=`$dummy` + then + echo "$SYSTEM_NAME" + else + echo rs6000-ibm-aix3.2.5 + fi + elif grep bos324 /usr/include/stdio.h >/dev/null 2>&1; then + echo rs6000-ibm-aix3.2.4 + else + echo rs6000-ibm-aix3.2 + fi + exit ;; + *:AIX:*:[45]) + IBM_CPU_ID=`/usr/sbin/lsdev -C -c processor -S available | sed 1q | awk '{ print $1 }'` + if /usr/sbin/lsattr -El ${IBM_CPU_ID} | grep ' POWER' >/dev/null 2>&1; then + IBM_ARCH=rs6000 + else + IBM_ARCH=powerpc + fi + if [ -x /usr/bin/oslevel ] ; then + IBM_REV=`/usr/bin/oslevel` + else + IBM_REV=${UNAME_VERSION}.${UNAME_RELEASE} + fi + echo ${IBM_ARCH}-ibm-aix${IBM_REV} + exit ;; + *:AIX:*:*) + echo rs6000-ibm-aix + exit ;; + ibmrt:4.4BSD:*|romp-ibm:BSD:*) + echo romp-ibm-bsd4.4 + exit ;; + ibmrt:*BSD:*|romp-ibm:BSD:*) # covers RT/PC BSD and + echo romp-ibm-bsd${UNAME_RELEASE} # 4.3 with uname added to + exit ;; # report: romp-ibm BSD 4.3 + *:BOSX:*:*) + echo rs6000-bull-bosx + exit ;; + DPX/2?00:B.O.S.:*:*) + echo m68k-bull-sysv3 + exit ;; + 9000/[34]??:4.3bsd:1.*:*) + echo m68k-hp-bsd + exit ;; + hp300:4.4BSD:*:* | 9000/[34]??:4.3bsd:2.*:*) + echo m68k-hp-bsd4.4 + exit ;; + 9000/[34678]??:HP-UX:*:*) + HPUX_REV=`echo ${UNAME_RELEASE}|sed -e 's/[^.]*.[0B]*//'` + case "${UNAME_MACHINE}" in + 9000/31? ) HP_ARCH=m68000 ;; + 9000/[34]?? ) HP_ARCH=m68k ;; + 9000/[678][0-9][0-9]) + if [ -x /usr/bin/getconf ]; then + sc_cpu_version=`/usr/bin/getconf SC_CPU_VERSION 2>/dev/null` + sc_kernel_bits=`/usr/bin/getconf SC_KERNEL_BITS 2>/dev/null` + case "${sc_cpu_version}" in + 523) HP_ARCH="hppa1.0" ;; # CPU_PA_RISC1_0 + 528) HP_ARCH="hppa1.1" ;; # CPU_PA_RISC1_1 + 532) # CPU_PA_RISC2_0 + case "${sc_kernel_bits}" in + 32) HP_ARCH="hppa2.0n" ;; + 64) HP_ARCH="hppa2.0w" ;; + '') HP_ARCH="hppa2.0" ;; # HP-UX 10.20 + esac ;; + esac + fi + if [ "${HP_ARCH}" = "" ]; then + eval $set_cc_for_build + sed 's/^ //' << EOF >$dummy.c + + #define _HPUX_SOURCE + #include + #include + + int main () + { + #if defined(_SC_KERNEL_BITS) + long bits = sysconf(_SC_KERNEL_BITS); + #endif + long cpu = sysconf (_SC_CPU_VERSION); + + switch (cpu) + { + case CPU_PA_RISC1_0: puts ("hppa1.0"); break; + case CPU_PA_RISC1_1: puts ("hppa1.1"); break; + case CPU_PA_RISC2_0: + #if defined(_SC_KERNEL_BITS) + switch (bits) + { + case 64: puts ("hppa2.0w"); break; + case 32: puts ("hppa2.0n"); break; + default: puts ("hppa2.0"); break; + } break; + #else /* !defined(_SC_KERNEL_BITS) */ + puts ("hppa2.0"); break; + #endif + default: puts ("hppa1.0"); break; + } + exit (0); + } +EOF + (CCOPTS= $CC_FOR_BUILD -o $dummy $dummy.c 2>/dev/null) && HP_ARCH=`$dummy` + test -z "$HP_ARCH" && HP_ARCH=hppa + fi ;; + esac + if [ ${HP_ARCH} = "hppa2.0w" ] + then + eval $set_cc_for_build + + # hppa2.0w-hp-hpux* has a 64-bit kernel and a compiler generating + # 32-bit code. hppa64-hp-hpux* has the same kernel and a compiler + # generating 64-bit code. GNU and HP use different nomenclature: + # + # $ CC_FOR_BUILD=cc ./config.guess + # => hppa2.0w-hp-hpux11.23 + # $ CC_FOR_BUILD="cc +DA2.0w" ./config.guess + # => hppa64-hp-hpux11.23 + + if echo __LP64__ | (CCOPTS= $CC_FOR_BUILD -E - 2>/dev/null) | + grep __LP64__ >/dev/null + then + HP_ARCH="hppa2.0w" + else + HP_ARCH="hppa64" + fi + fi + echo ${HP_ARCH}-hp-hpux${HPUX_REV} + exit ;; + ia64:HP-UX:*:*) + HPUX_REV=`echo ${UNAME_RELEASE}|sed -e 's/[^.]*.[0B]*//'` + echo ia64-hp-hpux${HPUX_REV} + exit ;; + 3050*:HI-UX:*:*) + eval $set_cc_for_build + sed 's/^ //' << EOF >$dummy.c + #include + int + main () + { + long cpu = sysconf (_SC_CPU_VERSION); + /* The order matters, because CPU_IS_HP_MC68K erroneously returns + true for CPU_PA_RISC1_0. CPU_IS_PA_RISC returns correct + results, however. */ + if (CPU_IS_PA_RISC (cpu)) + { + switch (cpu) + { + case CPU_PA_RISC1_0: puts ("hppa1.0-hitachi-hiuxwe2"); break; + case CPU_PA_RISC1_1: puts ("hppa1.1-hitachi-hiuxwe2"); break; + case CPU_PA_RISC2_0: puts ("hppa2.0-hitachi-hiuxwe2"); break; + default: puts ("hppa-hitachi-hiuxwe2"); break; + } + } + else if (CPU_IS_HP_MC68K (cpu)) + puts ("m68k-hitachi-hiuxwe2"); + else puts ("unknown-hitachi-hiuxwe2"); + exit (0); + } +EOF + $CC_FOR_BUILD -o $dummy $dummy.c && SYSTEM_NAME=`$dummy` && + { echo "$SYSTEM_NAME"; exit; } + echo unknown-hitachi-hiuxwe2 + exit ;; + 9000/7??:4.3bsd:*:* | 9000/8?[79]:4.3bsd:*:* ) + echo hppa1.1-hp-bsd + exit ;; + 9000/8??:4.3bsd:*:*) + echo hppa1.0-hp-bsd + exit ;; + *9??*:MPE/iX:*:* | *3000*:MPE/iX:*:*) + echo hppa1.0-hp-mpeix + exit ;; + hp7??:OSF1:*:* | hp8?[79]:OSF1:*:* ) + echo hppa1.1-hp-osf + exit ;; + hp8??:OSF1:*:*) + echo hppa1.0-hp-osf + exit ;; + i*86:OSF1:*:*) + if [ -x /usr/sbin/sysversion ] ; then + echo ${UNAME_MACHINE}-unknown-osf1mk + else + echo ${UNAME_MACHINE}-unknown-osf1 + fi + exit ;; + parisc*:Lites*:*:*) + echo hppa1.1-hp-lites + exit ;; + C1*:ConvexOS:*:* | convex:ConvexOS:C1*:*) + echo c1-convex-bsd + exit ;; + C2*:ConvexOS:*:* | convex:ConvexOS:C2*:*) + if getsysinfo -f scalar_acc + then echo c32-convex-bsd + else echo c2-convex-bsd + fi + exit ;; + C34*:ConvexOS:*:* | convex:ConvexOS:C34*:*) + echo c34-convex-bsd + exit ;; + C38*:ConvexOS:*:* | convex:ConvexOS:C38*:*) + echo c38-convex-bsd + exit ;; + C4*:ConvexOS:*:* | convex:ConvexOS:C4*:*) + echo c4-convex-bsd + exit ;; + CRAY*Y-MP:*:*:*) + echo ymp-cray-unicos${UNAME_RELEASE} | sed -e 's/\.[^.]*$/.X/' + exit ;; + CRAY*[A-Z]90:*:*:*) + echo ${UNAME_MACHINE}-cray-unicos${UNAME_RELEASE} \ + | sed -e 's/CRAY.*\([A-Z]90\)/\1/' \ + -e y/ABCDEFGHIJKLMNOPQRSTUVWXYZ/abcdefghijklmnopqrstuvwxyz/ \ + -e 's/\.[^.]*$/.X/' + exit ;; + CRAY*TS:*:*:*) + echo t90-cray-unicos${UNAME_RELEASE} | sed -e 's/\.[^.]*$/.X/' + exit ;; + CRAY*T3E:*:*:*) + echo alphaev5-cray-unicosmk${UNAME_RELEASE} | sed -e 's/\.[^.]*$/.X/' + exit ;; + CRAY*SV1:*:*:*) + echo sv1-cray-unicos${UNAME_RELEASE} | sed -e 's/\.[^.]*$/.X/' + exit ;; + *:UNICOS/mp:*:*) + echo craynv-cray-unicosmp${UNAME_RELEASE} | sed -e 's/\.[^.]*$/.X/' + exit ;; + F30[01]:UNIX_System_V:*:* | F700:UNIX_System_V:*:*) + FUJITSU_PROC=`uname -m | tr 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' 'abcdefghijklmnopqrstuvwxyz'` + FUJITSU_SYS=`uname -p | tr 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' 'abcdefghijklmnopqrstuvwxyz' | sed -e 's/\///'` + FUJITSU_REL=`echo ${UNAME_RELEASE} | sed -e 's/ /_/'` + echo "${FUJITSU_PROC}-fujitsu-${FUJITSU_SYS}${FUJITSU_REL}" + exit ;; + 5000:UNIX_System_V:4.*:*) + FUJITSU_SYS=`uname -p | tr 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' 'abcdefghijklmnopqrstuvwxyz' | sed -e 's/\///'` + FUJITSU_REL=`echo ${UNAME_RELEASE} | tr 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' 'abcdefghijklmnopqrstuvwxyz' | sed -e 's/ /_/'` + echo "sparc-fujitsu-${FUJITSU_SYS}${FUJITSU_REL}" + exit ;; + i*86:BSD/386:*:* | i*86:BSD/OS:*:* | *:Ascend\ Embedded/OS:*:*) + echo ${UNAME_MACHINE}-pc-bsdi${UNAME_RELEASE} + exit ;; + sparc*:BSD/OS:*:*) + echo sparc-unknown-bsdi${UNAME_RELEASE} + exit ;; + *:BSD/OS:*:*) + echo ${UNAME_MACHINE}-unknown-bsdi${UNAME_RELEASE} + exit ;; + *:FreeBSD:*:*) + echo ${UNAME_MACHINE}-unknown-freebsd`echo ${UNAME_RELEASE}|sed -e 's/[-(].*//'` + exit ;; + i*:CYGWIN*:*) + echo ${UNAME_MACHINE}-pc-cygwin + exit ;; + i*:MINGW*:*) + echo ${UNAME_MACHINE}-pc-mingw32 + exit ;; + i*:windows32*:*) + # uname -m includes "-pc" on this system. + echo ${UNAME_MACHINE}-mingw32 + exit ;; + i*:PW*:*) + echo ${UNAME_MACHINE}-pc-pw32 + exit ;; + x86:Interix*:[34]*) + echo i586-pc-interix${UNAME_RELEASE}|sed -e 's/\..*//' + exit ;; + [345]86:Windows_95:* | [345]86:Windows_98:* | [345]86:Windows_NT:*) + echo i${UNAME_MACHINE}-pc-mks + exit ;; + i*:Windows_NT*:* | Pentium*:Windows_NT*:*) + # How do we know it's Interix rather than the generic POSIX subsystem? + # It also conflicts with pre-2.0 versions of AT&T UWIN. Should we + # UNAME_MACHINE based on the output of uname instead of i386? + echo i586-pc-interix + exit ;; + i*:UWIN*:*) + echo ${UNAME_MACHINE}-pc-uwin + exit ;; + amd64:CYGWIN*:*:* | x86_64:CYGWIN*:*:*) + echo x86_64-unknown-cygwin + exit ;; + p*:CYGWIN*:*) + echo powerpcle-unknown-cygwin + exit ;; + prep*:SunOS:5.*:*) + echo powerpcle-unknown-solaris2`echo ${UNAME_RELEASE}|sed -e 's/[^.]*//'` + exit ;; + *:GNU:*:*) + # the GNU system + echo `echo ${UNAME_MACHINE}|sed -e 's,[-/].*$,,'`-unknown-gnu`echo ${UNAME_RELEASE}|sed -e 's,/.*$,,'` + exit ;; + *:GNU/*:*:*) + # other systems with GNU libc and userland + echo ${UNAME_MACHINE}-unknown-`echo ${UNAME_SYSTEM} | sed 's,^[^/]*/,,' | tr '[A-Z]' '[a-z]'``echo ${UNAME_RELEASE}|sed -e 's/[-(].*//'`-gnu + exit ;; + i*86:Minix:*:*) + echo ${UNAME_MACHINE}-pc-minix + exit ;; + arm*:Linux:*:*) + echo ${UNAME_MACHINE}-unknown-linux-gnu + exit ;; + cris:Linux:*:*) + echo cris-axis-linux-gnu + exit ;; + crisv32:Linux:*:*) + echo crisv32-axis-linux-gnu + exit ;; + frv:Linux:*:*) + echo frv-unknown-linux-gnu + exit ;; + ia64:Linux:*:*) + echo ${UNAME_MACHINE}-unknown-linux-gnu + exit ;; + m32r*:Linux:*:*) + echo ${UNAME_MACHINE}-unknown-linux-gnu + exit ;; + m68*:Linux:*:*) + echo ${UNAME_MACHINE}-unknown-linux-gnu + exit ;; + mips:Linux:*:*) + eval $set_cc_for_build + sed 's/^ //' << EOF >$dummy.c + #undef CPU + #undef mips + #undef mipsel + #if defined(__MIPSEL__) || defined(__MIPSEL) || defined(_MIPSEL) || defined(MIPSEL) + CPU=mipsel + #else + #if defined(__MIPSEB__) || defined(__MIPSEB) || defined(_MIPSEB) || defined(MIPSEB) + CPU=mips + #else + CPU= + #endif + #endif +EOF + eval `$CC_FOR_BUILD -E $dummy.c 2>/dev/null | grep ^CPU=` + test x"${CPU}" != x && { echo "${CPU}-unknown-linux-gnu"; exit; } + ;; + mips64:Linux:*:*) + eval $set_cc_for_build + sed 's/^ //' << EOF >$dummy.c + #undef CPU + #undef mips64 + #undef mips64el + #if defined(__MIPSEL__) || defined(__MIPSEL) || defined(_MIPSEL) || defined(MIPSEL) + CPU=mips64el + #else + #if defined(__MIPSEB__) || defined(__MIPSEB) || defined(_MIPSEB) || defined(MIPSEB) + CPU=mips64 + #else + CPU= + #endif + #endif +EOF + eval `$CC_FOR_BUILD -E $dummy.c 2>/dev/null | grep ^CPU=` + test x"${CPU}" != x && { echo "${CPU}-unknown-linux-gnu"; exit; } + ;; + or32:Linux:*:*) + echo or32-unknown-linux-gnu + exit ;; + ppc:Linux:*:*) + echo powerpc-unknown-linux-gnu + exit ;; + ppc64:Linux:*:*) + echo powerpc64-unknown-linux-gnu + exit ;; + alpha:Linux:*:*) + case `sed -n '/^cpu model/s/^.*: \(.*\)/\1/p' < /proc/cpuinfo` in + EV5) UNAME_MACHINE=alphaev5 ;; + EV56) UNAME_MACHINE=alphaev56 ;; + PCA56) UNAME_MACHINE=alphapca56 ;; + PCA57) UNAME_MACHINE=alphapca56 ;; + EV6) UNAME_MACHINE=alphaev6 ;; + EV67) UNAME_MACHINE=alphaev67 ;; + EV68*) UNAME_MACHINE=alphaev68 ;; + esac + objdump --private-headers /bin/sh | grep ld.so.1 >/dev/null + if test "$?" = 0 ; then LIBC="libc1" ; else LIBC="" ; fi + echo ${UNAME_MACHINE}-unknown-linux-gnu${LIBC} + exit ;; + parisc:Linux:*:* | hppa:Linux:*:*) + # Look for CPU level + case `grep '^cpu[^a-z]*:' /proc/cpuinfo 2>/dev/null | cut -d' ' -f2` in + PA7*) echo hppa1.1-unknown-linux-gnu ;; + PA8*) echo hppa2.0-unknown-linux-gnu ;; + *) echo hppa-unknown-linux-gnu ;; + esac + exit ;; + parisc64:Linux:*:* | hppa64:Linux:*:*) + echo hppa64-unknown-linux-gnu + exit ;; + s390:Linux:*:* | s390x:Linux:*:*) + echo ${UNAME_MACHINE}-ibm-linux + exit ;; + sh64*:Linux:*:*) + echo ${UNAME_MACHINE}-unknown-linux-gnu + exit ;; + sh*:Linux:*:*) + echo ${UNAME_MACHINE}-unknown-linux-gnu + exit ;; + sparc:Linux:*:* | sparc64:Linux:*:*) + echo ${UNAME_MACHINE}-unknown-linux-gnu + exit ;; + x86_64:Linux:*:*) + echo x86_64-unknown-linux-gnu + exit ;; + i*86:Linux:*:*) + # The BFD linker knows what the default object file format is, so + # first see if it will tell us. cd to the root directory to prevent + # problems with other programs or directories called `ld' in the path. + # Set LC_ALL=C to ensure ld outputs messages in English. + ld_supported_targets=`cd /; LC_ALL=C ld --help 2>&1 \ + | sed -ne '/supported targets:/!d + s/[ ][ ]*/ /g + s/.*supported targets: *// + s/ .*// + p'` + case "$ld_supported_targets" in + elf32-i386) + TENTATIVE="${UNAME_MACHINE}-pc-linux-gnu" + ;; + a.out-i386-linux) + echo "${UNAME_MACHINE}-pc-linux-gnuaout" + exit ;; + coff-i386) + echo "${UNAME_MACHINE}-pc-linux-gnucoff" + exit ;; + "") + # Either a pre-BFD a.out linker (linux-gnuoldld) or + # one that does not give us useful --help. + echo "${UNAME_MACHINE}-pc-linux-gnuoldld" + exit ;; + esac + # Determine whether the default compiler is a.out or elf + eval $set_cc_for_build + sed 's/^ //' << EOF >$dummy.c + #include + #ifdef __ELF__ + # ifdef __GLIBC__ + # if __GLIBC__ >= 2 + LIBC=gnu + # else + LIBC=gnulibc1 + # endif + # else + LIBC=gnulibc1 + # endif + #else + #ifdef __INTEL_COMPILER + LIBC=gnu + #else + LIBC=gnuaout + #endif + #endif + #ifdef __dietlibc__ + LIBC=dietlibc + #endif +EOF + eval `$CC_FOR_BUILD -E $dummy.c 2>/dev/null | grep ^LIBC=` + test x"${LIBC}" != x && { + echo "${UNAME_MACHINE}-pc-linux-${LIBC}" + exit + } + test x"${TENTATIVE}" != x && { echo "${TENTATIVE}"; exit; } + ;; + i*86:DYNIX/ptx:4*:*) + # ptx 4.0 does uname -s correctly, with DYNIX/ptx in there. + # earlier versions are messed up and put the nodename in both + # sysname and nodename. + echo i386-sequent-sysv4 + exit ;; + i*86:UNIX_SV:4.2MP:2.*) + # Unixware is an offshoot of SVR4, but it has its own version + # number series starting with 2... + # I am not positive that other SVR4 systems won't match this, + # I just have to hope. -- rms. + # Use sysv4.2uw... so that sysv4* matches it. + echo ${UNAME_MACHINE}-pc-sysv4.2uw${UNAME_VERSION} + exit ;; + i*86:OS/2:*:*) + # If we were able to find `uname', then EMX Unix compatibility + # is probably installed. + echo ${UNAME_MACHINE}-pc-os2-emx + exit ;; + i*86:XTS-300:*:STOP) + echo ${UNAME_MACHINE}-unknown-stop + exit ;; + i*86:atheos:*:*) + echo ${UNAME_MACHINE}-unknown-atheos + exit ;; + i*86:syllable:*:*) + echo ${UNAME_MACHINE}-pc-syllable + exit ;; + i*86:LynxOS:2.*:* | i*86:LynxOS:3.[01]*:* | i*86:LynxOS:4.0*:*) + echo i386-unknown-lynxos${UNAME_RELEASE} + exit ;; + i*86:*DOS:*:*) + echo ${UNAME_MACHINE}-pc-msdosdjgpp + exit ;; + i*86:*:4.*:* | i*86:SYSTEM_V:4.*:*) + UNAME_REL=`echo ${UNAME_RELEASE} | sed 's/\/MP$//'` + if grep Novell /usr/include/link.h >/dev/null 2>/dev/null; then + echo ${UNAME_MACHINE}-univel-sysv${UNAME_REL} + else + echo ${UNAME_MACHINE}-pc-sysv${UNAME_REL} + fi + exit ;; + i*86:*:5:[678]*) + # UnixWare 7.x, OpenUNIX and OpenServer 6. + case `/bin/uname -X | grep "^Machine"` in + *486*) UNAME_MACHINE=i486 ;; + *Pentium) UNAME_MACHINE=i586 ;; + *Pent*|*Celeron) UNAME_MACHINE=i686 ;; + esac + echo ${UNAME_MACHINE}-unknown-sysv${UNAME_RELEASE}${UNAME_SYSTEM}${UNAME_VERSION} + exit ;; + i*86:*:3.2:*) + if test -f /usr/options/cb.name; then + UNAME_REL=`sed -n 's/.*Version //p' /dev/null >/dev/null ; then + UNAME_REL=`(/bin/uname -X|grep Release|sed -e 's/.*= //')` + (/bin/uname -X|grep i80486 >/dev/null) && UNAME_MACHINE=i486 + (/bin/uname -X|grep '^Machine.*Pentium' >/dev/null) \ + && UNAME_MACHINE=i586 + (/bin/uname -X|grep '^Machine.*Pent *II' >/dev/null) \ + && UNAME_MACHINE=i686 + (/bin/uname -X|grep '^Machine.*Pentium Pro' >/dev/null) \ + && UNAME_MACHINE=i686 + echo ${UNAME_MACHINE}-pc-sco$UNAME_REL + else + echo ${UNAME_MACHINE}-pc-sysv32 + fi + exit ;; + pc:*:*:*) + # Left here for compatibility: + # uname -m prints for DJGPP always 'pc', but it prints nothing about + # the processor, so we play safe by assuming i386. + echo i386-pc-msdosdjgpp + exit ;; + Intel:Mach:3*:*) + echo i386-pc-mach3 + exit ;; + paragon:*:*:*) + echo i860-intel-osf1 + exit ;; + i860:*:4.*:*) # i860-SVR4 + if grep Stardent /usr/include/sys/uadmin.h >/dev/null 2>&1 ; then + echo i860-stardent-sysv${UNAME_RELEASE} # Stardent Vistra i860-SVR4 + else # Add other i860-SVR4 vendors below as they are discovered. + echo i860-unknown-sysv${UNAME_RELEASE} # Unknown i860-SVR4 + fi + exit ;; + mini*:CTIX:SYS*5:*) + # "miniframe" + echo m68010-convergent-sysv + exit ;; + mc68k:UNIX:SYSTEM5:3.51m) + echo m68k-convergent-sysv + exit ;; + M680?0:D-NIX:5.3:*) + echo m68k-diab-dnix + exit ;; + M68*:*:R3V[5678]*:*) + test -r /sysV68 && { echo 'm68k-motorola-sysv'; exit; } ;; + 3[345]??:*:4.0:3.0 | 3[34]??A:*:4.0:3.0 | 3[34]??,*:*:4.0:3.0 | 3[34]??/*:*:4.0:3.0 | 4400:*:4.0:3.0 | 4850:*:4.0:3.0 | SKA40:*:4.0:3.0 | SDS2:*:4.0:3.0 | SHG2:*:4.0:3.0 | S7501*:*:4.0:3.0) + OS_REL='' + test -r /etc/.relid \ + && OS_REL=.`sed -n 's/[^ ]* [^ ]* \([0-9][0-9]\).*/\1/p' < /etc/.relid` + /bin/uname -p 2>/dev/null | grep 86 >/dev/null \ + && { echo i486-ncr-sysv4.3${OS_REL}; exit; } + /bin/uname -p 2>/dev/null | /bin/grep entium >/dev/null \ + && { echo i586-ncr-sysv4.3${OS_REL}; exit; } ;; + 3[34]??:*:4.0:* | 3[34]??,*:*:4.0:*) + /bin/uname -p 2>/dev/null | grep 86 >/dev/null \ + && { echo i486-ncr-sysv4; exit; } ;; + m68*:LynxOS:2.*:* | m68*:LynxOS:3.0*:*) + echo m68k-unknown-lynxos${UNAME_RELEASE} + exit ;; + mc68030:UNIX_System_V:4.*:*) + echo m68k-atari-sysv4 + exit ;; + TSUNAMI:LynxOS:2.*:*) + echo sparc-unknown-lynxos${UNAME_RELEASE} + exit ;; + rs6000:LynxOS:2.*:*) + echo rs6000-unknown-lynxos${UNAME_RELEASE} + exit ;; + PowerPC:LynxOS:2.*:* | PowerPC:LynxOS:3.[01]*:* | PowerPC:LynxOS:4.0*:*) + echo powerpc-unknown-lynxos${UNAME_RELEASE} + exit ;; + SM[BE]S:UNIX_SV:*:*) + echo mips-dde-sysv${UNAME_RELEASE} + exit ;; + RM*:ReliantUNIX-*:*:*) + echo mips-sni-sysv4 + exit ;; + RM*:SINIX-*:*:*) + echo mips-sni-sysv4 + exit ;; + *:SINIX-*:*:*) + if uname -p 2>/dev/null >/dev/null ; then + UNAME_MACHINE=`(uname -p) 2>/dev/null` + echo ${UNAME_MACHINE}-sni-sysv4 + else + echo ns32k-sni-sysv + fi + exit ;; + PENTIUM:*:4.0*:*) # Unisys `ClearPath HMP IX 4000' SVR4/MP effort + # says + echo i586-unisys-sysv4 + exit ;; + *:UNIX_System_V:4*:FTX*) + # From Gerald Hewes . + # How about differentiating between stratus architectures? -djm + echo hppa1.1-stratus-sysv4 + exit ;; + *:*:*:FTX*) + # From seanf@swdc.stratus.com. + echo i860-stratus-sysv4 + exit ;; + i*86:VOS:*:*) + # From Paul.Green@stratus.com. + echo ${UNAME_MACHINE}-stratus-vos + exit ;; + *:VOS:*:*) + # From Paul.Green@stratus.com. + echo hppa1.1-stratus-vos + exit ;; + mc68*:A/UX:*:*) + echo m68k-apple-aux${UNAME_RELEASE} + exit ;; + news*:NEWS-OS:6*:*) + echo mips-sony-newsos6 + exit ;; + R[34]000:*System_V*:*:* | R4000:UNIX_SYSV:*:* | R*000:UNIX_SV:*:*) + if [ -d /usr/nec ]; then + echo mips-nec-sysv${UNAME_RELEASE} + else + echo mips-unknown-sysv${UNAME_RELEASE} + fi + exit ;; + BeBox:BeOS:*:*) # BeOS running on hardware made by Be, PPC only. + echo powerpc-be-beos + exit ;; + BeMac:BeOS:*:*) # BeOS running on Mac or Mac clone, PPC only. + echo powerpc-apple-beos + exit ;; + BePC:BeOS:*:*) # BeOS running on Intel PC compatible. + echo i586-pc-beos + exit ;; + SX-4:SUPER-UX:*:*) + echo sx4-nec-superux${UNAME_RELEASE} + exit ;; + SX-5:SUPER-UX:*:*) + echo sx5-nec-superux${UNAME_RELEASE} + exit ;; + SX-6:SUPER-UX:*:*) + echo sx6-nec-superux${UNAME_RELEASE} + exit ;; + Power*:Rhapsody:*:*) + echo powerpc-apple-rhapsody${UNAME_RELEASE} + exit ;; + *:Rhapsody:*:*) + echo ${UNAME_MACHINE}-apple-rhapsody${UNAME_RELEASE} + exit ;; + *:Darwin:*:*) + UNAME_PROCESSOR=`uname -p` || UNAME_PROCESSOR=unknown + case $UNAME_PROCESSOR in + *86) UNAME_PROCESSOR=i686 ;; + unknown) UNAME_PROCESSOR=powerpc ;; + esac + echo ${UNAME_PROCESSOR}-apple-darwin${UNAME_RELEASE} + exit ;; + *:procnto*:*:* | *:QNX:[0123456789]*:*) + UNAME_PROCESSOR=`uname -p` + if test "$UNAME_PROCESSOR" = "x86"; then + UNAME_PROCESSOR=i386 + UNAME_MACHINE=pc + fi + echo ${UNAME_PROCESSOR}-${UNAME_MACHINE}-nto-qnx${UNAME_RELEASE} + exit ;; + *:QNX:*:4*) + echo i386-pc-qnx + exit ;; + NSE-?:NONSTOP_KERNEL:*:*) + echo nse-tandem-nsk${UNAME_RELEASE} + exit ;; + NSR-?:NONSTOP_KERNEL:*:*) + echo nsr-tandem-nsk${UNAME_RELEASE} + exit ;; + *:NonStop-UX:*:*) + echo mips-compaq-nonstopux + exit ;; + BS2000:POSIX*:*:*) + echo bs2000-siemens-sysv + exit ;; + DS/*:UNIX_System_V:*:*) + echo ${UNAME_MACHINE}-${UNAME_SYSTEM}-${UNAME_RELEASE} + exit ;; + *:Plan9:*:*) + # "uname -m" is not consistent, so use $cputype instead. 386 + # is converted to i386 for consistency with other x86 + # operating systems. + if test "$cputype" = "386"; then + UNAME_MACHINE=i386 + else + UNAME_MACHINE="$cputype" + fi + echo ${UNAME_MACHINE}-unknown-plan9 + exit ;; + *:TOPS-10:*:*) + echo pdp10-unknown-tops10 + exit ;; + *:TENEX:*:*) + echo pdp10-unknown-tenex + exit ;; + KS10:TOPS-20:*:* | KL10:TOPS-20:*:* | TYPE4:TOPS-20:*:*) + echo pdp10-dec-tops20 + exit ;; + XKL-1:TOPS-20:*:* | TYPE5:TOPS-20:*:*) + echo pdp10-xkl-tops20 + exit ;; + *:TOPS-20:*:*) + echo pdp10-unknown-tops20 + exit ;; + *:ITS:*:*) + echo pdp10-unknown-its + exit ;; + SEI:*:*:SEIUX) + echo mips-sei-seiux${UNAME_RELEASE} + exit ;; + *:DragonFly:*:*) + echo ${UNAME_MACHINE}-unknown-dragonfly`echo ${UNAME_RELEASE}|sed -e 's/[-(].*//'` + exit ;; + *:*VMS:*:*) + UNAME_MACHINE=`(uname -p) 2>/dev/null` + case "${UNAME_MACHINE}" in + A*) echo alpha-dec-vms ; exit ;; + I*) echo ia64-dec-vms ; exit ;; + V*) echo vax-dec-vms ; exit ;; + esac ;; + *:XENIX:*:SysV) + echo i386-pc-xenix + exit ;; + i*86:skyos:*:*) + echo ${UNAME_MACHINE}-pc-skyos`echo ${UNAME_RELEASE}` | sed -e 's/ .*$//' + exit ;; +esac + +#echo '(No uname command or uname output not recognized.)' 1>&2 +#echo "${UNAME_MACHINE}:${UNAME_SYSTEM}:${UNAME_RELEASE}:${UNAME_VERSION}" 1>&2 + +eval $set_cc_for_build +cat >$dummy.c < +# include +#endif +main () +{ +#if defined (sony) +#if defined (MIPSEB) + /* BFD wants "bsd" instead of "newsos". Perhaps BFD should be changed, + I don't know.... */ + printf ("mips-sony-bsd\n"); exit (0); +#else +#include + printf ("m68k-sony-newsos%s\n", +#ifdef NEWSOS4 + "4" +#else + "" +#endif + ); exit (0); +#endif +#endif + +#if defined (__arm) && defined (__acorn) && defined (__unix) + printf ("arm-acorn-riscix\n"); exit (0); +#endif + +#if defined (hp300) && !defined (hpux) + printf ("m68k-hp-bsd\n"); exit (0); +#endif + +#if defined (NeXT) +#if !defined (__ARCHITECTURE__) +#define __ARCHITECTURE__ "m68k" +#endif + int version; + version=`(hostinfo | sed -n 's/.*NeXT Mach \([0-9]*\).*/\1/p') 2>/dev/null`; + if (version < 4) + printf ("%s-next-nextstep%d\n", __ARCHITECTURE__, version); + else + printf ("%s-next-openstep%d\n", __ARCHITECTURE__, version); + exit (0); +#endif + +#if defined (MULTIMAX) || defined (n16) +#if defined (UMAXV) + printf ("ns32k-encore-sysv\n"); exit (0); +#else +#if defined (CMU) + printf ("ns32k-encore-mach\n"); exit (0); +#else + printf ("ns32k-encore-bsd\n"); exit (0); +#endif +#endif +#endif + +#if defined (__386BSD__) + printf ("i386-pc-bsd\n"); exit (0); +#endif + +#if defined (sequent) +#if defined (i386) + printf ("i386-sequent-dynix\n"); exit (0); +#endif +#if defined (ns32000) + printf ("ns32k-sequent-dynix\n"); exit (0); +#endif +#endif + +#if defined (_SEQUENT_) + struct utsname un; + + uname(&un); + + if (strncmp(un.version, "V2", 2) == 0) { + printf ("i386-sequent-ptx2\n"); exit (0); + } + if (strncmp(un.version, "V1", 2) == 0) { /* XXX is V1 correct? */ + printf ("i386-sequent-ptx1\n"); exit (0); + } + printf ("i386-sequent-ptx\n"); exit (0); + +#endif + +#if defined (vax) +# if !defined (ultrix) +# include +# if defined (BSD) +# if BSD == 43 + printf ("vax-dec-bsd4.3\n"); exit (0); +# else +# if BSD == 199006 + printf ("vax-dec-bsd4.3reno\n"); exit (0); +# else + printf ("vax-dec-bsd\n"); exit (0); +# endif +# endif +# else + printf ("vax-dec-bsd\n"); exit (0); +# endif +# else + printf ("vax-dec-ultrix\n"); exit (0); +# endif +#endif + +#if defined (alliant) && defined (i860) + printf ("i860-alliant-bsd\n"); exit (0); +#endif + + exit (1); +} +EOF + +$CC_FOR_BUILD -o $dummy $dummy.c 2>/dev/null && SYSTEM_NAME=`$dummy` && + { echo "$SYSTEM_NAME"; exit; } + +# Apollos put the system type in the environment. + +test -d /usr/apollo && { echo ${ISP}-apollo-${SYSTYPE}; exit; } + +# Convex versions that predate uname can use getsysinfo(1) + +if [ -x /usr/convex/getsysinfo ] +then + case `getsysinfo -f cpu_type` in + c1*) + echo c1-convex-bsd + exit ;; + c2*) + if getsysinfo -f scalar_acc + then echo c32-convex-bsd + else echo c2-convex-bsd + fi + exit ;; + c34*) + echo c34-convex-bsd + exit ;; + c38*) + echo c38-convex-bsd + exit ;; + c4*) + echo c4-convex-bsd + exit ;; + esac +fi + +cat >&2 < in order to provide the needed +information to handle your system. + +config.guess timestamp = $timestamp + +uname -m = `(uname -m) 2>/dev/null || echo unknown` +uname -r = `(uname -r) 2>/dev/null || echo unknown` +uname -s = `(uname -s) 2>/dev/null || echo unknown` +uname -v = `(uname -v) 2>/dev/null || echo unknown` + +/usr/bin/uname -p = `(/usr/bin/uname -p) 2>/dev/null` +/bin/uname -X = `(/bin/uname -X) 2>/dev/null` + +hostinfo = `(hostinfo) 2>/dev/null` +/bin/universe = `(/bin/universe) 2>/dev/null` +/usr/bin/arch -k = `(/usr/bin/arch -k) 2>/dev/null` +/bin/arch = `(/bin/arch) 2>/dev/null` +/usr/bin/oslevel = `(/usr/bin/oslevel) 2>/dev/null` +/usr/convex/getsysinfo = `(/usr/convex/getsysinfo) 2>/dev/null` + +UNAME_MACHINE = ${UNAME_MACHINE} +UNAME_RELEASE = ${UNAME_RELEASE} +UNAME_SYSTEM = ${UNAME_SYSTEM} +UNAME_VERSION = ${UNAME_VERSION} +EOF + +exit 1 + +# Local variables: +# eval: (add-hook 'write-file-hooks 'time-stamp) +# time-stamp-start: "timestamp='" +# time-stamp-format: "%:y-%02m-%02d" +# time-stamp-end: "'" +# End: diff --git a/source4/cluster/ctdb/config.mk b/source4/cluster/ctdb/config.mk index 893bd9f136..0e0629bfb1 100644 --- a/source4/cluster/ctdb/config.mk +++ b/source4/cluster/ctdb/config.mk @@ -21,6 +21,9 @@ OBJ_FILES = \ common/ctdb_call.o \ common/ctdb_message.o \ common/ctdb_ltdb.o \ - common/ctdb_util.o + common/ctdb_util.o \ + common/ctdb_io.o \ + common/ctdb_client.o \ + common/ctdb_daemon.o PUBLIC_DEPENDENCIES = LIBTDB LIBTALLOC PRIVATE_DEPENDENCIES = ctdb_tcp diff --git a/source4/cluster/ctdb/configure.ac b/source4/cluster/ctdb/configure.ac new file mode 100644 index 0000000000..784eef2190 --- /dev/null +++ b/source4/cluster/ctdb/configure.ac @@ -0,0 +1,33 @@ +AC_PREREQ(2.50) +AC_DEFUN([AC_CHECK_LIB_EXT], [ + AC_CHECK_LIB([$1],[$3],[$4],[$5],[$7]) + ac_cv_lib_ext_$1_$3=$ac_cv_lib_$1_$3 +]) +AC_DEFUN([AC_CHECK_FUNC_EXT], [ + AC_CHECK_FUNC([$1],[$3],[$4]) + ac_cv_func_ext_$1=$ac_cv_func_$1 +]) +AC_DEFUN([SMB_MODULE_DEFAULT], [echo -n ""]) +AC_DEFUN([SMB_LIBRARY_ENABLE], [echo -n ""]) +AC_DEFUN([SMB_EXT_LIB], [echo -n ""]) +AC_DEFUN([SMB_ENABLE], [echo -n ""]) +AC_INIT(ctdb.h) +AC_CONFIG_SRCDIR([tests/ctdb_test.c]) + +AC_LIBREPLACE_ALL_CHECKS + +if test "$ac_cv_prog_gcc" = yes; then + CFLAGS="$CFLAGS -Wall -Wshadow -Wstrict-prototypes -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings" +fi + +AC_CONFIG_HEADER(config.h) + +EXTRA_OBJ="" + +m4_include(libtalloc.m4) +m4_include(libtdb.m4) +m4_include(ib/config.m4) + +AC_SUBST(EXTRA_OBJ) + +AC_OUTPUT(Makefile) diff --git a/source4/cluster/ctdb/ctdb_cluster.c b/source4/cluster/ctdb/ctdb_cluster.c index ceff53ee5d..aee47c6281 100644 --- a/source4/cluster/ctdb/ctdb_cluster.c +++ b/source4/cluster/ctdb/ctdb_cluster.c @@ -141,6 +141,17 @@ static NTSTATUS ctdb_message_init(struct cluster_ops *ops, { struct cluster_state *state = ops->private; struct cluster_messaging_list *m; + int ret; + + /* setup messaging handler */ + ret = ctdb_set_message_handler(state->ctdb, ctdb_message_handler, + server.id, state); + if (ret == -1) { + DEBUG(0,("ctdb_set_message_handler failed - %s\n", + ctdb_errstr(state->ctdb))); + exit(1); + } + m = talloc(msg, struct cluster_messaging_list); NT_STATUS_HAVE_NO_MEMORY(m); @@ -248,14 +259,6 @@ void cluster_ctdb_init(struct event_context *ev) goto failed; } - /* setup messaging handler */ - ret = ctdb_set_message_handler(state->ctdb, ctdb_message_handler, state); - if (ret == -1) { - DEBUG(0,("ctdb_set_message_handler failed - %s\n", - ctdb_errstr(state->ctdb))); - goto failed; - } - /* attach all the databases we will need */ for (i=0;i0){ + cnt+=numread; + } + } + /* read the rest of the pdu */ + tot=rep.hdr.length; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)&rep)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + + return rep.vnn; +} + + +int send_a_message(int fd, int ourvnn, int vnn, int pid, TDB_DATA data) +{ + struct ctdb_req_message r; + int len, cnt; + + len = offsetof(struct ctdb_req_message, data) + data.dsize; + r.hdr.length = len; + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_MESSAGE; + r.hdr.destnode = vnn; + r.hdr.srcnode = ourvnn; + r.hdr.reqid = 0; + r.srvid = pid; + r.datalen = data.dsize; + + /* write header */ + cnt=write(fd, &r, offsetof(struct ctdb_req_message, data)); + /* write data */ + if(data.dsize){ + cnt=write(fd, data.dptr, data.dsize); + } + return 0; +} + +int receive_a_message(int fd, struct ctdb_req_message **preply) +{ + int cnt,tot; + struct ctdb_req_message *rep; + uint32_t length; + + /* read the 4 bytes of length for the pdu */ + cnt=0; + tot=4; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)&length)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + + /* read the rest of the pdu */ + rep = malloc(length); + rep->hdr.length = length; + cnt = 0; + tot = length-4; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)rep)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + + *preply = rep; + return 0; +} + +/* + hash function for mapping data to a VNN - taken from tdb +*/ +uint32_t ctdb_hash(const TDB_DATA *key) +{ + uint32_t value; /* Used to compute the hash value. */ + uint32_t i; /* Used to cycle through random values. */ + + /* Set the initial value from the key size. */ + for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++) + value = (value + (key->dptr[i] << (i*5 % 24))); + + return (1103515243 * value + 12345); +} + +void fetch_lock(int fd, uint32_t db_id, TDB_DATA key) +{ + struct ctdb_req_fetch_lock *req; + struct ctdb_reply_fetch_lock *rep; + uint32_t length; + int len, cnt, tot; + + len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize; + req = malloc(len); + + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_FETCH_LOCK; + req->hdr.reqid = 1; + req->db_id = db_id; + req->keylen = key.dsize; + memcpy(&req->key[0], key.dptr, key.dsize); + + cnt=write(fd, req, len); + + + /* wait fot the reply */ + /* read the 4 bytes of length for the pdu */ + cnt=0; + tot=4; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)&length)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + /* read the rest of the pdu */ + rep = malloc(length); + tot=length; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)rep)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + printf("fetch lock reply: state:%d datalen:%d\n",rep->state,rep->datalen); + if(!rep->datalen){ + printf("no data\n"); + } else { + printf("data:[%s]\n",rep->data); + } + +} + +void store_unlock(int fd, uint32_t db_id, TDB_DATA key, TDB_DATA data) +{ + struct ctdb_req_store_unlock *req; + struct ctdb_reply_store_unlock *rep; + uint32_t length; + int len, cnt, tot; + + len = offsetof(struct ctdb_req_store_unlock, data) + key.dsize + data.dsize; + req = malloc(len); + + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_STORE_UNLOCK; + req->hdr.reqid = 1; + req->db_id = db_id; + req->keylen = key.dsize; + req->datalen = data.dsize; + memcpy(&req->data[0], key.dptr, key.dsize); + memcpy(&req->data[key.dsize], data.dptr, data.dsize); + + cnt=write(fd, req, len); + + + /* wait fot the reply */ + /* read the 4 bytes of length for the pdu */ + cnt=0; + tot=4; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)&length)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + /* read the rest of the pdu */ + rep = malloc(length); + tot=length; + while(cnt!=tot){ + int numread; + numread=read(fd, ((char *)rep)+cnt, tot-cnt); + if(numread>0){ + cnt+=numread; + } + } + printf("store unlock reply: state:%d\n",rep->state); +} + +int main(int argc, const char *argv[]) +{ + int fd, pid, vnn, dstvnn, dstpid; + TDB_DATA message; + struct ctdb_req_message *reply; + TDB_DATA dbname; + uint32_t db_id; + TDB_DATA key, data; + char str[256]; + + /* open the socket to talk to the local ctdb daemon */ + fd=ux_socket_connect(CTDB_SOCKET); + if (fd==-1) { + printf("failed to open domain socket\n"); + exit(10); + } + + + /* register our local server id with the daemon so that it knows + where to send messages addressed to our local pid. + */ + pid=getpid(); + register_pid_with_daemon(fd, pid); + + + /* do a connect wait to ensure that all nodes in the cluster are up + and operational. + this also tells us the vnn of the local cluster. + If someone wants to send us a emssage they should send it to + this vnn and our pid + */ + vnn=wait_for_cluster(fd); + printf("our address is vnn:%d pid:%d if someone wants to send us a message!\n",vnn,pid); + + + /* send a message to ourself */ + dstvnn=vnn; + dstpid=pid; + message.dptr=discard_const("Test message"); + message.dsize=strlen((const char *)message.dptr)+1; + printf("sending test message [%s] to ourself\n", message.dptr); + send_a_message(fd, vnn, dstvnn, dstpid, message); + + /* wait for the message to come back */ + receive_a_message(fd, &reply); + printf("received message: [%s]\n",&reply->data[0]); + + /* create the db id for "test.tdb" */ + dbname.dptr = discard_const("test.tdb"); + dbname.dsize = strlen((const char *)(dbname.dptr)); + db_id = ctdb_hash(&dbname); + printf("the has for the database id is 0x%08x\n",db_id); + printf("\n"); + + /* send a fetch lock */ + key.dptr=discard_const("TestKey"); + key.dsize=strlen((const char *)(key.dptr)); + printf("fetch the test key:[%s]\n",key.dptr); + fetch_lock(fd, db_id, key); + printf("\n"); + + + /* send a store unlock */ + sprintf(str,"TestData_%d",getpid()); + data.dptr=discard_const(str); + data.dsize=strlen((const char *)(data.dptr)); + printf("store new data==[%s] for this record\n",data.dptr); + store_unlock(fd, db_id, key, data); + printf("\n"); + + /* send a fetch lock */ + printf("fetch the test key:[%s]\n",key.dptr); + fetch_lock(fd, db_id, key); + printf("\n"); + + + return 0; +} diff --git a/source4/cluster/ctdb/direct/nodes.txt b/source4/cluster/ctdb/direct/nodes.txt new file mode 100644 index 0000000000..e1198b59ac --- /dev/null +++ b/source4/cluster/ctdb/direct/nodes.txt @@ -0,0 +1,2 @@ +127.0.0.1:9001 +127.0.0.2:9001 diff --git a/source4/cluster/ctdb/ib/README.txt b/source4/cluster/ctdb/ib/README.txt new file mode 100644 index 0000000000..74fc129c35 --- /dev/null +++ b/source4/cluster/ctdb/ib/README.txt @@ -0,0 +1,20 @@ +Compilation +=========== + +For the configure script, please set the OFED include & library path by e.g.: + +export CFLAGS="-I/usr/local/ofed/include -L/usr/local/ofed/lib" + +After then: + +./configure --enable-infiniband + +Example for testing +=================== +bin/ctdb_test --transport ib --nlist ../2nodes_rm.txt --listen 10.0.0.1:9001 +bin/ctdb_test --transport ib --nlist ../2nodes_rm.txt --listen 10.0.0.2:9001 + +where 2nodes_rm.txt: +10.0.0.1:9001 +10.0.0.2:9001 + diff --git a/source4/cluster/ctdb/ib/config.m4 b/source4/cluster/ctdb/ib/config.m4 new file mode 100644 index 0000000000..9d95ea7a5a --- /dev/null +++ b/source4/cluster/ctdb/ib/config.m4 @@ -0,0 +1,31 @@ +AC_ARG_ENABLE(--enable-infiniband, +[ --enable-infiniband Turn on infiniband support (default=no)]) + +HAVE_INFINIBAND=no + +if eval "test x$enable_infiniband = xyes"; then + AC_DEFINE(USE_INFINIBAND,1,[Use infiniband]) + HAVE_INFINIBAND=yes + + INFINIBAND_WRAPPER_OBJ="ib/ibwrapper.o ib/ibw_ctdb.o ib/ibw_ctdb_init.o" + INFINIBAND_LIBS="-lrdmacm -libverbs" + INFINIBAND_BINS="bin/ibwrapper_test" + + AC_CHECK_HEADERS(infiniband/verbs.h, [], [ + echo "ERROR: you need infiniband/verbs.h when ib enabled!" + exit -1]) + AC_CHECK_HEADERS(rdma/rdma_cma.h, [], [ + echo "ERROR: you need rdma/rdma_cma.h when ib enabled!" + exit -1]) + AC_CHECK_LIB(ibverbs, ibv_create_qp, [], [ + echo "ERROR: you need libibverbs when ib enabled!" + exit -1]) + AC_CHECK_LIB(rdmacm, rdma_connect, [], [ + echo "ERROR: you need librdmacm when ib enabled!" + exit -1]) +fi + +AC_SUBST(HAVE_INFINIBAND) +AC_SUBST(INFINIBAND_WRAPPER_OBJ) +AC_SUBST(INFINIBAND_LIBS) +AC_SUBST(INFINIBAND_BINS) diff --git a/source4/cluster/ctdb/ib/ibw_ctdb.c b/source4/cluster/ctdb/ib/ibw_ctdb.c new file mode 100644 index 0000000000..dfe4adc6d3 --- /dev/null +++ b/source4/cluster/ctdb/ib/ibw_ctdb.c @@ -0,0 +1,157 @@ +/* + * Unix SMB/CIFS implementation. + * Join infiniband wrapper and ctdb. + * + * Copyright (C) Sven Oehme 2006 + * + * Major code contributions by Peter Somogyi + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include "includes.h" +#include "lib/events/events.h" +#include +#include +#include "ctdb_private.h" +#include "ibwrapper.h" +#include "ibw_ctdb.h" + +int ctdb_ibw_node_connect(struct ctdb_node *node) +{ + struct ctdb_ibw_node *cn = talloc_get_type(node->private_data, struct ctdb_ibw_node); + int rc; + + assert(cn!=NULL); + assert(cn->conn!=NULL); + struct sockaddr_in sock_out; + + memset(&sock_out, 0, sizeof(struct sockaddr_in)); + inet_pton(AF_INET, node->address.address, &sock_out.sin_addr); + sock_out.sin_port = htons(node->address.port); + sock_out.sin_family = PF_INET; + + rc = ibw_connect(cn->conn, &sock_out, node); + if (rc) { + DEBUG(0, ("ctdb_ibw_node_connect/ibw_connect failed - retrying...\n")); + /* try again once a second */ + event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0), + ctdb_ibw_node_connect_event, node); + } + + /* continues at ibw_ctdb.c/IBWC_CONNECTED in good case */ + return 0; +} + +void ctdb_ibw_node_connect_event(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_node *node = talloc_get_type(private_data, struct ctdb_node); + + ctdb_ibw_node_connect(node); +} + +int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn) +{ + if (ctx!=NULL) { + /* ctx->state changed */ + switch(ctx->state) { + case IBWS_INIT: /* ctx start - after ibw_init */ + break; + case IBWS_READY: /* after ibw_bind & ibw_listen */ + break; + case IBWS_CONNECT_REQUEST: /* after [IBWS_READY + incoming request] */ + /* => [(ibw_accept)IBWS_READY | (ibw_disconnect)STOPPED | ERROR] */ + if (ibw_accept(ctx, conn, NULL)) { + DEBUG(0, ("connstate_handler/ibw_accept failed\n")); + return -1; + } /* else continue in IBWC_CONNECTED */ + break; + case IBWS_STOPPED: /* normal stop <= ibw_disconnect+(IBWS_READY | IBWS_CONNECT_REQUEST) */ + /* TODO: have a CTDB upcall for which CTDB should wait in a (final) loop */ + break; + case IBWS_ERROR: /* abnormal state; ibw_stop must be called after this */ + break; + default: + assert(0); + break; + } + } + + if (conn!=NULL) { + /* conn->state changed */ + switch(conn->state) { + case IBWC_INIT: /* conn start - internal state */ + break; + case IBWC_CONNECTED: { /* after ibw_accept or ibw_connect */ + struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node); + if (node!=NULL) { /* after ibw_connect */ + struct ctdb_ibw_node *cn = talloc_get_type(node->private_data, struct ctdb_ibw_node); + + node->ctdb->upcalls->node_connected(node); + ctdb_flush_cn_queue(cn); + } else { /* after ibw_accept */ + /* NOP in CTDB case */ + } + } break; + case IBWC_DISCONNECTED: { /* after ibw_disconnect */ + struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node); + if (node!=NULL) + node->ctdb->upcalls->node_dead(node); + talloc_free(conn); + /* normal + intended disconnect => not reconnecting in this layer */ + } break; + case IBWC_ERROR: { + struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node); + if (node!=NULL) { + struct ctdb_ibw_node *cn = talloc_get_type(node->private_data, struct ctdb_ibw_node); + struct ibw_ctx *ictx = cn->conn->ctx; + + DEBUG(10, ("IBWC_ERROR, reconnecting...\n")); + talloc_free(cn->conn); /* internal queue content is destroyed */ + cn->conn = (void *)ibw_conn_new(ictx, node); + event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0), + ctdb_ibw_node_connect_event, node); + } + } break; + default: + assert(0); + break; + } + } + + return 0; +} + +int ctdb_ibw_receive_handler(struct ibw_conn *conn, void *buf, int n) +{ + struct ctdb_context *ctdb = talloc_get_type(conn->ctx->ctx_userdata, struct ctdb_context); + void *buf2; /* future TODO: a solution for removal of this */ + + assert(ctdb!=NULL); + assert(buf!=NULL); + assert(conn!=NULL); + assert(conn->state==IBWC_CONNECTED); + + /* so far "buf" is an ib-registered memory area + * and being reused for next receive + * noticed that HL requires talloc-ed memory to be stolen */ + buf2 = talloc_zero_size(conn, n); + memcpy(buf2, buf, n); + + ctdb->upcalls->recv_pkt(ctdb, (uint8_t *)buf2, (uint32_t)n); + + return 0; +} diff --git a/source4/cluster/ctdb/ib/ibw_ctdb.h b/source4/cluster/ctdb/ib/ibw_ctdb.h new file mode 100644 index 0000000000..8286eef65a --- /dev/null +++ b/source4/cluster/ctdb/ib/ibw_ctdb.h @@ -0,0 +1,46 @@ +/* + * Unix SMB/CIFS implementation. + * Join infiniband wrapper and ctdb. + * + * Copyright (C) Sven Oehme 2006 + * + * Major code contributions by Peter Somogyi + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +struct ctdb_ibw_msg { + uint8_t *data; + uint32_t length; + struct ctdb_ibw_msg *prev; + struct ctdb_ibw_msg *next; +}; + +struct ctdb_ibw_node { + struct ibw_conn *conn; + + struct ctdb_ibw_msg *queue; + struct ctdb_ibw_msg *queue_last; + int qcnt; +}; + +int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn); +int ctdb_ibw_receive_handler(struct ibw_conn *conn, void *buf, int n); + +int ctdb_ibw_node_connect(struct ctdb_node *node); +void ctdb_ibw_node_connect_event(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data); + +int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn); diff --git a/source4/cluster/ctdb/ib/ibw_ctdb_init.c b/source4/cluster/ctdb/ib/ibw_ctdb_init.c new file mode 100644 index 0000000000..3b0c6ad28f --- /dev/null +++ b/source4/cluster/ctdb/ib/ibw_ctdb_init.c @@ -0,0 +1,214 @@ +/* + * Unix SMB/CIFS implementation. + * Join infiniband wrapper and ctdb. + * + * Copyright (C) Sven Oehme 2006 + * + * Major code contributions by Peter Somogyi + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include "includes.h" +#include "lib/events/events.h" +#include +#include +#include "ctdb_private.h" +#include "ibwrapper.h" +#include "ibw_ctdb.h" + +static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog) +{ + struct ibw_ctx *ictx = talloc_get_type(ctdb->private_data, struct ibw_ctx); + struct sockaddr_in my_addr; + + assert(ictx!=NULL); + memset(&my_addr, 0, sizeof(struct sockaddr_in)); + my_addr.sin_port = htons(ctdb->address.port); + my_addr.sin_family = PF_INET; + inet_pton(AF_INET, ctdb->address.address, &my_addr.sin_addr); + + if (ibw_bind(ictx, &my_addr)) { + DEBUG(0, ("ctdb_ibw_listen: ibw_bind failed\n")); + return -1; + } + + if (ibw_listen(ictx, backlog)) { + DEBUG(0, ("ctdb_ibw_listen: ibw_listen failed\n")); + return -1; + } + + return 0; +} + +/* + * Start infiniband + */ +static int ctdb_ibw_start(struct ctdb_context *ctdb) +{ + int i; + + /* listen on our own address */ + if (ctdb_ibw_listen(ctdb, 10)) /* TODO: backlog as param */ + return -1; + + /* everything async here */ + for (i=0;inum_nodes;i++) { + struct ctdb_node *node = ctdb->nodes[i]; + if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) && + ctdb_same_address(&ctdb->address, &node->address)) + continue; + ctdb_ibw_node_connect(node); + } + + return 0; +} + +/* + * initialise ibw portion of a ctdb node + */ +static int ctdb_ibw_add_node(struct ctdb_node *node) +{ + struct ibw_ctx *ictx = talloc_get_type(node->ctdb->private_data, struct ibw_ctx); + struct ctdb_ibw_node *cn = talloc_zero(node, struct ctdb_ibw_node); + + assert(cn!=NULL); + cn->conn = ibw_conn_new(ictx, node); + node->private_data = (void *)cn; + + return (cn->conn!=NULL ? 0 : -1); +} + +static int ctdb_ibw_send_pkt(struct ibw_conn *conn, uint8_t *data, uint32_t length) +{ + void *buf, *key; + + if (ibw_alloc_send_buf(conn, &buf, &key, length)) { + DEBUG(0, ("queue_pkt/ibw_alloc_send_buf failed\n")); + return -1; + } + + memcpy(buf, data, length); + return ibw_send(conn, buf, key, length); +} + +int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn) +{ + struct ctdb_ibw_msg *p; + int rc = 0; + + while(cn->queue) { + p = cn->queue; + rc = ctdb_ibw_send_pkt(cn->conn, p->data, p->length); + if (rc) + return -1; /* will be retried later when conn is up */ + + DLIST_REMOVE(cn->queue, p); + cn->qcnt--; + talloc_free(p); /* it will talloc_free p->data as well */ + } + assert(cn->qcnt==0); + /* cn->queue_last = NULL is not needed - see DLIST_ADD_AFTER */ + + return rc; +} + +static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length) +{ + struct ctdb_ibw_node *cn = talloc_get_type(node->private_data, struct ctdb_ibw_node); + int rc; + + assert(length>=sizeof(uint32_t)); + assert(cn!=NULL); + + if (cn->conn==NULL) { + DEBUG(0, ("ctdb_ibw_queue_pkt: conn is NULL\n")); + return -1; + } + + if (cn->conn->state==IBWC_CONNECTED) { + rc = ctdb_ibw_send_pkt(cn->conn, data, length); + } else { + struct ctdb_ibw_msg *p = talloc_zero(cn, struct ctdb_ibw_msg); + p->data = talloc_memdup(p, data, length); + p->length = length; + + DLIST_ADD_AFTER(cn->queue, p, cn->queue_last); + cn->queue_last = p; + cn->qcnt++; + + rc = 0; + } + + return rc; +} + +/* + * transport packet allocator - allows transport to control memory for packets + */ +static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size) +{ + /* TODO: use ibw_alloc_send_buf instead... */ + return talloc_size(ctdb, size); +} + +#ifdef __NOTDEF__ + +static int ctdb_ibw_stop(struct ctdb_context *cctx) +{ + struct ibw_ctx *ictx = talloc_get_type(cctx->private_data, struct ibw_ctx); + + assert(ictx!=NULL); + return ibw_stop(ictx); +} + +#endif /* __NOTDEF__ */ + +static const struct ctdb_methods ctdb_ibw_methods = { + .start = ctdb_ibw_start, + .add_node = ctdb_ibw_add_node, + .queue_pkt = ctdb_ibw_queue_pkt, + .allocate_pkt = ctdb_ibw_allocate_pkt, + +// .stop = ctdb_ibw_stop +}; + +/* + * initialise ibw portion of ctdb + */ +int ctdb_ibw_init(struct ctdb_context *ctdb) +{ + struct ibw_ctx *ictx; + + DEBUG(10, ("ctdb_ibw_init invoked...\n")); + ictx = ibw_init( + NULL, //struct ibw_initattr *attr, /* TODO */ + 0, //int nattr, /* TODO */ + ctdb, + ctdb_ibw_connstate_handler, + ctdb_ibw_receive_handler, + ctdb->ev); + + if (ictx==NULL) { + DEBUG(0, ("ctdb_ibw_init: ibw_init failed\n")); + return -1; + } + + ctdb->methods = &ctdb_ibw_methods; + ctdb->private_data = ictx; + + DEBUG(10, ("ctdb_ibw_init succeeded.\n")); + return 0; +} diff --git a/source4/cluster/ctdb/ib/ibwrapper.c b/source4/cluster/ctdb/ib/ibwrapper.c new file mode 100644 index 0000000000..f7b233954d --- /dev/null +++ b/source4/cluster/ctdb/ib/ibwrapper.c @@ -0,0 +1,1344 @@ +/* + * Unix SMB/CIFS implementation. + * Wrap Infiniband calls. + * + * Copyright (C) Sven Oehme 2006 + * + * Major code contributions by Peter Somogyi + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "includes.h" +#include "lib/events/events.h" +#include "ibwrapper.h" + +#include +#include "infiniband/sa-kern-abi.h" + +#include "ibwrapper_internal.h" +#include "lib/util/dlinklist.h" + +#define IBW_LASTERR_BUFSIZE 512 +static char ibw_lasterr[IBW_LASTERR_BUFSIZE]; + +#define IBW_MAX_SEND_WR 256 +#define IBW_MAX_RECV_WR 1024 +#define IBW_RECV_BUFSIZE 256 +#define IBW_RECV_THRESHOLD (1 * 1024 * 1024) + +static void ibw_event_handler_verbs(struct event_context *ev, + struct fd_event *fde, uint16_t flags, void *private_data); +static int ibw_fill_cq(struct ibw_conn *conn); +static int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc); +static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc); +static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, uint32_t len); + +static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn, + uint32_t n, struct ibv_mr **ppmr) +{ + void *buf; + + DEBUG(10, ("ibw_alloc_mr(cmid=%p, n=%u)\n", pconn->cm_id, n)); + buf = memalign(pctx->pagesize, n); + if (!buf) { + sprintf(ibw_lasterr, "couldn't allocate memory\n"); + return NULL; + } + + *ppmr = ibv_reg_mr(pconn->pd, buf, n, IBV_ACCESS_LOCAL_WRITE); + if (!*ppmr) { + sprintf(ibw_lasterr, "couldn't allocate mr\n"); + free(buf); + return NULL; + } + + return buf; +} + +static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr) +{ + DEBUG(10, ("ibw_free_mr(%u %u)\n", (uint32_t)*ppbuf, (uint32_t)*ppmr)); + if (*ppmr!=NULL) { + ibv_dereg_mr(*ppmr); + *ppmr = NULL; + } + if (*ppbuf) { + free(*ppbuf); + *ppbuf = NULL; + } +} + +static int ibw_init_memory(struct ibw_conn *conn) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibw_opts *opts = &pctx->opts; + int i; + struct ibw_wr *p; + + DEBUG(10, ("ibw_init_memory(cmid: %p)\n", pconn->cm_id)); + pconn->buf_send = ibw_alloc_mr(pctx, pconn, + opts->max_send_wr * opts->recv_bufsize, &pconn->mr_send); + if (!pconn->buf_send) { + sprintf(ibw_lasterr, "couldn't allocate work send buf\n"); + return -1; + } + + pconn->buf_recv = ibw_alloc_mr(pctx, pconn, + opts->max_recv_wr * opts->recv_bufsize, &pconn->mr_recv); + if (!pconn->buf_recv) { + sprintf(ibw_lasterr, "couldn't allocate work recv buf\n"); + return -1; + } + + pconn->wr_index = talloc_size(pconn, opts->max_send_wr * sizeof(struct ibw_wr *)); + assert(pconn->wr_index!=NULL); + + for(i=0; imax_send_wr; i++) { + p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr); + p->buf = pconn->buf_send + (i * opts->recv_bufsize); + p->wr_id = i; + + DLIST_ADD(pconn->wr_list_avail, p); + } + + return 0; +} + +static int ibw_ctx_priv_destruct(struct ibw_ctx_priv *pctx) +{ + DEBUG(10, ("ibw_ctx_priv_destruct(%u)\n", (uint32_t)pctx)); + + /* destroy cm */ + if (pctx->cm_channel) { + rdma_destroy_event_channel(pctx->cm_channel); + pctx->cm_channel = NULL; + } + if (pctx->cm_channel_event) { + /* TODO: do we have to do this here? */ + talloc_free(pctx->cm_channel_event); + pctx->cm_channel_event = NULL; + } + if (pctx->cm_id) { + rdma_destroy_id(pctx->cm_id); + pctx->cm_id = NULL; + } + + return 0; +} + +static int ibw_ctx_destruct(struct ibw_ctx *ctx) +{ + DEBUG(10, ("ibw_ctx_destruct(%u)\n", (uint32_t)ctx)); + return 0; +} + +static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn) +{ + DEBUG(10, ("ibw_conn_priv_destruct(%p, cmid: %p)\n", + pconn, pconn->cm_id)); + + /* pconn->wr_index is freed by talloc */ + /* pconn->wr_index[i] are freed by talloc */ + + /* destroy verbs */ + if (pconn->cm_id!=NULL && pconn->cm_id->qp!=NULL) { + rdma_destroy_qp(pconn->cm_id); + pconn->cm_id->qp = NULL; + } + + if (pconn->cq!=NULL) { + ibv_destroy_cq(pconn->cq); + pconn->cq = NULL; + } + + if (pconn->verbs_channel!=NULL) { + ibv_destroy_comp_channel(pconn->verbs_channel); + pconn->verbs_channel = NULL; + } + + /* must be freed here because its order is important */ + if (pconn->verbs_channel_event) { + talloc_free(pconn->verbs_channel_event); + pconn->verbs_channel_event = NULL; + } + + /* free memory regions */ + ibw_free_mr(&pconn->buf_send, &pconn->mr_send); + ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv); + + if (pconn->pd) { + ibv_dealloc_pd(pconn->pd); + pconn->pd = NULL; + DEBUG(10, ("pconn=%p pd deallocated\n", pconn)); + } + + if (pconn->cm_id) { + rdma_destroy_id(pconn->cm_id); + pconn->cm_id = NULL; + DEBUG(10, ("pconn=%p cm_id destroyed\n", pconn)); + } + + return 0; +} + +static int ibw_wr_destruct(struct ibw_wr *wr) +{ + if (wr->buf_large!=NULL) + ibw_free_mr(&wr->buf_large, &wr->mr_large); + return 0; +} + +static int ibw_conn_destruct(struct ibw_conn *conn) +{ + DEBUG(10, ("ibw_conn_destruct(%u)\n", (uint32_t)conn)); + + /* important here: ctx is a talloc _parent_ */ + DLIST_REMOVE(conn->ctx->conn_list, conn); + return 0; +} + +struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx) +{ + struct ibw_conn *conn; + struct ibw_conn_priv *pconn; + + assert(ctx!=NULL); + + conn = talloc_zero(mem_ctx, struct ibw_conn); + assert(conn!=NULL); + talloc_set_destructor(conn, ibw_conn_destruct); + + pconn = talloc_zero(conn, struct ibw_conn_priv); + assert(pconn!=NULL); + talloc_set_destructor(pconn, ibw_conn_priv_destruct); + + conn->ctx = ctx; + conn->internal = (void *)pconn; + + DLIST_ADD(ctx->conn_list, conn); + + return conn; +} + +static int ibw_setup_cq_qp(struct ibw_conn *conn) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibv_qp_init_attr init_attr; + struct ibv_qp_attr attr; + int rc; + + DEBUG(10, ("ibw_setup_cq_qp(cmid: %p)\n", pconn->cm_id)); + + /* init verbs */ + pconn->verbs_channel = ibv_create_comp_channel(pconn->cm_id->verbs); + if (!pconn->verbs_channel) { + sprintf(ibw_lasterr, "ibv_create_comp_channel failed %d\n", errno); + return -1; + } + DEBUG(10, ("created channel %p\n", pconn->verbs_channel)); + + pconn->verbs_channel_event = event_add_fd(pctx->ectx, NULL, /* not pconn or conn */ + pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn); + + pconn->pd = ibv_alloc_pd(pconn->cm_id->verbs); + if (!pconn->pd) { + sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno); + return -1; + } + DEBUG(10, ("created pd %p\n", pconn->pd)); + + /* init mr */ + if (ibw_init_memory(conn)) + return -1; + + /* init cq */ + pconn->cq = ibv_create_cq(pconn->cm_id->verbs, + pctx->opts.max_recv_wr + pctx->opts.max_send_wr, + conn, pconn->verbs_channel, 0); + if (pconn->cq==NULL) { + sprintf(ibw_lasterr, "ibv_create_cq failed\n"); + return -1; + } + + rc = ibv_req_notify_cq(pconn->cq, 0); + if (rc) { + sprintf(ibw_lasterr, "ibv_req_notify_cq failed with %d\n", rc); + return rc; + } + + /* init qp */ + memset(&init_attr, 0, sizeof(init_attr)); + init_attr.cap.max_send_wr = pctx->opts.max_send_wr; + init_attr.cap.max_recv_wr = pctx->opts.max_recv_wr; + init_attr.cap.max_recv_sge = 1; + init_attr.cap.max_send_sge = 1; + init_attr.qp_type = IBV_QPT_RC; + init_attr.send_cq = pconn->cq; + init_attr.recv_cq = pconn->cq; + + rc = rdma_create_qp(pconn->cm_id, pconn->pd, &init_attr); + if (rc) { + sprintf(ibw_lasterr, "rdma_create_qp failed with %d\n", rc); + return rc; + } + /* elase result is in pconn->cm_id->qp */ + + rc = ibv_query_qp(pconn->cm_id->qp, &attr, IBV_QP_PATH_MTU, &init_attr); + if (rc) { + sprintf(ibw_lasterr, "ibv_query_qp failed with %d\n", rc); + return rc; + } + + return ibw_fill_cq(conn); +} + +static int ibw_refill_cq_recv(struct ibw_conn *conn) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + int rc; + struct ibv_sge list = { + .addr = (uintptr_t) NULL, /* filled below */ + .length = pctx->opts.recv_bufsize, + .lkey = pconn->mr_recv->lkey /* always the same */ + }; + struct ibv_recv_wr wr = { + .wr_id = 0, /* filled below */ + .sg_list = &list, + .num_sge = 1, + }; + struct ibv_recv_wr *bad_wr; + + DEBUG(10, ("ibw_refill_cq_recv(cmid: %p)\n", pconn->cm_id)); + + list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index; + wr.wr_id = pconn->recv_index; + pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr; + + rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr); + if (rc) { + sprintf(ibw_lasterr, "refill/ibv_post_recv failed with %d\n", rc); + DEBUG(0, (ibw_lasterr)); + return -2; + } + + return 0; +} + +static int ibw_fill_cq(struct ibw_conn *conn) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + int i, rc; + struct ibv_sge list = { + .addr = (uintptr_t) NULL, /* filled below */ + .length = pctx->opts.recv_bufsize, + .lkey = pconn->mr_recv->lkey /* always the same */ + }; + struct ibv_recv_wr wr = { + .wr_id = 0, /* filled below */ + .sg_list = &list, + .num_sge = 1, + }; + struct ibv_recv_wr *bad_wr; + + DEBUG(10, ("ibw_fill_cq(cmid: %p)\n", pconn->cm_id)); + + for(i = pctx->opts.max_recv_wr; i!=0; i--) { + list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index; + wr.wr_id = pconn->recv_index; + pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr; + + rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr); + if (rc) { + sprintf(ibw_lasterr, "fill/ibv_post_recv failed with %d\n", rc); + DEBUG(0, (ibw_lasterr)); + return -2; + } + } + + return 0; +} + +static int ibw_manage_connect(struct ibw_conn *conn) +{ + struct rdma_conn_param conn_param; + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + int rc; + + DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", pconn->cm_id)); + + if (ibw_setup_cq_qp(conn)) + return -1; + + /* cm connect */ + memset(&conn_param, 0, sizeof conn_param); + conn_param.responder_resources = 1; + conn_param.initiator_depth = 1; + conn_param.retry_count = 10; + + rc = rdma_connect(pconn->cm_id, &conn_param); + if (rc) + sprintf(ibw_lasterr, "rdma_connect error %d\n", rc); + + return rc; +} + +static void ibw_event_handler_cm(struct event_context *ev, + struct fd_event *fde, uint16_t flags, void *private_data) +{ + int rc; + struct ibw_ctx *ctx = talloc_get_type(private_data, struct ibw_ctx); + struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv); + struct ibw_conn *conn = NULL; + struct ibw_conn_priv *pconn = NULL; + struct rdma_cm_id *cma_id = NULL; + struct rdma_cm_event *event = NULL; + + assert(ctx!=NULL); + + rc = rdma_get_cm_event(pctx->cm_channel, &event); + if (rc) { + ctx->state = IBWS_ERROR; + sprintf(ibw_lasterr, "rdma_get_cm_event error %d\n", rc); + goto error; + } + cma_id = event->id; + + DEBUG(10, ("cma_event type %d cma_id %p (%s)\n", event->event, cma_id, + (cma_id == pctx->cm_id) ? "parent" : "child")); + + switch (event->event) { + case RDMA_CM_EVENT_ADDR_RESOLVED: + DEBUG(11, ("RDMA_CM_EVENT_ADDR_RESOLVED\n")); + /* continuing from ibw_connect ... */ + rc = rdma_resolve_route(cma_id, 2000); + if (rc) { + sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc); + goto error; + } + /* continued at RDMA_CM_EVENT_ROUTE_RESOLVED */ + break; + + case RDMA_CM_EVENT_ROUTE_RESOLVED: + DEBUG(11, ("RDMA_CM_EVENT_ROUTE_RESOLVED\n")); + /* after RDMA_CM_EVENT_ADDR_RESOLVED: */ + assert(cma_id->context!=NULL); + conn = talloc_get_type(cma_id->context, struct ibw_conn); + + rc = ibw_manage_connect(conn); + if (rc) + goto error; + + break; + + case RDMA_CM_EVENT_CONNECT_REQUEST: + DEBUG(11, ("RDMA_CM_EVENT_CONNECT_REQUEST\n")); + ctx->state = IBWS_CONNECT_REQUEST; + conn = ibw_conn_new(ctx, ctx); + pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + pconn->cm_id = cma_id; /* !!! event will be freed but id not */ + cma_id->context = (void *)conn; + DEBUG(10, ("pconn->cm_id %p\n", pconn->cm_id)); + + if (ibw_setup_cq_qp(conn)) + goto error; + + conn->state = IBWC_INIT; + pctx->connstate_func(ctx, conn); + + /* continued at ibw_accept when invoked by the func above */ + if (!pconn->is_accepted) { + rc = rdma_reject(cma_id, NULL, 0); + if (rc) + DEBUG(0, ("rdma_reject failed with rc=%d\n", rc)); + talloc_free(conn); + DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id)); + } + + /* TODO: clarify whether if it's needed by upper layer: */ + ctx->state = IBWS_READY; + pctx->connstate_func(ctx, NULL); + + /* NOTE: more requests can arrive until RDMA_CM_EVENT_ESTABLISHED ! */ + break; + + case RDMA_CM_EVENT_ESTABLISHED: + /* expected after ibw_accept and ibw_connect[not directly] */ + DEBUG(0, ("ESTABLISHED (conn: %p)\n", cma_id->context)); + conn = talloc_get_type(cma_id->context, struct ibw_conn); + assert(conn!=NULL); /* important assumption */ + + DEBUG(10, ("ibw_setup_cq_qp succeeded (cmid=%p)\n", cma_id)); + + /* client conn is up */ + conn->state = IBWC_CONNECTED; + + /* both ctx and conn have changed */ + pctx->connstate_func(ctx, conn); + break; + + case RDMA_CM_EVENT_ADDR_ERROR: + sprintf(ibw_lasterr, "RDMA_CM_EVENT_ADDR_ERROR, error %d\n", event->status); + case RDMA_CM_EVENT_ROUTE_ERROR: + sprintf(ibw_lasterr, "RDMA_CM_EVENT_ROUTE_ERROR, error %d\n", event->status); + case RDMA_CM_EVENT_CONNECT_ERROR: + sprintf(ibw_lasterr, "RDMA_CM_EVENT_CONNECT_ERROR, error %d\n", event->status); + case RDMA_CM_EVENT_UNREACHABLE: + sprintf(ibw_lasterr, "RDMA_CM_EVENT_UNREACHABLE, error %d\n", event->status); + case RDMA_CM_EVENT_REJECTED: + sprintf(ibw_lasterr, "RDMA_CM_EVENT_REJECTED, error %d\n", event->status); + conn = talloc_get_type(cma_id->context, struct ibw_conn); + if (conn) { + if ((rc=rdma_ack_cm_event(event))) + DEBUG(0, ("reject/rdma_ack_cm_event failed with %d\n", rc)); + event = NULL; + pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + ibw_conn_priv_destruct(pconn); + } + goto error; + + case RDMA_CM_EVENT_DISCONNECTED: + DEBUG(11, ("RDMA_CM_EVENT_DISCONNECTED\n")); + if ((rc=rdma_ack_cm_event(event))) + DEBUG(0, ("disc/rdma_ack_cm_event failed with %d\n", rc)); + event = NULL; /* don't ack more */ + + if (cma_id!=pctx->cm_id) { + DEBUG(0, ("client DISCONNECT event cm_id=%p\n", cma_id)); + conn = talloc_get_type(cma_id->context, struct ibw_conn); + conn->state = IBWC_DISCONNECTED; + pctx->connstate_func(NULL, conn); + } + break; + + case RDMA_CM_EVENT_DEVICE_REMOVAL: + sprintf(ibw_lasterr, "cma detected device removal!\n"); + goto error; + + default: + sprintf(ibw_lasterr, "unknown event %d\n", event->event); + goto error; + } + + if (event!=NULL && (rc=rdma_ack_cm_event(event))) { + sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc); + goto error; + } + + return; +error: + if (event!=NULL && (rc=rdma_ack_cm_event(event))) { + sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc); + goto error; + } + + DEBUG(0, ("cm event handler: %s", ibw_lasterr)); + + if (cma_id!=pctx->cm_id) { + conn = talloc_get_type(cma_id->context, struct ibw_conn); + if (conn) + conn->state = IBWC_ERROR; + pctx->connstate_func(NULL, conn); + } else { + ctx->state = IBWS_ERROR; + pctx->connstate_func(ctx, NULL); + } +} + +static void ibw_event_handler_verbs(struct event_context *ev, + struct fd_event *fde, uint16_t flags, void *private_data) +{ + struct ibw_conn *conn = talloc_get_type(private_data, struct ibw_conn); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + + struct ibv_wc wc; + int rc; + struct ibv_cq *ev_cq; + void *ev_ctx; + + DEBUG(10, ("ibw_event_handler_verbs(%u)\n", (uint32_t)flags)); + + /* TODO: check whether if it's good to have more channels here... */ + rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx); + if (rc) { + sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc); + goto error; + } + if (ev_cq != pconn->cq) { + sprintf(ibw_lasterr, "ev_cq(%p) != pconn->cq(%p)\n", ev_cq, pconn->cq); + goto error; + } + rc = ibv_req_notify_cq(pconn->cq, 0); + if (rc) { + sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc); + goto error; + } + + while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) { + if (wc.status) { + sprintf(ibw_lasterr, "cq completion failed status=%d, opcode=%d, rc=%d\n", + wc.status, wc.opcode, rc); + goto error; + } + + switch(wc.opcode) { + case IBV_WC_SEND: + DEBUG(10, ("send completion\n")); + if (ibw_wc_send(conn, &wc)) + goto error; + break; + + case IBV_WC_RDMA_WRITE: + DEBUG(10, ("rdma write completion\n")); + break; + + case IBV_WC_RDMA_READ: + DEBUG(10, ("rdma read completion\n")); + break; + + case IBV_WC_RECV: + DEBUG(10, ("recv completion\n")); + if (ibw_wc_recv(conn, &wc)) + goto error; + break; + + default: + sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode); + goto error; + } + } + if (rc!=0) { + sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc); + goto error; + } + + ibv_ack_cq_events(pconn->cq, 1); + + return; +error: + ibv_ack_cq_events(pconn->cq, 1); + + DEBUG(0, (ibw_lasterr)); + + if (conn->state!=IBWC_ERROR) { + conn->state = IBWC_ERROR; + pctx->connstate_func(NULL, conn); + } +} + +static int ibw_process_queue(struct ibw_conn *conn) +{ + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibw_ctx_priv *pctx; + struct ibw_wr *p; + int rc; + uint32_t msg_size; + + if (pconn->queue==NULL) + return 0; /* NOP */ + + p = pconn->queue; + + /* we must have at least 1 fragment to send */ + assert(p->queued_ref_cnt>0); + p->queued_ref_cnt--; + + pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen; + + assert(p->queued_msg!=NULL); + assert(msg_size!=0); + + DEBUG(10, ("ibw_process_queue refcnt=%d msgsize=%u\n", + p->queued_ref_cnt, msg_size)); + + rc = ibw_send_packet(conn, p->queued_msg, p, msg_size); + + /* was this the last fragment? */ + if (p->queued_ref_cnt) { + p->queued_msg += pctx->opts.recv_bufsize; + } else { + DLIST_REMOVE2(pconn->queue, p, qprev, qnext); + p->queued_msg = NULL; + } + + return rc; +} + +static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibw_wr *p; + int send_index; + + DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n", + pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len)); + + assert(pconn->cm_id->qp->qp_num==wc->qp_num); + assert(wc->wr_id >= pctx->opts.max_recv_wr); + send_index = wc->wr_id - pctx->opts.max_recv_wr; + pconn->wr_sent--; + + if (send_index < pctx->opts.max_send_wr) { + DEBUG(10, ("ibw_wc_send#1 %u\n", (int)wc->wr_id)); + p = pconn->wr_index[send_index]; + if (p->buf_large!=NULL) { + if (p->ref_cnt) { + /* awaiting more of it... */ + p->ref_cnt--; + } else { + ibw_free_mr(&p->buf_large, &p->mr_large); + DLIST_REMOVE(pconn->wr_list_used, p); + DLIST_ADD(pconn->wr_list_avail, p); + } + } else { /* nasty - but necessary */ + DLIST_REMOVE(pconn->wr_list_used, p); + DLIST_ADD(pconn->wr_list_avail, p); + } + } else { /* "extra" request - not optimized */ + DEBUG(10, ("ibw_wc_send#2 %u\n", (int)wc->wr_id)); + for(p=pconn->extra_sent; p!=NULL; p=p->next) + if ((p->wr_id + pctx->opts.max_recv_wr)==(int)wc->wr_id) + break; + if (p==NULL) { + sprintf(ibw_lasterr, "failed to find wr_id %d\n", (int)wc->wr_id); + return -1; + } + if (p->ref_cnt) { + p->ref_cnt--; + } else { + ibw_free_mr(&p->buf_large, &p->mr_large); + DLIST_REMOVE(pconn->extra_sent, p); + DLIST_ADD(pconn->extra_avail, p); + } + } + + return ibw_process_queue(conn); +} + +static int ibw_append_to_part(struct ibw_conn_priv *pconn, + struct ibw_part *part, char **pp, uint32_t add_len, int info) +{ + DEBUG(10, ("ibw_append_to_part: cmid=%p, (bs=%u, len=%u, tr=%u), al=%u, i=%u\n", + pconn->cm_id, part->bufsize, part->len, part->to_read, add_len, info)); + + /* allocate more if necessary - it's an "evergrowing" buffer... */ + if (part->len + add_len > part->bufsize) { + if (part->buf==NULL) { + assert(part->len==0); + part->buf = talloc_size(pconn, add_len); + if (part->buf==NULL) { + sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n", + add_len, info); + return -1; + } + part->bufsize = add_len; + } else { + part->buf = talloc_realloc_size(pconn, + part->buf, part->len + add_len); + if (part->buf==NULL) { + sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n", + part->len, add_len, info); + return -1; + } + } + part->bufsize = part->len + add_len; + } + + /* consume pp */ + memcpy(part->buf + part->len, *pp, add_len); + *pp += add_len; + part->len += add_len; + part->to_read -= add_len; + + return 0; +} + +static int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn, + struct ibw_part *part, uint32_t threshold) +{ + DEBUG(10, ("ibw_wc_mem_threshold: cmid=%p, (bs=%u, len=%u, tr=%u), thr=%u\n", + pconn->cm_id, part->bufsize, part->len, part->to_read, threshold)); + + if (part->bufsize > threshold) { + DEBUG(3, ("ibw_wc_mem_threshold: cmid=%p, %u > %u\n", + pconn->cm_id, part->bufsize, threshold)); + talloc_free(part->buf); + part->buf = talloc_size(pconn, threshold); + if (part->buf==NULL) { + sprintf(ibw_lasterr, "talloc_size failed\n"); + return -1; + } + part->bufsize = threshold; + } + return 0; +} + +static int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibw_part *part = &pconn->part; + char *p; + uint32_t remain = wc->byte_len; + + DEBUG(10, ("ibw_wc_recv: cmid=%p, wr_id: %u, bl: %u\n", + pconn->cm_id, (uint32_t)wc->wr_id, remain)); + + assert(pconn->cm_id->qp->qp_num==wc->qp_num); + assert((int)wc->wr_id < pctx->opts.max_recv_wr); + assert(wc->byte_len <= pctx->opts.recv_bufsize); + + p = pconn->buf_recv + ((int)wc->wr_id * pctx->opts.recv_bufsize); + + while(remain) { + /* here always true: (part->len!=0 && part->to_read!=0) || + (part->len==0 && part->to_read==0) */ + if (part->len) { /* is there a partial msg to be continued? */ + int read_len = (part->to_read<=remain) ? part->to_read : remain; + if (ibw_append_to_part(pconn, part, &p, read_len, 421)) + goto error; + remain -= read_len; + + if (part->len<=sizeof(uint32_t) && part->to_read==0) { + assert(part->len==sizeof(uint32_t)); + /* set it again now... */ + part->to_read = *((uint32_t *)(part->buf)); /* TODO: ntohl */ + if (part->to_readto_read); + goto error; + } + part->to_read -= sizeof(uint32_t); /* it's already read */ + } + + if (part->to_read==0) { + pctx->receive_func(conn, part->buf, part->len); + part->len = 0; /* tells not having partial data (any more) */ + if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold)) + goto error; + } + } else { + if (remain>=sizeof(uint32_t)) { + uint32_t msglen = *(uint32_t *)p; /* TODO: ntohl */ + if (msglenreceive_func(conn, p, msglen); + p += msglen; + remain -= msglen; + } else { + part->to_read = msglen; + /* part->len is already 0 */ + if (ibw_append_to_part(pconn, part, &p, remain, 422)) + goto error; + remain = 0; /* to be continued ... */ + /* part->to_read > 0 here */ + } + } else { /* edge case: */ + part->to_read = sizeof(uint32_t); + /* part->len is already 0 */ + if (ibw_append_to_part(pconn, part, &p, remain, 423)) + goto error; + remain = 0; + /* part->to_read > 0 here */ + } + } + } /* is always decreased at least by 1 */ + + if (ibw_refill_cq_recv(conn)) + goto error; + + return 0; + +error: + DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr)); + return -1; +} + +static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts) +{ + int i; + const char *name, *value; + + DEBUG(10, ("ibw_process_init_attrs: nattr: %d\n", nattr)); + + opts->max_send_wr = IBW_MAX_SEND_WR; + opts->max_recv_wr = IBW_MAX_RECV_WR; + opts->recv_bufsize = IBW_RECV_BUFSIZE; + opts->recv_threshold = IBW_RECV_THRESHOLD; + + for(i=0; imax_send_wr = atoi(value); + else if (strcmp(name, "max_recv_wr")==0) + opts->max_recv_wr = atoi(value); + else if (strcmp(name, "recv_bufsize")==0) + opts->recv_bufsize = atoi(value); + else if (strcmp(name, "recv_threshold")==0) + opts->recv_threshold = atoi(value); + else { + sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name); + return -1; + } + } + return 0; +} + +struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr, + void *ctx_userdata, + ibw_connstate_fn_t ibw_connstate, + ibw_receive_fn_t ibw_receive, + struct event_context *ectx) +{ + struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx); + struct ibw_ctx_priv *pctx; + int rc; + + DEBUG(10, ("ibw_init(ctx_userdata: %p, ectx: %p)\n", ctx_userdata, ectx)); + + /* initialize basic data structures */ + memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE); + + assert(ctx!=NULL); + ibw_lasterr[0] = '\0'; + talloc_set_destructor(ctx, ibw_ctx_destruct); + ctx->ctx_userdata = ctx_userdata; + + pctx = talloc_zero(ctx, struct ibw_ctx_priv); + talloc_set_destructor(pctx, ibw_ctx_priv_destruct); + ctx->internal = (void *)pctx; + assert(pctx!=NULL); + + pctx->connstate_func = ibw_connstate; + pctx->receive_func = ibw_receive; + + pctx->ectx = ectx; + + /* process attributes */ + if (ibw_process_init_attrs(attr, nattr, &pctx->opts)) + goto cleanup; + + /* init cm */ + pctx->cm_channel = rdma_create_event_channel(); + if (!pctx->cm_channel) { + sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno); + goto cleanup; + } + + pctx->cm_channel_event = event_add_fd(pctx->ectx, pctx, + pctx->cm_channel->fd, EVENT_FD_READ, ibw_event_handler_cm, ctx); + + rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx, RDMA_PS_TCP); + if (rc) { + rc = errno; + sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc); + goto cleanup; + } + DEBUG(10, ("created cm_id %p\n", pctx->cm_id)); + + pctx->pagesize = sysconf(_SC_PAGESIZE); + + return ctx; + /* don't put code here */ +cleanup: + DEBUG(0, (ibw_lasterr)); + + if (ctx) + talloc_free(ctx); + + return NULL; +} + +int ibw_stop(struct ibw_ctx *ctx) +{ + struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal; + struct ibw_conn *p; + + DEBUG(10, ("ibw_stop\n")); + + for(p=ctx->conn_list; p!=NULL; p=p->next) { + if (ctx->state==IBWC_ERROR || ctx->state==IBWC_CONNECTED) { + if (ibw_disconnect(p)) + return -1; + } + } + + ctx->state = IBWS_STOPPED; + pctx->connstate_func(ctx, NULL); + + return 0; +} + +int ibw_bind(struct ibw_ctx *ctx, struct sockaddr_in *my_addr) +{ + struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal; + int rc; + + DEBUG(10, ("ibw_bind: addr=%s, port=%u\n", + inet_ntoa(my_addr->sin_addr), ntohs(my_addr->sin_port))); + rc = rdma_bind_addr(pctx->cm_id, (struct sockaddr *) my_addr); + if (rc) { + sprintf(ibw_lasterr, "rdma_bind_addr error %d\n", rc); + DEBUG(0, (ibw_lasterr)); + return rc; + } + DEBUG(10, ("rdma_bind_addr successful\n")); + + return 0; +} + +int ibw_listen(struct ibw_ctx *ctx, int backlog) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv); + int rc; + + DEBUG(10, ("ibw_listen\n")); + rc = rdma_listen(pctx->cm_id, backlog); + if (rc) { + sprintf(ibw_lasterr, "rdma_listen failed: %d\n", rc); + DEBUG(0, (ibw_lasterr)); + return rc; + } + + return 0; +} + +int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata) +{ + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct rdma_conn_param conn_param; + int rc; + + DEBUG(10, ("ibw_accept: cmid=%p\n", pconn->cm_id)); + conn->conn_userdata = conn_userdata; + + memset(&conn_param, 0, sizeof(struct rdma_conn_param)); + conn_param.responder_resources = 1; + conn_param.initiator_depth = 1; + rc = rdma_accept(pconn->cm_id, &conn_param); + if (rc) { + sprintf(ibw_lasterr, "rdma_accept failed %d\n", rc); + DEBUG(0, (ibw_lasterr)); + return -1;; + } + + pconn->is_accepted = 1; + + /* continued at RDMA_CM_EVENT_ESTABLISHED */ + + return 0; +} + +int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = NULL; + int rc; + + assert(conn!=NULL); + + conn->conn_userdata = conn_userdata; + pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + DEBUG(10, ("ibw_connect: addr=%s, port=%u\n", inet_ntoa(serv_addr->sin_addr), + ntohs(serv_addr->sin_port))); + + /* clean previous - probably half - initialization */ + if (ibw_conn_priv_destruct(pconn)) { + DEBUG(0, ("ibw_connect/ibw_pconn_destruct failed for cm_id=%p\n", pconn->cm_id)); + return -1; + } + + /* init cm */ + rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP); + if (rc) { + rc = errno; + sprintf(ibw_lasterr, "ibw_connect/rdma_create_id error %d\n", rc); + talloc_free(conn); + return -1; + } + DEBUG(10, ("ibw_connect: rdma_create_id succeeded, cm_id=%p\n", pconn->cm_id)); + + rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) serv_addr, 2000); + if (rc) { + sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc); + DEBUG(0, (ibw_lasterr)); + talloc_free(conn); + return -1; + } + + /* continued at RDMA_CM_EVENT_ADDR_RESOLVED */ + + return 0; +} + +int ibw_disconnect(struct ibw_conn *conn) +{ + int rc; + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + + DEBUG(10, ("ibw_disconnect: cmid=%p\n", pconn->cm_id)); + + assert(pconn!=NULL); + + switch(conn->state) { + case IBWC_ERROR: + ibw_conn_priv_destruct(pconn); /* do this here right now */ + break; + case IBWC_CONNECTED: + rc = rdma_disconnect(pconn->cm_id); + if (rc) { + sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc); + DEBUG(0, (ibw_lasterr)); + return rc; + } + break; + default: + DEBUG(9, ("invalid state for disconnect: %d\n", conn->state)); + break; + } + + return 0; +} + +int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t len) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibw_wr *p = pconn->wr_list_avail; + + if (p!=NULL) { + DEBUG(10, ("ibw_alloc_send_buf#1: cmid=%p, len=%d\n", pconn->cm_id, len)); + + DLIST_REMOVE(pconn->wr_list_avail, p); + DLIST_ADD(pconn->wr_list_used, p); + + if (len <= pctx->opts.recv_bufsize) { + *buf = (void *)p->buf; + } else { + p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large); + if (p->buf_large==NULL) { + sprintf(ibw_lasterr, "ibw_alloc_mr#1 failed\n"); + goto error; + } + *buf = (void *)p->buf_large; + } + /* p->wr_id is already filled in ibw_init_memory */ + } else { + DEBUG(10, ("ibw_alloc_send_buf#2: cmid=%p, len=%d\n", pconn->cm_id, len)); + /* not optimized */ + p = pconn->extra_avail; + if (!p) { + p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr); + talloc_set_destructor(p, ibw_wr_destruct); + if (p==NULL) { + sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)\n", pconn->extra_max); + goto error; + } + p->wr_id = pctx->opts.max_send_wr + pconn->extra_max; + pconn->extra_max++; + switch(pconn->extra_max) { + case 1: DEBUG(2, ("warning: queue performed\n")); break; + case 10: DEBUG(0, ("warning: queue reached 10\n")); break; + case 100: DEBUG(0, ("warning: queue reached 100\n")); break; + case 1000: DEBUG(0, ("warning: queue reached 1000\n")); break; + default: break; + } + } + + p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large); + if (p->buf_large==NULL) { + sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed\n"); + goto error; + } + *buf = (void *)p->buf_large; + + DLIST_REMOVE(pconn->extra_avail, p); + /* we don't have prepared index for this, so that + * we will have to find this by wr_id later on */ + DLIST_ADD(pconn->extra_sent, p); + } + + *key = (void *)p; + + return 0; +error: + DEBUG(0, ("ibw_alloc_send_buf error: %s", ibw_lasterr)); + return -1; +} + + +static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, uint32_t len) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + int rc; + + /* can we send it right now? */ + if (pconn->wr_sentopts.max_send_wr) { + struct ibv_send_wr *bad_wr; + struct ibv_sge list = { + .addr = (uintptr_t)buf, + .length = len, + .lkey = pconn->mr_send->lkey + }; + struct ibv_send_wr wr = { + .wr_id = p->wr_id + pctx->opts.max_recv_wr, + .sg_list = &list, + .num_sge = 1, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED, + }; + + if (p->buf_large==NULL) { + DEBUG(10, ("ibw_send#normal(cmid: %p, wrid: %u, n: %d)\n", + pconn->cm_id, (uint32_t)wr.wr_id, len)); + } else { + DEBUG(10, ("ibw_send#large(cmid: %p, wrid: %u, n: %d)\n", + pconn->cm_id, (uint32_t)wr.wr_id, len)); + list.lkey = p->mr_large->lkey; + } + + rc = ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr); + if (rc) { + sprintf(ibw_lasterr, "ibv_post_send error %d (%d)\n", + rc, pconn->wr_sent); + goto error; + } + + pconn->wr_sent++; + + return rc; + } /* else put the request into our own queue: */ + + DEBUG(10, ("ibw_send#queued(cmid: %p, len: %u)\n", pconn->cm_id, len)); + + /* TODO: clarify how to continue when state==IBWC_STOPPED */ + + /* to be sent by ibw_wc_send */ + /* regardless "normal" or [a part of] "large" packet */ + if (!p->queued_ref_cnt) { + DLIST_ADD_END2(pconn->queue, p, struct ibw_wr *, + qprev, qnext); /* TODO: optimize */ + p->queued_msg = buf; + } + p->queued_ref_cnt++; + p->queued_rlen = len; /* last wins; see ibw_wc_send */ + + return 0; +error: + DEBUG(0, (ibw_lasterr)); + return -1; +} + +int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_wr *p = talloc_get_type(key, struct ibw_wr); + int rc; + + assert(len>=sizeof(uint32_t)); + assert((*((uint32_t *)buf)==len)); /* TODO: htonl */ + + if (len > pctx->opts.recv_bufsize) { + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + int rlen = len; + char *packet = (char *)buf; + uint32_t recv_bufsize = pctx->opts.recv_bufsize; + + DEBUG(10, ("ibw_send#frag(cmid: %p, buf: %p, len: %u)\n", + pconn->cm_id, buf, len)); + + /* single threaded => no race here: */ + assert(p->ref_cnt==0); + while(rlen > recv_bufsize) { + rc = ibw_send_packet(conn, packet, p, recv_bufsize); + if (rc) + return rc; + packet += recv_bufsize; + rlen -= recv_bufsize; + p->ref_cnt++; /* not good to have it in ibw_send_packet */ + } + if (rlen) { + rc = ibw_send_packet(conn, packet, p, rlen); + p->ref_cnt++; /* not good to have it in ibw_send_packet */ + } + p->ref_cnt--; /* for the same handling */ + } else { + assert(p->ref_cnt==0); + assert(p->queued_ref_cnt==0); + + rc = ibw_send_packet(conn, buf, p, len); + } + return rc; +} + +int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibw_wr *p = talloc_get_type(key, struct ibw_wr); + + assert(p!=NULL); + assert(buf!=NULL); + assert(conn!=NULL); + + if (p->buf_large!=NULL) + ibw_free_mr(&p->buf_large, &p->mr_large); + + /* parallel case */ + if (p->wr_id < pctx->opts.max_send_wr) { + DEBUG(10, ("ibw_cancel_send_buf#1 %u", (int)p->wr_id)); + DLIST_REMOVE(pconn->wr_list_used, p); + DLIST_ADD(pconn->wr_list_avail, p); + } else { /* "extra" packet */ + DEBUG(10, ("ibw_cancel_send_buf#2 %u", (int)p->wr_id)); + DLIST_REMOVE(pconn->extra_sent, p); + DLIST_ADD(pconn->extra_avail, p); + } + + return 0; +} + +const char *ibw_getLastError(void) +{ + return ibw_lasterr; +} diff --git a/source4/cluster/ctdb/ib/ibwrapper.h b/source4/cluster/ctdb/ib/ibwrapper.h new file mode 100644 index 0000000000..36385d6f46 --- /dev/null +++ b/source4/cluster/ctdb/ib/ibwrapper.h @@ -0,0 +1,219 @@ +/* + * Unix SMB/CIFS implementation. + * Wrap Infiniband calls. + * + * Copyright (C) Sven Oehme 2006 + * + * Major code contributions by Peter Somogyi + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +/* Server communication state */ +enum ibw_state_ctx { + IBWS_INIT = 0, /* ctx start - after ibw_init */ + IBWS_READY, /* after ibw_bind & ibw_listen */ + IBWS_CONNECT_REQUEST, /* after [IBWS_READY + incoming request] */ + /* => [(ibw_accept)IBWS_READY | (ibw_disconnect)STOPPED | ERROR] */ + IBWS_STOPPED, /* normal stop <= ibw_disconnect+(IBWS_READY | IBWS_CONNECT_REQUEST) */ + IBWS_ERROR /* abnormal state; ibw_stop must be called after this */ +}; + +/* Connection state */ +struct ibw_ctx { + void *ctx_userdata; /* see ibw_init */ + + enum ibw_state_ctx state; + void *internal; + + struct ibw_conn *conn_list; /* 1st elem of double linked list */ +}; + +enum ibw_state_conn { + IBWC_INIT = 0, /* conn start - internal state */ + IBWC_CONNECTED, /* after ibw_accept or ibw_connect */ + IBWC_DISCONNECTED, /* after ibw_disconnect */ + IBWC_ERROR +}; + +struct ibw_conn { + struct ibw_ctx *ctx; + enum ibw_state_conn state; + + void *conn_userdata; /* see ibw_connect and ibw_accept */ + void *internal; + + struct ibw_conn *prev, *next; +}; + +/* + * (name, value) pair for array param of ibw_init + */ +struct ibw_initattr { + const char *name; + const char *value; +}; + +/* + * Callback function definition which should inform you about + * connection state change + * This callback is invoked whenever server or client connection changes. + * Both and can be NULL if their state didn't change. + * Return nonzero on error. + */ +typedef int (*ibw_connstate_fn_t)(struct ibw_ctx *ctx, struct ibw_conn *conn); + +/* + * Callback function definition which should process incoming packets + * This callback is invoked whenever any message arrives. + * Return nonzero on error. + * + * Important: you mustn't store buf pointer for later use. + * Process its contents before returning. + */ +typedef int (*ibw_receive_fn_t)(struct ibw_conn *conn, void *buf, int n); + +/* + * settings: array of (name, value) pairs + * where name is one of: + * max_send_wr [default is 256] + * max_recv_wr [default is 1024] + * <...> + * + * Must be called _ONCE_ for each node. + * + * max_msg_size is the maximum size of a message + * (max_send_wr + max_recv_wr) * max_msg_size bytes allocated per connection + * + * returns non-NULL on success + * + * talloc_free must be called for the result in IBWS_STOPPED; + * it will close resources by destructor + * connections(ibw_conn *) must have been closed prior talloc_free + */ +struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr, + void *ctx_userdata, + ibw_connstate_fn_t ibw_connstate, + ibw_receive_fn_t ibw_receive, + struct event_context *ectx); + +/* + * Must be called in states of (IBWS_ERROR, IBWS_READY, IBWS_CONNECT_REQUEST) + * + * It will send out disconnect requests and free up ibw_conn structures. + * The ctx->state will transit to IBWS_STOPPED after every conn are disconnected. + * During that time, you mustn't send/recv/disconnect any more. + * Only after ctx->state=IBWS_STOPPED you can talloc_free the ctx. + */ +int ibw_stop(struct ibw_ctx *ctx); + +/*************** connection initiation - like stream sockets *****/ + +/* + * works like socket bind + * needs a normal internet address here + * + * return 0 on success + */ +int ibw_bind(struct ibw_ctx *ctx, struct sockaddr_in *my_addr); + +/* + * works like socket listen + * non-blocking + * enables accepting incoming connections (after IBWS_READY) + * (it doesn't touch ctx->state by itself) + * + * returns 0 on success + */ +int ibw_listen(struct ibw_ctx *ctx, int backlog); + +/* + * works like socket accept + * initializes a connection to a client + * must be called when state=IBWS_CONNECT_REQUEST + * + * returns 0 on success + * + * You have +1 waiting here: you will get ibw_conn (having the + * same member) structure in ibw_connstate_fn_t. + * + * Important: you won't get remote IP address (only internal conn info) + */ +int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata); + +/* + * Create a new connection structure + * available for queueing ibw_send + * + * is needed to be notified by talloc destruct action. + */ +struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx); + +/* + * Needs a normal internet address here + * can be called within IBWS_READY|IBWS_CONNECT_REQUEST + * + * returns non-NULL on success + * + * You have +1 waiting here: you will get ibw_conn (having the + * same member) structure in ibw_connstate_fn_t. + */ +int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata); + +/* + * Sends out a disconnect request. + * You should process fds after calling this function + * and then process it with ibw_process_event normally + * until you get conn->state = IBWC_DISCONNECTED + * + * You mustn't talloc_free yet right after this, + * first wait for IBWC_DISCONNECTED. + */ +int ibw_disconnect(struct ibw_conn *conn); + +/************ Infiniband specific event loop wrapping ******************/ + +/* + * You have to use this buf to fill in before send. + * It's just to avoid memcpy.in ibw_send. + * Use the same (buf, key) pair with ibw_send. + * Don't use more space than maxsize (see ibw_init). + * + * Returns 0 on success. + */ +int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t len); + +/* + * Send the message in one + * Can be invoked any times (should fit into buffers) and at any time + * (in conn->state=IBWC_CONNECTED) + * n must be less or equal than max_msg_size (see ibw_init) + * + * You mustn't use (buf, key) any more for sending. + */ +int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len); + +/* + * Call this after ibw_alloc_send_buf + * when you won't call ibw_send for (buf, key) + * You mustn't use (buf, key) any more. + */ +int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key); + +/* + * Retrieves the last error + * result: always non-zero, mustn't be freed (static) + */ +const char *ibw_getLastError(void); diff --git a/source4/cluster/ctdb/ib/ibwrapper_internal.h b/source4/cluster/ctdb/ib/ibwrapper_internal.h new file mode 100644 index 0000000000..9c6bfab519 --- /dev/null +++ b/source4/cluster/ctdb/ib/ibwrapper_internal.h @@ -0,0 +1,127 @@ +/* + * Unix SMB/CIFS implementation. + * Wrap Infiniband calls. + * + * Copyright (C) Sven Oehme 2006 + * + * Major code contributions by Peter Somogyi + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +struct ibw_opts { + uint32_t max_send_wr; + uint32_t max_recv_wr; + uint32_t recv_bufsize; + uint32_t recv_threshold; +}; + +struct ibw_wr { + char *buf; /* initialized in ibw_init_memory once per connection */ + int wr_id; /* position in wr_index list; also used as wr id */ + + char *buf_large; /* allocated specially for "large" message */ + struct ibv_mr *mr_large; + int ref_cnt; /* reference count for ibw_wc_send to know when to release */ + + char *queued_msg; /* set at ibw_send - can be different than above */ + int queued_ref_cnt; /* instead of adding the same to the queue again */ + uint32_t queued_rlen; /* last wins when queued_ref_cnt>0; or simple msg size */ + + struct ibw_wr *next, *prev; /* in wr_list_avail or wr_list_used */ + /* or extra_sent or extra_avail */ + struct ibw_wr *qnext, *qprev; /* in queue */ +}; + +struct ibw_ctx_priv { + struct event_context *ectx; + + struct ibw_opts opts; + + struct rdma_cm_id *cm_id; /* server cm id */ + + struct rdma_event_channel *cm_channel; + struct fd_event *cm_channel_event; + + ibw_connstate_fn_t connstate_func; /* see ibw_init */ + ibw_receive_fn_t receive_func; /* see ibw_init */ + + long pagesize; /* sysconf result for memalign */ +}; + +struct ibw_part { + char *buf; /* talloced memory buffer */ + uint32_t bufsize; /* allocated size of buf - always grows */ + uint32_t len; /* message part length */ + uint32_t to_read; /* 4 or *((uint32_t)buf) if len>=sizeof(uint32_t) */ +}; + +struct ibw_conn_priv { + struct ibv_comp_channel *verbs_channel; + struct fd_event *verbs_channel_event; + + struct rdma_cm_id *cm_id; /* client's cm id */ + struct ibv_pd *pd; + int is_accepted; + + struct ibv_cq *cq; /* qp is in cm_id */ + + char *buf_send; /* max_send_wr * avg_send_size */ + struct ibv_mr *mr_send; + struct ibw_wr *wr_list_avail; + struct ibw_wr *wr_list_used; + struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */ + int wr_sent; /* # of send wrs in the CQ */ + + struct ibw_wr *extra_sent; + struct ibw_wr *extra_avail; + int extra_max; /* max wr_id in the queue */ + + struct ibw_wr *queue; + + /* buf_recv is a ring buffer */ + char *buf_recv; /* max_recv_wr * avg_recv_size */ + struct ibv_mr *mr_recv; + int recv_index; /* index of the next recv buffer when refilling */ + struct ibw_part part; +}; + +/* remove an element from a list - element doesn't have to be in list. */ +#define DLIST_REMOVE2(list, p, prev, next) \ +do { \ + if ((p) == (list)) { \ + (list) = (p)->next; \ + if (list) (list)->prev = NULL; \ + } else { \ + if ((p)->prev) (p)->prev->next = (p)->next; \ + if ((p)->next) (p)->next->prev = (p)->prev; \ + } \ + if ((p) != (list)) (p)->next = (p)->prev = NULL; \ +} while (0) + +/* hook into the end of the list - needs a tmp pointer */ +#define DLIST_ADD_END2(list, p, type, prev, next) \ +do { \ + if (!(list)) { \ + (list) = (p); \ + (p)->next = (p)->prev = NULL; \ + } else { \ + type tmp; \ + for (tmp = (list); tmp->next; tmp = tmp->next) ; \ + tmp->next = (p); \ + (p)->next = NULL; \ + (p)->prev = tmp; \ + } \ +} while (0) diff --git a/source4/cluster/ctdb/ib/ibwrapper_test.c b/source4/cluster/ctdb/ib/ibwrapper_test.c new file mode 100644 index 0000000000..fa506500e4 --- /dev/null +++ b/source4/cluster/ctdb/ib/ibwrapper_test.c @@ -0,0 +1,641 @@ +/* + * Unix SMB/CIFS implementation. + * Test the infiniband wrapper. + * + * Copyright (C) Sven Oehme 2006 + * + * Major code contributions by Peter Somogyi + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "includes.h" +#include "lib/events/events.h" +#include "ib/ibwrapper.h" + +struct ibwtest_ctx { + int is_server; + char *id; /* my id */ + + struct ibw_initattr *attrs; + int nattrs; + char *opts; /* option string */ + + struct sockaddr_in *addrs; /* dynamic array of dest addrs */ + int naddrs; + + unsigned int nsec; /* delta times between messages in nanosec */ + unsigned int sleep_usec; /* microsecs to sleep in the main loop to emulate overloading */ + uint32_t maxsize; /* maximum variable message size */ + + int cnt; + int nsent; + + int nmsg; /* number of messages to send (client) */ + + int kill_me; + int stopping; + int error; + struct ibw_ctx *ibwctx; + + struct timeval start_time, end_time; +}; + +struct ibwtest_conn { + char *id; +}; + +enum testopcode { + TESTOP_SEND_ID = 1, + TESTOP_SEND_TEXT = 2, + TESTOP_SEND_RND = 3 +}; + +int ibwtest_connect_everybody(struct ibwtest_ctx *tcx) +{ + struct ibw_conn *conn; + struct ibwtest_conn *tconn = talloc_zero(tcx, struct ibwtest_conn); + int i; + + for(i=0; inaddrs; i++) { + conn = ibw_conn_new(tcx->ibwctx, tconn); + if (ibw_connect(conn, &tcx->addrs[i], tconn)) { + fprintf(stderr, "ibw_connect error at %d\n", i); + return -1; + } + } + DEBUG(10, ("sent %d connect request...\n", tcx->naddrs)); + + return 0; +} + +int ibwtest_send_id(struct ibw_conn *conn) +{ + struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx); + char *buf; + void *key; + uint32_t len; + + DEBUG(10, ("ibwtest_send_id\n")); + len = sizeof(uint32_t)+strlen(tcx->id)+2; + if (ibw_alloc_send_buf(conn, (void **)&buf, &key, len)) { + DEBUG(0, ("send_id: ibw_alloc_send_buf failed\n")); + return -1; + } + + /* first sizeof(uint32_t) size bytes are for length */ + *((uint32_t *)buf) = len; + buf[sizeof(uint32_t)] = (char)TESTOP_SEND_ID; + strcpy(buf+sizeof(uint32_t)+1, tcx->id); + + if (ibw_send(conn, buf, key, len)) { + DEBUG(0, ("send_id: ibw_send error\n")); + return -1; + } + tcx->nsent++; + + return 0; +} + +int ibwtest_send_test_msg(struct ibwtest_ctx *tcx, struct ibw_conn *conn, const char *msg) +{ + char *buf, *p; + void *key; + uint32_t len; + + if (conn->state!=IBWC_CONNECTED) + return 0; /* not yet up */ + + len = strlen(msg) + 2 + sizeof(uint32_t); + if (ibw_alloc_send_buf(conn, (void **)&buf, &key, len)) { + fprintf(stderr, "send_test_msg: ibw_alloc_send_buf failed\n"); + return -1; + } + + *((uint32_t *)buf) = len; + p = buf; + p += sizeof(uint32_t); + p[0] = (char)TESTOP_SEND_TEXT; + p++; + strcpy(p, msg); + + if (ibw_send(conn, buf, key, len)) { + DEBUG(0, ("send_test_msg: ibw_send error\n")); + return -1; + } + tcx->nsent++; + + return 0; +} + +unsigned char ibwtest_fill_random(unsigned char *buf, uint32_t size) +{ + uint32_t i = size; + unsigned char sum = 0; + unsigned char value; + while(i) { + i--; + value = (unsigned char)(256.0 * (rand() / (RAND_MAX + 1.0))); + buf[i] = value; + sum += value; + } + return sum; +} + +unsigned char ibwtest_get_sum(unsigned char *buf, uint32_t size) +{ + uint32_t i = size; + unsigned char sum = 0; + + while(i) { + i--; + sum += buf[i]; + } + return sum; +} + +int ibwtest_do_varsize_scenario_conn_size(struct ibwtest_ctx *tcx, struct ibw_conn *conn, uint32_t size) +{ + unsigned char *buf; + void *key; + uint32_t len; + unsigned char sum; + + len = sizeof(uint32_t) + 1 + size + 1; + if (ibw_alloc_send_buf(conn, (void **)&buf, &key, len)) { + DEBUG(0, ("varsize/ibw_alloc_send_buf failed\n")); + return -1; + } + *((uint32_t *)buf) = len; + buf[sizeof(uint32_t)] = TESTOP_SEND_RND; + sum = ibwtest_fill_random(buf + sizeof(uint32_t) + 1, size); + buf[sizeof(uint32_t) + 1 + size] = sum; + if (ibw_send(conn, buf, key, len)) { + DEBUG(0, ("varsize/ibw_send failed\n")); + return -1; + } + tcx->nsent++; + + return 0; +} + +int ibwtest_do_varsize_scenario_conn(struct ibwtest_ctx *tcx, struct ibw_conn *conn) +{ + uint32_t size; + int i; + + for(i=0; inmsg; i++) + { + //size = (uint32_t)((float)(tcx->maxsize) * (rand() / (RAND_MAX + 1.0))); + size = (uint32_t)((float)(tcx->maxsize) * ((float)(i+1)/(float)tcx->nmsg)); + if (ibwtest_do_varsize_scenario_conn_size(tcx, conn, size)) + return -1; + } + return 0; +} + +/*int ibwtest_do_varsize_scenario(ibwtest_ctx *tcx) +{ + int rc; + struct ibw_conn *conn; + + for(conn=tcx->ibwctx->conn_list; conn!=NULL; conn=conn->next) { + if (conn->state==IBWC_CONNECTED) { + rc = ibwtest_do_varsize_scenario_conn(tcx, conn); + if (rc) + tcx->error = rc; + } + } +}*/ + +int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn) +{ + struct ibwtest_ctx *tcx = NULL; /* userdata */ + struct ibwtest_conn *tconn = NULL; /* userdata */ + + if (ctx) { + tcx = talloc_get_type(ctx->ctx_userdata, struct ibwtest_ctx); + + switch(ctx->state) { + case IBWS_INIT: + DEBUG(10, ("test IBWS_INIT\n")); + break; + case IBWS_READY: + DEBUG(10, ("test IBWS_READY\n")); + break; + case IBWS_CONNECT_REQUEST: + DEBUG(10, ("test IBWS_CONNECT_REQUEST\n")); + tconn = talloc_zero(conn, struct ibwtest_conn); + if (ibw_accept(ctx, conn, tconn)) { + DEBUG(0, ("error accepting the connect request\n")); + } + break; + case IBWS_STOPPED: + DEBUG(10, ("test IBWS_STOPPED\n")); + tcx->kill_me = 1; /* main loop can exit */ + break; + case IBWS_ERROR: + DEBUG(10, ("test IBWS_ERROR\n")); + ibw_stop(tcx->ibwctx); + break; + default: + assert(0); + break; + } + } + + if (conn) { + tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn); + switch(conn->state) { + case IBWC_INIT: + DEBUG(10, ("test IBWC_INIT\n")); + break; + case IBWC_CONNECTED: + if (gettimeofday(&tcx->start_time, NULL)) { + DEBUG(0, ("gettimeofday error %d", errno)); + return -1; + } + ibwtest_send_id(conn); + break; + case IBWC_DISCONNECTED: + DEBUG(10, ("test IBWC_DISCONNECTED\n")); + talloc_free(conn); + break; + case IBWC_ERROR: + DEBUG(10, ("test IBWC_ERROR\n")); + break; + default: + assert(0); + break; + } + } + return 0; +} + +int ibwtest_receive_handler(struct ibw_conn *conn, void *buf, int n) +{ + struct ibwtest_conn *tconn; + enum testopcode op; + struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx); + int rc = 0; + + assert(conn!=NULL); + assert(n>=sizeof(uint32_t)+1); + tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn); + + op = (enum testopcode)((char *)buf)[sizeof(uint32_t)]; + if (op==TESTOP_SEND_ID) { + tconn->id = talloc_strdup(tconn, ((char *)buf)+sizeof(uint32_t)+1); + } + if (op==TESTOP_SEND_ID || op==TESTOP_SEND_TEXT) { + DEBUG(11, ("[%d]msg from %s: \"%s\"(%d)\n", op, + tconn->id ? tconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n)); + } + + if (tcx->is_server) { + if (op==TESTOP_SEND_RND) { + unsigned char sum; + sum = ibwtest_get_sum((unsigned char *)buf + sizeof(uint32_t) + 1, + n - sizeof(uint32_t) - 2); + DEBUG(11, ("[%d]msg varsize %u/sum %u from %s\n", + op, + n - sizeof(uint32_t) - 2, + (uint32_t)sum, + tconn->id ? tconn->id : "NULL")); + if (sum!=((unsigned char *)buf)[n-1]) { + DEBUG(0, ("ERROR: checksum mismatch %u!=%u\n", + (uint32_t)sum, (uint32_t)((unsigned char *)buf)[n-1])); + ibw_stop(tcx->ibwctx); + goto error; + } + } else { + char *buf2; + void *key2; + + /* bounce message regardless what it is */ + if (ibw_alloc_send_buf(conn, (void **)&buf2, &key2, n)) { + fprintf(stderr, "ibw_alloc_send_buf error #2\n"); + goto error; + } + memcpy(buf2, buf, n); + if (ibw_send(conn, buf2, key2, n)) { + fprintf(stderr, "ibw_send error #2\n"); + goto error; + } + tcx->nsent++; + } + } else { /* client: */ + if (op==TESTOP_SEND_ID && tcx->maxsize) { + /* send them in one blow */ + rc = ibwtest_do_varsize_scenario_conn(tcx, conn); + } + + if (tcx->nmsg) { + char msg[26]; + sprintf(msg, "hello world %d", tcx->nmsg--); + rc = ibwtest_send_test_msg(tcx, conn, msg); + if (tcx->nmsg==0) { + ibw_stop(tcx->ibwctx); + tcx->stopping = 1; + } + } + } + + if (rc) + tcx->error = rc; + + return rc; +error: + return -1; +} + +void ibwtest_timeout_handler(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ibwtest_ctx *tcx = talloc_get_type(private_data, struct ibwtest_ctx); + int rc; + + if (!tcx->is_server) { + struct ibw_conn *conn; + char msg[50]; + + /* fill it with something variable... */ + sprintf(msg, "hello world %d", tcx->cnt++); + + /* send something to everybody... */ + for(conn=tcx->ibwctx->conn_list; conn!=NULL; conn=conn->next) { + if (conn->state==IBWC_CONNECTED) { + rc = ibwtest_send_test_msg(tcx, conn, msg); + if (rc) + tcx->error = rc; + } + } + } /* else allow main loop run */ +} + +static struct ibwtest_ctx *testctx = NULL; + +void ibwtest_sigint_handler(int sig) +{ + DEBUG(0, ("got SIGINT\n")); + if (testctx) { + if (testctx->ibwctx->state==IBWS_READY || + testctx->ibwctx->state==IBWS_CONNECT_REQUEST || + testctx->ibwctx->state==IBWS_ERROR) + { + if (testctx->stopping) { + DEBUG(10, ("forcing exit...\n")); + testctx->kill_me = 1; + } else { + /* mostly expected case */ + ibw_stop(testctx->ibwctx); + testctx->stopping = 1; + } + } else + testctx->kill_me = 1; + } +} + +int ibwtest_parse_attrs(struct ibwtest_ctx *tcx, char *optext, + struct ibw_initattr **pattrs, int *nattrs, char op) +{ + int i = 0, n = 1; + int porcess_next = 1; + char *p, *q; + struct ibw_initattr *attrs = NULL; + + *pattrs = NULL; + for(p = optext; *p!='\0'; p++) { + if (*p==',') + n++; + } + + attrs = (struct ibw_initattr *)talloc_size(tcx, + n * sizeof(struct ibw_initattr)); + for(p = optext; *p!='\0'; p++) { + if (porcess_next) { + attrs[i].name = p; + q = strchr(p, ':'); + if (q==NULL) { + fprintf(stderr, "-%c format error\n", op); + return -1; + } + *q = '\0'; + attrs[i].value = q + 1; + + porcess_next = 0; + i++; + p = q; /* ++ at end */ + } + if (*p==',') { + *p = '\0'; /* ++ at end */ + porcess_next = 1; + } + } + *pattrs = attrs; + *nattrs = n; + + return 0; +} + +int ibwtest_getdests(struct ibwtest_ctx *tcx, char op) +{ + int i; + struct ibw_initattr *attrs = NULL; + struct sockaddr_in *p; + char *tmp; + + tmp = talloc_strdup(tcx, optarg); + /* hack to reuse the above ibw_initattr parser */ + if (ibwtest_parse_attrs(tcx, tmp, &attrs, &tcx->naddrs, op)) + return -1; + + tcx->addrs = talloc_size(tcx, + tcx->naddrs * sizeof(struct sockaddr_in)); + for(i=0; inaddrs; i++) { + p = tcx->addrs + i; + p->sin_family = AF_INET; + p->sin_addr.s_addr = inet_addr(attrs[i].name); + p->sin_port = htons(atoi(attrs[i].value)); + } + + return 0; +} + +int ibwtest_init_server(struct ibwtest_ctx *tcx) +{ + if (tcx->naddrs!=1) { + fprintf(stderr, "incorrect number of addrs(%d!=1)\n", tcx->naddrs); + return -1; + } + + if (ibw_bind(tcx->ibwctx, &tcx->addrs[0])) { + DEBUG(0, ("ERROR: ibw_bind failed\n")); + return -1; + } + + if (ibw_listen(tcx->ibwctx, 1)) { + DEBUG(0, ("ERROR: ibw_listen failed\n")); + return -1; + } + + /* continued at IBWS_READY */ + return 0; +} + +void ibwtest_usage(struct ibwtest_ctx *tcx, char *name) +{ + printf("Usage:\n"); + printf("\t%s -i -o {name:value} -d {addr:port} -t nsec -s\n", name); + printf("\t-i is a free text, acting as a server id, max 23 chars [mandatory]\n"); + printf("\t-o name1:value1,name2:value2,... is a list of (name, value) pairs\n"); + printf("\t-d addr1:port1,addr2:port2,... is a list of destination ip addresses\n"); + printf("\t-t nsec delta time between sends in nanosec [default %d]\n", tcx->nsec); + printf("\t\t send message periodically and endless when nsec is non-zero\n"); + printf("\t-s server mode (you have to give exactly one -d address:port in this case)\n"); + printf("\t-n number of messages to send [default %d]\n", tcx->nmsg); + printf("\t-l usec time to sleep in the main loop [default %d]\n", tcx->sleep_usec); + printf("\t-v max variable msg size in bytes [default %d], 0=don't send var. size\n", tcx->maxsize); + printf("Press ctrl+C to stop the program.\n"); +} + +int main(int argc, char *argv[]) +{ + int rc, op; + int result = 1; + struct event_context *ev = NULL; + struct ibwtest_ctx *tcx = NULL; + float usec; + + tcx = talloc_zero(NULL, struct ibwtest_ctx); + memset(tcx, 0, sizeof(struct ibwtest_ctx)); + tcx->nsec = 0; + tcx->nmsg = 1000; + + /* here is the only case we can't avoid using global... */ + testctx = tcx; + signal(SIGINT, ibwtest_sigint_handler); + srand((unsigned)time(NULL)); + + while ((op=getopt(argc, argv, "i:o:d:m:st:n:l:v:")) != -1) { + switch (op) { + case 'i': + tcx->id = talloc_strdup(tcx, optarg); + break; + case 'o': + tcx->opts = talloc_strdup(tcx, optarg); + if (ibwtest_parse_attrs(tcx, tcx->opts, &tcx->attrs, + &tcx->nattrs, op)) + goto cleanup; + break; + case 'd': + if (ibwtest_getdests(tcx, op)) + goto cleanup; + break; + case 's': + tcx->is_server = 1; + break; + case 't': + tcx->nsec = (unsigned int)atoi(optarg); + break; + case 'n': + tcx->nmsg = atoi(optarg); + break; + case 'l': + tcx->sleep_usec = (unsigned int)atoi(optarg); + break; + case 'v': + tcx->maxsize = (unsigned int)atoi(optarg); + break; + default: + fprintf(stderr, "ERROR: unknown option -%c\n", (char)op); + ibwtest_usage(tcx, argv[0]); + goto cleanup; + } + } + if (tcx->id==NULL) { + ibwtest_usage(tcx, argv[0]); + goto cleanup; + } + + ev = event_context_init(NULL); + assert(ev); + + tcx->ibwctx = ibw_init(tcx->attrs, tcx->nattrs, + tcx, + ibwtest_connstate_handler, + ibwtest_receive_handler, + ev + ); + if (!tcx->ibwctx) + goto cleanup; + + if (tcx->is_server) + rc = ibwtest_init_server(tcx); + else + rc = ibwtest_connect_everybody(tcx); + if (rc) + goto cleanup; + + while(!tcx->kill_me && !tcx->error) { + if (tcx->nsec) { + event_add_timed(ev, tcx, timeval_current_ofs(0, tcx->nsec), + ibwtest_timeout_handler, tcx); + } + + event_loop_once(ev); + + if (tcx->sleep_usec) + usleep(tcx->sleep_usec); + } + + if (!tcx->is_server && tcx->nsent!=0 && !tcx->error) { + if (gettimeofday(&tcx->end_time, NULL)) { + DEBUG(0, ("gettimeofday error %d\n", errno)); + goto cleanup; + } + usec = (tcx->end_time.tv_sec - tcx->start_time.tv_sec) * 1000000 + + (tcx->end_time.tv_usec - tcx->start_time.tv_usec); + printf("usec: %f, nmsg: %d, usec/nmsg: %f\n", + usec, tcx->nsent, usec/(float)tcx->nsent); + } + + if (!tcx->error) + result = 0; /* everything OK */ + +cleanup: + if (tcx) + talloc_free(tcx); + if (ev) + talloc_free(ev); + DEBUG(0, ("exited with code %d\n", result)); + return result; +} diff --git a/source4/cluster/ctdb/include/ctdb.h b/source4/cluster/ctdb/include/ctdb.h index 9049314401..d5a1b581e5 100644 --- a/source4/cluster/ctdb/include/ctdb.h +++ b/source4/cluster/ctdb/include/ctdb.h @@ -50,6 +50,10 @@ struct ctdb_call_info { ctdb flags */ #define CTDB_FLAG_SELF_CONNECT (1<<0) +/* fork off a separate ctdb daemon */ +#define CTDB_FLAG_DAEMON_MODE (1<<1) +/* for test code only: make ctdb_start() block until all nodes are connected */ +#define CTDB_FLAG_CONNECT_WAIT (1<<2) struct event_context; @@ -69,6 +73,11 @@ int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport); */ void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags); +/* + clear some flags +*/ +void ctdb_clear_flags(struct ctdb_context *ctdb, unsigned flags); + /* set max acess count before a dmaster migration */ @@ -143,8 +152,14 @@ uint32_t ctdb_get_num_nodes(struct ctdb_context *ctdb); /* setup a handler for ctdb messages */ typedef void (*ctdb_message_fn_t)(struct ctdb_context *, uint32_t srvid, TDB_DATA data, void *); -int ctdb_set_message_handler(struct ctdb_context *ctdb, ctdb_message_fn_t handler, - void *private); +int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data); + + +int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call); +struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call); +int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call); /* send a ctdb message */ int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, @@ -164,7 +179,14 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL change the data in a record held with a ctdb_record_handle if the new data is zero length, this implies a delete of the record */ -int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data); +int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data); + +int ctdb_register_message_handler(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, + uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data); +struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id); #endif diff --git a/source4/cluster/ctdb/include/ctdb_private.h b/source4/cluster/ctdb/include/ctdb_private.h index 00b74b2177..c50b481cf3 100644 --- a/source4/cluster/ctdb/include/ctdb_private.h +++ b/source4/cluster/ctdb/include/ctdb_private.h @@ -23,6 +23,13 @@ #include "ctdb.h" +/* location of daemon socket */ +#define CTDB_PATH "/tmp/ctdb.socket" + +/* we must align packets to ensure ctdb works on all architectures (eg. sparc) */ +#define CTDB_DS_ALIGNMENT 8 + + #define CTDB_FETCH_FUNC 0xf0000001 /* @@ -43,6 +50,13 @@ struct ctdb_address { int port; }; + +/* called from the queue code when a packet comes in. Called with data==NULL + on error */ +typedef void (*ctdb_queue_cb_fn_t)(uint8_t *data, size_t length, + void *private_data); + + /* state associated with one node */ @@ -50,10 +64,16 @@ struct ctdb_node { struct ctdb_context *ctdb; struct ctdb_address address; const char *name; /* for debug messages */ - void *private; /* private to transport */ + void *private_data; /* private to transport */ uint32_t vnn; }; +struct ctdb_record_handle { + struct ctdb_db_context *ctdb_db; + TDB_DATA key; + TDB_DATA *data; +}; + /* transport specific methods */ @@ -78,6 +98,22 @@ struct ctdb_upcalls { void (*node_connected)(struct ctdb_node *); }; +/* list of message handlers - needs to be changed to a more efficient data + structure so we can find a message handler given a srvid quickly */ +struct ctdb_message_list { + struct ctdb_context *ctdb; + struct ctdb_message_list *next, *prev; + uint32_t srvid; + ctdb_message_fn_t message_handler; + void *message_private; +}; + +/* additional data required for the daemon mode */ +struct ctdb_daemon_data { + int sd; + char *name; + struct ctdb_queue *queue; +}; /* main state of the ctdb daemon */ struct ctdb_context { @@ -93,11 +129,11 @@ struct ctdb_context { char *err_msg; const struct ctdb_methods *methods; /* transport methods */ const struct ctdb_upcalls *upcalls; /* transport upcalls */ - void *private; /* private to transport */ + void *private_data; /* private to transport */ unsigned max_lacount; - ctdb_message_fn_t message_handler; - void *message_private; struct ctdb_db_context *db_list; + struct ctdb_message_list *message_list; + struct ctdb_daemon_data daemon; }; struct ctdb_db_context { @@ -109,6 +145,7 @@ struct ctdb_db_context { struct ctdb_registered_call *calls; /* list of registered calls */ }; + #define CTDB_NO_MEMORY(ctdb, p) do { if (!(p)) { \ ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \ return -1; }} while (0) @@ -141,18 +178,48 @@ struct ctdb_ltdb_header { uint32_t lacount; }; +enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR}; + +/* + state of a in-progress ctdb call +*/ +struct ctdb_call_state { + enum call_state state; + struct ctdb_req_call *c; + struct ctdb_db_context *ctdb_db; + struct ctdb_node *node; + const char *errmsg; + struct ctdb_call call; + int redirect_count; + struct ctdb_ltdb_header header; + void *fetch_private; + struct { + void (*fn)(struct ctdb_call_state *); + void *private_data; + } async; +}; + /* operation IDs */ enum ctdb_operation { - CTDB_REQ_CALL = 0, - CTDB_REPLY_CALL = 1, - CTDB_REPLY_REDIRECT = 2, - CTDB_REQ_DMASTER = 3, - CTDB_REPLY_DMASTER = 4, - CTDB_REPLY_ERROR = 5, - CTDB_REQ_MESSAGE = 6 + CTDB_REQ_CALL = 0, + CTDB_REPLY_CALL = 1, + CTDB_REPLY_REDIRECT = 2, + CTDB_REQ_DMASTER = 3, + CTDB_REPLY_DMASTER = 4, + CTDB_REPLY_ERROR = 5, + CTDB_REQ_MESSAGE = 6, + + /* only used on the domain socket */ + CTDB_REQ_REGISTER = 1000, + CTDB_REQ_CONNECT_WAIT = 1001, + CTDB_REPLY_CONNECT_WAIT = 1002, + CTDB_REQ_FETCH_LOCK = 1003, + CTDB_REPLY_FETCH_LOCK = 1004, + CTDB_REQ_STORE_UNLOCK = 1005, + CTDB_REPLY_STORE_UNLOCK = 1006 }; #define CTDB_MAGIC 0x43544442 /* CTDB */ @@ -215,6 +282,11 @@ struct ctdb_reply_dmaster { uint8_t data[1]; }; +struct ctdb_req_register { + struct ctdb_req_header hdr; + uint32_t srvid; +}; + struct ctdb_req_message { struct ctdb_req_header hdr; uint32_t srvid; @@ -222,6 +294,42 @@ struct ctdb_req_message { uint8_t data[1]; }; +struct ctdb_req_connect_wait { + struct ctdb_req_header hdr; +}; + +struct ctdb_reply_connect_wait { + struct ctdb_req_header hdr; + uint32_t vnn; + uint32_t num_connected; +}; + +struct ctdb_req_fetch_lock { + struct ctdb_req_header hdr; + uint32_t db_id; + uint32_t keylen; + uint8_t key[1]; /* key[] */ +}; + +struct ctdb_reply_fetch_lock { + struct ctdb_req_header hdr; + uint32_t state; + uint32_t datalen; + uint8_t data[1]; /* data[] */ +}; +struct ctdb_req_store_unlock { + struct ctdb_req_header hdr; + uint32_t db_id; + uint32_t keylen; + uint32_t datalen; + uint8_t data[1]; /* key[] and data[] */ +}; + +struct ctdb_reply_store_unlock { + struct ctdb_req_header hdr; + uint32_t state; +}; + /* internal prototypes */ void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...) PRINTF_ATTRIBUTE(2,3); void ctdb_fatal(struct ctdb_context *ctdb, const char *msg); @@ -246,5 +354,99 @@ int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data); void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); +struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call, + struct ctdb_ltdb_header *header, + TDB_DATA *data); + + +int ctdbd_start(struct ctdb_context *ctdb); +struct ctdb_call_state *ctdbd_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call); +int ctdbd_call_recv(struct ctdb_call_state *state, struct ctdb_call *call); + +/* + queue a packet for sending +*/ +int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length); + +/* + setup the fd used by the queue + */ +int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd); + +/* + setup a packet queue on a socket + */ +struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, int fd, int alignment, + + ctdb_queue_cb_fn_t callback, + void *private_data); + +/* + allocate a packet for use in client<->daemon communication + */ +void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len); + + +/* + lock a record in the ltdb, given a key + */ +int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key); + +/* + unlock a record in the ltdb, given a key + */ +int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key); + + +/* + make a ctdb call to the local daemon - async send. Called from client context. + + This constructs a ctdb_call request and queues it for processing. + This call never blocks. +*/ +struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call); + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_call to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call); + +int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data); + +int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, + uint32_t srvid, TDB_DATA data); + +/* + send a ctdb message +*/ +int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn, + uint32_t srvid, TDB_DATA data); + + +/* + wait for all nodes to be connected +*/ +void ctdb_daemon_connect_wait(struct ctdb_context *ctdb); + + +/* + do a fetch lock from a client to the local daemon +*/ +struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data); + +/* + do a store unlock from a client to the local daemon +*/ +int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data); #endif diff --git a/source4/cluster/ctdb/include/idtree.h b/source4/cluster/ctdb/include/idtree.h new file mode 100644 index 0000000000..259af91005 --- /dev/null +++ b/source4/cluster/ctdb/include/idtree.h @@ -0,0 +1,7 @@ +struct idr_context *idr_init(TALLOC_CTX *mem_ctx); +int idr_get_new(struct idr_context *idp, void *ptr, int limit); +int idr_get_new_above(struct idr_context *idp, void *ptr, int starting_id, int limit); +int idr_get_new_random(struct idr_context *idp, void *ptr, int limit); +void *idr_find(struct idr_context *idp, int id); +int idr_remove(struct idr_context *idp, int id); + diff --git a/source4/cluster/ctdb/include/includes.h b/source4/cluster/ctdb/include/includes.h new file mode 100644 index 0000000000..994c25452c --- /dev/null +++ b/source4/cluster/ctdb/include/includes.h @@ -0,0 +1,36 @@ +#define HAVE_UNIXSOCKET 1 + +#include "replace.h" +#include "talloc.h" +#include "tdb.h" +#include "idtree.h" +#include "ctdb.h" +#include "lib/util/dlinklist.h" + +typedef bool BOOL; + +#define True 1 +#define False 0 + +#define LogLevel 0 + +#define DEBUG(lvl, x) if ((lvl) <= LogLevel) (printf x) + +#define _PUBLIC_ + +#define ZERO_STRUCT(x) memset((char *)&(x), 0, sizeof(x)) + +#ifndef discard_const +#define discard_const(ptr) ((void *)((intptr_t)(ptr))) +#endif + +struct timeval timeval_zero(void); +bool timeval_is_zero(const struct timeval *tv); +struct timeval timeval_current(void); +struct timeval timeval_set(uint32_t secs, uint32_t usecs); +int timeval_compare(const struct timeval *tv1, const struct timeval *tv2); +struct timeval timeval_until(const struct timeval *tv1, + const struct timeval *tv2); +_PUBLIC_ struct timeval timeval_current_ofs(uint32_t secs, uint32_t usecs); +char **file_lines_load(const char *fname, int *numlines, TALLOC_CTX *mem_ctx); + diff --git a/source4/cluster/ctdb/install-sh b/source4/cluster/ctdb/install-sh new file mode 100755 index 0000000000..58719246f0 --- /dev/null +++ b/source4/cluster/ctdb/install-sh @@ -0,0 +1,238 @@ +#! /bin/sh +# +# install - install a program, script, or datafile +# This comes from X11R5. +# +# Calling this script install-sh is preferred over install.sh, to prevent +# `make' implicit rules from creating a file called install from it +# when there is no Makefile. +# +# This script is compatible with the BSD install script, but was written +# from scratch. +# + + +# set DOITPROG to echo to test this script + +# Don't use :- since 4.3BSD and earlier shells don't like it. +doit="${DOITPROG-}" + + +# put in absolute paths if you don't have them in your path; or use env. vars. + +mvprog="${MVPROG-mv}" +cpprog="${CPPROG-cp}" +chmodprog="${CHMODPROG-chmod}" +chownprog="${CHOWNPROG-chown}" +chgrpprog="${CHGRPPROG-chgrp}" +stripprog="${STRIPPROG-strip}" +rmprog="${RMPROG-rm}" +mkdirprog="${MKDIRPROG-mkdir}" + +transformbasename="" +transform_arg="" +instcmd="$mvprog" +chmodcmd="$chmodprog 0755" +chowncmd="" +chgrpcmd="" +stripcmd="" +rmcmd="$rmprog -f" +mvcmd="$mvprog" +src="" +dst="" +dir_arg="" + +while [ x"$1" != x ]; do + case $1 in + -c) instcmd="$cpprog" + shift + continue;; + + -d) dir_arg=true + shift + continue;; + + -m) chmodcmd="$chmodprog $2" + shift + shift + continue;; + + -o) chowncmd="$chownprog $2" + shift + shift + continue;; + + -g) chgrpcmd="$chgrpprog $2" + shift + shift + continue;; + + -s) stripcmd="$stripprog" + shift + continue;; + + -t=*) transformarg=`echo $1 | sed 's/-t=//'` + shift + continue;; + + -b=*) transformbasename=`echo $1 | sed 's/-b=//'` + shift + continue;; + + *) if [ x"$src" = x ] + then + src=$1 + else + # this colon is to work around a 386BSD /bin/sh bug + : + dst=$1 + fi + shift + continue;; + esac +done + +if [ x"$src" = x ] +then + echo "install: no input file specified" + exit 1 +else + true +fi + +if [ x"$dir_arg" != x ]; then + dst=$src + src="" + + if [ -d $dst ]; then + instcmd=: + else + instcmd=mkdir + fi +else + +# Waiting for this to be detected by the "$instcmd $src $dsttmp" command +# might cause directories to be created, which would be especially bad +# if $src (and thus $dsttmp) contains '*'. + + if [ -f $src -o -d $src ] + then + true + else + echo "install: $src does not exist" + exit 1 + fi + + if [ x"$dst" = x ] + then + echo "install: no destination specified" + exit 1 + else + true + fi + +# If destination is a directory, append the input filename; if your system +# does not like double slashes in filenames, you may need to add some logic + + if [ -d $dst ] + then + dst="$dst"/`basename $src` + else + true + fi +fi + +## this sed command emulates the dirname command +dstdir=`echo $dst | sed -e 's,[^/]*$,,;s,/$,,;s,^$,.,'` + +# Make sure that the destination directory exists. +# this part is taken from Noah Friedman's mkinstalldirs script + +# Skip lots of stat calls in the usual case. +if [ ! -d "$dstdir" ]; then +defaultIFS=' +' +IFS="${IFS-${defaultIFS}}" + +oIFS="${IFS}" +# Some sh's can't handle IFS=/ for some reason. +IFS='%' +set - `echo ${dstdir} | sed -e 's@/@%@g' -e 's@^%@/@'` +IFS="${oIFS}" + +pathcomp='' + +while [ $# -ne 0 ] ; do + pathcomp="${pathcomp}${1}" + shift + + if [ ! -d "${pathcomp}" ] ; + then + $mkdirprog "${pathcomp}" + else + true + fi + + pathcomp="${pathcomp}/" +done +fi + +if [ x"$dir_arg" != x ] +then + $doit $instcmd $dst && + + if [ x"$chowncmd" != x ]; then $doit $chowncmd $dst; else true ; fi && + if [ x"$chgrpcmd" != x ]; then $doit $chgrpcmd $dst; else true ; fi && + if [ x"$stripcmd" != x ]; then $doit $stripcmd $dst; else true ; fi && + if [ x"$chmodcmd" != x ]; then $doit $chmodcmd $dst; else true ; fi +else + +# If we're going to rename the final executable, determine the name now. + + if [ x"$transformarg" = x ] + then + dstfile=`basename $dst` + else + dstfile=`basename $dst $transformbasename | + sed $transformarg`$transformbasename + fi + +# don't allow the sed command to completely eliminate the filename + + if [ x"$dstfile" = x ] + then + dstfile=`basename $dst` + else + true + fi + +# Make a temp file name in the proper directory. + + dsttmp=$dstdir/#inst.$$# + +# Move or copy the file name to the temp name + + $doit $instcmd $src $dsttmp && + + trap "rm -f ${dsttmp}" 0 && + +# and set any options; do chmod last to preserve setuid bits + +# If any of these fail, we abort the whole thing. If we want to +# ignore errors from any of these, just make sure not to ignore +# errors from the above "$doit $instcmd $src $dsttmp" command. + + if [ x"$chowncmd" != x ]; then $doit $chowncmd $dsttmp; else true;fi && + if [ x"$chgrpcmd" != x ]; then $doit $chgrpcmd $dsttmp; else true;fi && + if [ x"$stripcmd" != x ]; then $doit $stripcmd $dsttmp; else true;fi && + if [ x"$chmodcmd" != x ]; then $doit $chmodcmd $dsttmp; else true;fi && + +# Now rename the file to the real destination. + + $doit $rmcmd -f $dstdir/$dstfile && + $doit $mvcmd $dsttmp $dstdir/$dstfile + +fi && + + +exit 0 diff --git a/source4/cluster/ctdb/opendb_ctdb.c b/source4/cluster/ctdb/opendb_ctdb.c index 55f051f10f..6e2748291e 100644 --- a/source4/cluster/ctdb/opendb_ctdb.c +++ b/source4/cluster/ctdb/opendb_ctdb.c @@ -223,7 +223,8 @@ static NTSTATUS odb_push_record(struct odb_lock *lck, struct opendb_file *file) if (!file->num_entries) { dbuf.dptr = NULL; dbuf.dsize = 0; - ctdb_record_store(lck->rec, dbuf); + ctdb_store_unlock(lck->rec, dbuf); + return NT_STATUS_OK; } status = ndr_push_struct_blob(&blob, lck, file, (ndr_push_flags_fn_t)ndr_push_opendb_file); @@ -232,7 +233,7 @@ static NTSTATUS odb_push_record(struct odb_lock *lck, struct opendb_file *file) dbuf.dptr = blob.data; dbuf.dsize = blob.length; - ret = ctdb_record_store(lck->rec, dbuf); + ret = ctdb_store_unlock(lck->rec, dbuf); data_blob_free(&blob); if (ret != 0) { return NT_STATUS_INTERNAL_DB_CORRUPTION; diff --git a/source4/cluster/ctdb/tcp/ctdb_tcp.h b/source4/cluster/ctdb/tcp/ctdb_tcp.h index 0f8ce300b4..a34cd9736d 100644 --- a/source4/cluster/ctdb/tcp/ctdb_tcp.h +++ b/source4/cluster/ctdb/tcp/ctdb_tcp.h @@ -24,33 +24,13 @@ struct ctdb_tcp { int listen_fd; }; -/* - incoming packet structure - only used when we get a partial packet - on read -*/ -struct ctdb_tcp_partial { - uint8_t *data; - uint32_t length; -}; - - /* state associated with an incoming connection */ struct ctdb_incoming { struct ctdb_context *ctdb; int fd; - struct ctdb_tcp_partial partial; -}; - -/* - outgoing packet structure - only allocated when we can't write immediately - to the socket -*/ -struct ctdb_tcp_packet { - struct ctdb_tcp_packet *next, *prev; - uint8_t *data; - uint32_t length; + struct ctdb_queue *queue; }; /* @@ -58,19 +38,16 @@ struct ctdb_tcp_packet { */ struct ctdb_tcp_node { int fd; - struct fd_event *fde; - struct ctdb_tcp_packet *queue; + struct ctdb_queue *queue; }; /* prototypes internal to tcp transport */ -void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private); -void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private); int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length); int ctdb_tcp_listen(struct ctdb_context *ctdb); void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private); + struct timeval t, void *private_data); +void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args); +void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data); #define CTDB_TCP_ALIGNMENT 8 diff --git a/source4/cluster/ctdb/tcp/tcp_connect.c b/source4/cluster/ctdb/tcp/tcp_connect.c index 85fffc2f70..a1f2d331cf 100644 --- a/source4/cluster/ctdb/tcp/tcp_connect.c +++ b/source4/cluster/ctdb/tcp/tcp_connect.c @@ -34,14 +34,33 @@ static void set_nonblocking(int fd) } +/* + called when a complete packet has come in - should not happen on this socket + */ +void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data) +{ + struct ctdb_node *node = talloc_get_type(private_data, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type( + node->private_data, struct ctdb_tcp_node); + + /* start a new connect cycle to try to re-establish the + link */ + close(tnode->fd); + ctdb_queue_set_fd(tnode->queue, -1); + tnode->fd = -1; + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_connect, node); +} + /* called when socket becomes writeable on connect */ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) + uint16_t flags, void *private_data) { - struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_node *node = talloc_get_type(private_data, + struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, struct ctdb_tcp_node); struct ctdb_context *ctdb = node->ctdb; int error = 0; @@ -59,17 +78,13 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f } talloc_free(fde); - tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ, - ctdb_tcp_node_write, node); + + setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one)); + + ctdb_queue_set_fd(tnode->queue, tnode->fd); /* tell the ctdb layer we are connected */ node->ctdb->upcalls->node_connected(node); - - setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one)); - - if (tnode->queue) { - EVENT_FD_WRITEABLE(tnode->fde); - } } @@ -92,10 +107,11 @@ static int ctdb_tcp_get_address(struct ctdb_context *ctdb, called when we should try and establish a tcp connection to a node */ void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private) + struct timeval t, void *private_data) { - struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_node *node = talloc_get_type(private_data, + struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, struct ctdb_tcp_node); struct ctdb_context *ctdb = node->ctdb; struct sockaddr_in sock_in; @@ -155,7 +171,7 @@ static int ctdb_incoming_destructor(struct ctdb_incoming *in) node in our cluster */ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) + uint16_t flags, void *private_data) { struct ctdb_context *ctdb; struct ctdb_tcp *ctcp; @@ -164,8 +180,8 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, int fd; struct ctdb_incoming *in; - ctdb = talloc_get_type(private, struct ctdb_context); - ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp); + ctdb = talloc_get_type(private_data, struct ctdb_context); + ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp); memset(&addr, 0, sizeof(addr)); len = sizeof(addr); fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len); @@ -177,8 +193,8 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, set_nonblocking(in->fd); - event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ, - ctdb_tcp_incoming_read, in); + in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT, + ctdb_tcp_read_cb, in); talloc_set_destructor(in, ctdb_incoming_destructor); } @@ -189,7 +205,8 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, */ int ctdb_tcp_listen(struct ctdb_context *ctdb) { - struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp); + struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data, + struct ctdb_tcp); struct sockaddr_in sock; int one = 1; diff --git a/source4/cluster/ctdb/tcp/tcp_init.c b/source4/cluster/ctdb/tcp/tcp_init.c index 0058e7ad85..20b9bc9e33 100644 --- a/source4/cluster/ctdb/tcp/tcp_init.c +++ b/source4/cluster/ctdb/tcp/tcp_init.c @@ -29,7 +29,7 @@ /* start the protocol going */ -int ctdb_tcp_start(struct ctdb_context *ctdb) +static int ctdb_tcp_start(struct ctdb_context *ctdb) { int i; @@ -46,6 +46,12 @@ int ctdb_tcp_start(struct ctdb_context *ctdb) ctdb_tcp_node_connect, node); } + if (ctdb->flags&CTDB_FLAG_CONNECT_WAIT) { + /* wait until all nodes are connected (should not be needed + outide of test code) */ + ctdb_connect_wait(ctdb); + } + return 0; } @@ -53,14 +59,18 @@ int ctdb_tcp_start(struct ctdb_context *ctdb) /* initialise tcp portion of a ctdb node */ -int ctdb_tcp_add_node(struct ctdb_node *node) +static int ctdb_tcp_add_node(struct ctdb_node *node) { struct ctdb_tcp_node *tnode; tnode = talloc_zero(node, struct ctdb_tcp_node); CTDB_NO_MEMORY(node->ctdb, tnode); tnode->fd = -1; - node->private = tnode; + node->private_data = tnode; + + tnode->queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT, + ctdb_tcp_tnode_cb, node); + return 0; } @@ -68,7 +78,7 @@ int ctdb_tcp_add_node(struct ctdb_node *node) /* transport packet allocator - allows transport to control memory for packets */ -void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size) +static void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size) { /* tcp transport needs to round to 8 byte alignment to ensure that we can use a length header and 64 bit elements in @@ -95,7 +105,7 @@ int ctdb_tcp_init(struct ctdb_context *ctdb) CTDB_NO_MEMORY(ctdb, ctcp); ctcp->listen_fd = -1; - ctdb->private = ctcp; + ctdb->private_data = ctcp; ctdb->methods = &ctdb_tcp_methods; return 0; } diff --git a/source4/cluster/ctdb/tcp/tcp_io.c b/source4/cluster/ctdb/tcp/tcp_io.c index e59f6167ff..150d726afb 100644 --- a/source4/cluster/ctdb/tcp/tcp_io.c +++ b/source4/cluster/ctdb/tcp/tcp_io.c @@ -29,161 +29,43 @@ /* - called when we fail to send a message to a node -*/ -static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private) -{ - struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, - struct ctdb_tcp_node); - - /* start a new connect cycle to try to re-establish the - link */ - talloc_free(tnode->fde); - close(tnode->fd); - tnode->fd = -1; - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_connect, node); -} - -/* - called when socket becomes readable -*/ -void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) -{ - struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, - struct ctdb_tcp_node); - if (flags & EVENT_FD_READ) { - /* getting a read event on this fd in the current tcp model is - always an error, as we have separate read and write - sockets. In future we may combine them, but for now it must - mean that the socket is dead, so we try to reconnect */ - node->ctdb->upcalls->node_dead(node); - talloc_free(tnode->fde); - close(tnode->fd); - tnode->fd = -1; - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_connect, node); - return; - } - - while (tnode->queue) { - struct ctdb_tcp_packet *pkt = tnode->queue; - ssize_t n; - - n = write(tnode->fd, pkt->data, pkt->length); - - if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_dead, node); - EVENT_FD_NOT_WRITEABLE(tnode->fde); - return; - } - if (n <= 0) return; - - if (n != pkt->length) { - pkt->length -= n; - pkt->data += n; - return; - } - - DLIST_REMOVE(tnode->queue, pkt); - talloc_free(pkt); - } - - EVENT_FD_NOT_WRITEABLE(tnode->fde); -} - - -/* - called when an incoming connection is readable -*/ -void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) + called when a complete packet has come in + */ +void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args) { - struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming); - int num_ready = 0; - ssize_t nread; - uint8_t *data, *data_base; + struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming); + struct ctdb_req_header *hdr; - if (ioctl(in->fd, FIONREAD, &num_ready) != 0 || - num_ready == 0) { - /* we've lost the link from another node. We don't - notify the upper layers, as we only want to trigger - a full node reorganisation when a send fails - that - allows nodes to restart without penalty as long as - the network is idle */ + if (data == NULL) { + /* incoming socket has died */ talloc_free(in); return; } - in->partial.data = talloc_realloc_size(in, in->partial.data, - num_ready + in->partial.length); - if (in->partial.data == NULL) { - /* not much we can do except drop the socket */ - talloc_free(in); + if (cnt < sizeof(*hdr)) { + ctdb_set_error(in->ctdb, "Bad packet length %d\n", cnt); return; } - - nread = read(in->fd, in->partial.data+in->partial.length, num_ready); - if (nread <= 0) { - /* the connection must be dead */ - talloc_free(in); + hdr = (struct ctdb_req_header *)data; + if (cnt != hdr->length) { + ctdb_set_error(in->ctdb, "Bad header length %d expected %d\n", + hdr->length, cnt); return; } - data = in->partial.data; - nread += in->partial.length; - - in->partial.data = NULL; - in->partial.length = 0; - - if (nread >= 4 && *(uint32_t *)data == nread) { - /* most common case - we got a whole packet in one go - tell the ctdb layer above that we have a packet */ - in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread); + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(in->ctdb, "Non CTDB packet rejected\n"); return; } - data_base = data; - - while (nread >= 4 && *(uint32_t *)data <= nread) { - /* we have at least one packet */ - uint8_t *d2; - uint32_t len; - len = *(uint32_t *)data; - d2 = talloc_memdup(in, data, len); - if (d2 == NULL) { - /* sigh */ - talloc_free(in); - return; - } - in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len); - data += len; - nread -= len; - } - - if (nread > 0) { - /* we have only part of a packet */ - if (data_base == data) { - in->partial.data = data; - in->partial.length = nread; - } else { - in->partial.data = talloc_memdup(in, data, nread); - if (in->partial.data == NULL) { - talloc_free(in); - return; - } - in->partial.length = nread; - talloc_free(data_base); - } + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(in->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); return; } - talloc_free(data_base); + /* most common case - we got a whole packet in one go + tell the ctdb layer above that we have a packet */ + in->ctdb->upcalls->recv_pkt(in->ctdb, data, cnt); } /* @@ -191,50 +73,7 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, */ int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length) { - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, struct ctdb_tcp_node); - struct ctdb_tcp_packet *pkt; - uint32_t length2; - - /* enforce the length and alignment rules from the tcp packet allocator */ - length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1); - *(uint32_t *)data = length2; - - if (length2 != length) { - memset(data+length, 0, length2-length); - } - - /* if the queue is empty then try an immediate write, avoiding - queue overhead. This relies on non-blocking sockets */ - if (tnode->queue == NULL && tnode->fd != -1) { - ssize_t n = write(tnode->fd, data, length2); - if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_dead, node); - /* yes, we report success, as the dead node is - handled via a separate event */ - return 0; - } - if (n > 0) { - data += n; - length2 -= n; - } - if (length2 == 0) return 0; - } - - pkt = talloc(tnode, struct ctdb_tcp_packet); - CTDB_NO_MEMORY(node->ctdb, pkt); - - pkt->data = talloc_memdup(pkt, data, length2); - CTDB_NO_MEMORY(node->ctdb, pkt->data); - - pkt->length = length2; - - if (tnode->queue == NULL && tnode->fd != -1) { - EVENT_FD_WRITEABLE(tnode->fde); - } - - DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *); - - return 0; + return ctdb_queue_send(tnode->queue, data, length); } diff --git a/source4/cluster/ctdb/tests/1node.txt b/source4/cluster/ctdb/tests/1node.txt new file mode 100644 index 0000000000..db4350c0c0 --- /dev/null +++ b/source4/cluster/ctdb/tests/1node.txt @@ -0,0 +1 @@ +127.0.0.1:9001 diff --git a/source4/cluster/ctdb/tests/4nodes.txt b/source4/cluster/ctdb/tests/4nodes.txt new file mode 100644 index 0000000000..880fe914ff --- /dev/null +++ b/source4/cluster/ctdb/tests/4nodes.txt @@ -0,0 +1,4 @@ +127.0.0.1:9001 +127.0.0.2:9001 +127.0.0.3:9001 +127.0.0.4:9001 diff --git a/source4/cluster/ctdb/tests/bench-ssh.sh b/source4/cluster/ctdb/tests/bench-ssh.sh new file mode 100755 index 0000000000..0d11ee9cdd --- /dev/null +++ b/source4/cluster/ctdb/tests/bench-ssh.sh @@ -0,0 +1,43 @@ +#!/bin/sh + +if [ $# -lt 1 ]; then + echo "Usage: bench-ssh.sh " + exit 1 +fi + +while :; do + if [ "`echo $1 | cut -c1`" = "-" -o $# -eq 0 ]; then break; fi + nodes="$nodes $1"; + shift; +done + +options=$* +dir=`pwd` + +echo "Creating nodes-ssh.txt" +rm -f nodes-ssh.txt +count=0 +for h in $nodes; do + echo "$h:9001" >> nodes-ssh.txt + count=`expr $count + 1` +done + + +echo "Killing old processes" +for h in $nodes; do + scp -q nodes-ssh.txt $h:$dir + ssh $h killall -q ctdb_bench +done + +echo "Starting nodes" +i=0 +for h in $nodes; do + if [ $i -eq `expr $count - 1` ]; then + ssh $h $dir/bin/ctdb_bench --nlist $dir/nodes-ssh.txt --listen $h:9001 $options + else + ssh -f $h $dir/bin/ctdb_bench --nlist $dir/nodes-ssh.txt --listen $h:9001 $options + fi + i=`expr $i + 1` +done + +wait diff --git a/source4/cluster/ctdb/tests/bench.sh b/source4/cluster/ctdb/tests/bench.sh new file mode 100755 index 0000000000..50e9e08f99 --- /dev/null +++ b/source4/cluster/ctdb/tests/bench.sh @@ -0,0 +1,9 @@ +#!/bin/sh + +killall -q ctdb_bench + +echo "Trying 2 nodes" +bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.2:9001 $* & +bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.1:9001 $* + +killall -q ctdb_bench diff --git a/source4/cluster/ctdb/tests/bench1.sh b/source4/cluster/ctdb/tests/bench1.sh new file mode 100755 index 0000000000..3481d82be2 --- /dev/null +++ b/source4/cluster/ctdb/tests/bench1.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +killall -q ctdb_bench + +echo "Trying 1 nodes" +bin/ctdb_bench --nlist tests/1node.txt --listen 127.0.0.2:9001 $* + +killall -q ctdb_bench diff --git a/source4/cluster/ctdb/tests/ctdb_bench.c b/source4/cluster/ctdb/tests/ctdb_bench.c index 023c76e7f9..78d66f6f2c 100644 --- a/source4/cluster/ctdb/tests/ctdb_bench.c +++ b/source4/cluster/ctdb/tests/ctdb_bench.c @@ -130,10 +130,10 @@ static int msg_plus, msg_minus; handler for messages in bench_ring() */ static void ring_message_handler(struct ctdb_context *ctdb, uint32_t srvid, - TDB_DATA data, void *private) + TDB_DATA data, void *private_data) { int incr = *(int *)data.dptr; - int *count = (int *)private; + int *count = (int *)private_data; int dest; (*count)++; dest = (ctdb_get_vnn(ctdb) + incr) % ctdb_get_num_nodes(ctdb); @@ -200,6 +200,7 @@ int main(int argc, const char *argv[]) const char *transport = "tcp"; const char *myaddress = NULL; int self_connect=0; + int daemon_mode=0; struct poptOption popt_options[] = { POPT_AUTOHELP @@ -207,6 +208,7 @@ int main(int argc, const char *argv[]) { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" }, { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL }, { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" }, + { "daemon", 0, POPT_ARG_NONE, &daemon_mode, 0, "spawn a ctdb daemon", "boolean" }, { "timelimit", 't', POPT_ARG_INT, &timelimit, 0, "timelimit", "integer" }, { "num-records", 'r', POPT_ARG_INT, &num_records, 0, "num_records", "integer" }, { "num-msgs", 'n', POPT_ARG_INT, &num_msgs, 0, "num_msgs", "integer" }, @@ -254,6 +256,9 @@ int main(int argc, const char *argv[]) if (self_connect) { ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT); } + if (daemon_mode) { + ctdb_set_flags(ctdb, CTDB_FLAG_DAEMON_MODE); + } ret = ctdb_set_transport(ctdb, transport); if (ret == -1) { @@ -286,11 +291,11 @@ int main(int argc, const char *argv[]) ret = ctdb_set_call(ctdb_db, incr_func, FUNC_INCR); ret = ctdb_set_call(ctdb_db, fetch_func, FUNC_FETCH); - ctdb_set_message_handler(ctdb, ring_message_handler, &msg_count); - /* start the protocol running */ ret = ctdb_start(ctdb); + ctdb_set_message_handler(ctdb, 0, ring_message_handler,&msg_count); + /* wait until all nodes are connected (should not be needed outside of test code) */ ctdb_connect_wait(ctdb); diff --git a/source4/cluster/ctdb/tests/ctdb_fetch.c b/source4/cluster/ctdb/tests/ctdb_fetch.c index c0491e9bb5..45e5248980 100644 --- a/source4/cluster/ctdb/tests/ctdb_fetch.c +++ b/source4/cluster/ctdb/tests/ctdb_fetch.c @@ -87,7 +87,7 @@ static void bench_fetch_1node(struct ctdb_context *ctdb) msg_count, ctdb_get_vnn(ctdb)); data.dsize = strlen((const char *)data.dptr)+1; - ret = ctdb_record_store(rec, data); + ret = ctdb_store_unlock(rec, data); if (ret != 0) { printf("Failed to store record\n"); } @@ -106,7 +106,7 @@ static void bench_fetch_1node(struct ctdb_context *ctdb) handler for messages in bench_ring() */ static void message_handler(struct ctdb_context *ctdb, uint32_t srvid, - TDB_DATA data, void *private) + TDB_DATA data, void *private_data) { msg_count++; bench_fetch_1node(ctdb); @@ -167,6 +167,7 @@ int main(int argc, const char *argv[]) const char *transport = "tcp"; const char *myaddress = NULL; int self_connect=0; + int daemon_mode=0; struct poptOption popt_options[] = { POPT_AUTOHELP @@ -174,6 +175,7 @@ int main(int argc, const char *argv[]) { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" }, { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL }, { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" }, + { "daemon", 0, POPT_ARG_NONE, &daemon_mode, 0, "spawn a ctdb daemon", "boolean" }, { "timelimit", 't', POPT_ARG_INT, &timelimit, 0, "timelimit", "integer" }, { "num-records", 'r', POPT_ARG_INT, &num_records, 0, "num_records", "integer" }, { "num-msgs", 'n', POPT_ARG_INT, &num_msgs, 0, "num_msgs", "integer" }, @@ -222,6 +224,9 @@ int main(int argc, const char *argv[]) if (self_connect) { ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT); } + if (daemon_mode) { + ctdb_set_flags(ctdb, CTDB_FLAG_DAEMON_MODE); + } ret = ctdb_set_transport(ctdb, transport); if (ret == -1) { @@ -252,11 +257,11 @@ int main(int argc, const char *argv[]) ret = ctdb_set_call(ctdb_db, fetch_func, FUNC_FETCH); - ctdb_set_message_handler(ctdb, message_handler, &msg_count); - /* start the protocol running */ ret = ctdb_start(ctdb); + ctdb_set_message_handler(ctdb, 0, message_handler, &msg_count); + /* wait until all nodes are connected (should not be needed outside of test code) */ ctdb_connect_wait(ctdb); diff --git a/source4/cluster/ctdb/tests/ctdb_fetch1.c b/source4/cluster/ctdb/tests/ctdb_fetch1.c new file mode 100644 index 0000000000..8071256a43 --- /dev/null +++ b/source4/cluster/ctdb/tests/ctdb_fetch1.c @@ -0,0 +1,274 @@ +/* + simple ctdb fetch test + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "popt.h" +#include "ctdb.h" +#include "ctdb_private.h" + +#define PARENT_SRVID 0 +#define CHILD1_SRVID 1 +#define CHILD2_SRVID 2 + +int num_msg=0; + +static void message_handler(struct ctdb_context *ctdb, uint32_t srvid, + TDB_DATA data, void *private_data) +{ + num_msg++; +} +static void child_handler(struct ctdb_context *ctdb, uint32_t srvid, + TDB_DATA data, void *private_data) +{ + num_msg++; +} + +void test1(struct ctdb_db_context *ctdb_db) +{ + struct ctdb_record_handle *rh; + TDB_DATA key, data, data2, store_data; + int ret; + + /* + test 1 : write data and read it back. should all be the same + */ + printf("Test1: write and verify we can read it back: "); + key.dptr = discard_const("Record"); + key.dsize = strlen((const char *)key.dptr)+1; + rh = ctdb_fetch_lock(ctdb_db, ctdb_db, key, &data); + + store_data.dptr = discard_const("data to store"); + store_data.dsize = strlen((const char *)store_data.dptr)+1; + ret = ctdb_store_unlock(rh, store_data); + + rh = ctdb_fetch_lock(ctdb_db, ctdb_db, key, &data2); + /* hopefully data2 will now contain the record written above */ + if (!strcmp("data to store", (const char *)data2.dptr)) { + printf("SUCCESS\n"); + } else { + printf("FAILURE\n"); + exit(10); + } + + /* just write it back to unlock it */ + ret = ctdb_store_unlock(rh, store_data); +} + +void child(int srvid, struct event_context *ev, struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db) +{ + TDB_DATA data; + struct ctdb_record_handle *rh; + TDB_DATA key, data2; + + data.dptr=discard_const("dummy message"); + data.dsize=strlen((const char *)data.dptr)+1; + + ctdb_set_message_handler(ctdb, srvid, child_handler, NULL); + + ctdb_send_message(ctdb, ctdb_get_vnn(ctdb), PARENT_SRVID, data); + while (num_msg==0) { + event_loop_once(ev); + } + + + /* fetch and lock the record */ + key.dptr = discard_const("Record"); + key.dsize = strlen((const char *)key.dptr)+1; + rh = ctdb_fetch_lock(ctdb_db, ctdb_db, key, &data2); + ctdb_send_message(ctdb, ctdb_get_vnn(ctdb), PARENT_SRVID, data); + + + while (1) { + event_loop_once(ev); + } +} + +/* + main program +*/ +int main(int argc, const char *argv[]) +{ + struct ctdb_context *ctdb; + struct ctdb_db_context *ctdb_db; + const char *nlist = NULL; + const char *transport = "tcp"; + const char *myaddress = NULL; + int self_connect=0; + int daemon_mode=0; + TDB_DATA data; + + struct poptOption popt_options[] = { + POPT_AUTOHELP + { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" }, + { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" }, + { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL }, + { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" }, + { "daemon", 0, POPT_ARG_NONE, &daemon_mode, 0, "spawn a ctdb daemon", "boolean" }, + POPT_TABLEEND + }; + int opt; + const char **extra_argv; + int extra_argc = 0; + int ret; + poptContext pc; + struct event_context *ev; + + pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST); + + while ((opt = poptGetNextOpt(pc)) != -1) { + switch (opt) { + default: + fprintf(stderr, "Invalid option %s: %s\n", + poptBadOption(pc, 0), poptStrerror(opt)); + exit(1); + } + } + + /* setup the remaining options for the main program to use */ + extra_argv = poptGetArgs(pc); + if (extra_argv) { + extra_argv++; + while (extra_argv[extra_argc]) extra_argc++; + } + + if (nlist == NULL || myaddress == NULL) { + printf("You must provide a node list with --nlist and an address with --listen\n"); + exit(1); + } + + ev = event_context_init(NULL); + + /* initialise ctdb */ + ctdb = ctdb_init(ev); + if (ctdb == NULL) { + printf("Failed to init ctdb\n"); + exit(1); + } + + if (self_connect) { + ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT); + } + if (daemon_mode) { + ctdb_set_flags(ctdb, CTDB_FLAG_DAEMON_MODE); + } + + ret = ctdb_set_transport(ctdb, transport); + if (ret == -1) { + printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what address to listen on */ + ret = ctdb_set_address(ctdb, myaddress); + if (ret == -1) { + printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what nodes are available */ + ret = ctdb_set_nlist(ctdb, nlist); + if (ret == -1) { + printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* attach to a specific database */ + ctdb_db = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666); + if (!ctdb_db) { + printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* start the protocol running */ + ret = ctdb_start(ctdb); + +#if 0 + /* wait until all nodes are connected (should not be needed + outside of test code) */ + ctdb_connect_wait(ctdb); +#endif + + /* + start two child processes + */ + if(fork()){ + /* + set up a message handler so our child processes can talk to us + */ + ctdb_set_message_handler(ctdb, PARENT_SRVID, message_handler, NULL); + } else { + sleep(3); + if(!fork()){ + child(CHILD1_SRVID, ev, ctdb, ctdb_db); + } else { + child(CHILD2_SRVID, ev, ctdb, ctdb_db); + } + } + + /* + test 1 : write data and read it back. + */ + test1(ctdb_db); + + /* + wait until both children have sent us a message they have started + */ + printf("Wait for both child processes to start: "); + while (num_msg!=2) { + event_loop_once(ev); + } + printf("STARTED\n"); + + + /* + send message to child 1 to make it to fetch and lock the record + */ + data.dptr=discard_const("dummy message"); + data.dsize=strlen((const char *)data.dptr)+1; + printf("Send message to child 1 to fetch_lock the record\n"); + ctdb_send_message(ctdb, ctdb_get_vnn(ctdb), CHILD1_SRVID, data); + + /* wait for child 1 to complete fetching and locking the record */ + while (num_msg!=3) { + event_loop_once(ev); + } + printf("Child 1 has fetched and locked the record\n"); + + /* now tell child 2 to fetch and lock the same record */ + printf("Send message to child 2 to fetch_lock the record\n"); + ctdb_send_message(ctdb, ctdb_get_vnn(ctdb), CHILD2_SRVID, data); + + /* wait for child 2 to complete fetching and locking the record */ + while (num_msg!=4) { + event_loop_once(ev); + } + printf("Child 2 has fetched and locked the record\n"); + + + while (1) { + event_loop_once(ev); + } + + /* shut it down */ + talloc_free(ctdb); + return 0; +} diff --git a/source4/cluster/ctdb/tests/ctdb_messaging.c b/source4/cluster/ctdb/tests/ctdb_messaging.c new file mode 100644 index 0000000000..050f14de38 --- /dev/null +++ b/source4/cluster/ctdb/tests/ctdb_messaging.c @@ -0,0 +1,187 @@ +/* + test of messaging + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "system/filesys.h" +#include "popt.h" + +static int timelimit = 10; +static int num_records = 10; +static int num_msgs = 1; +static int num_repeats = 100; +static int num_clients = 2; + + +/* + handler for messages in bench_ring() +*/ +static void message_handler(struct ctdb_context *ctdb, uint32_t srvid, + TDB_DATA data, void *private_data) +{ + printf("client vnn:%d received a message to srvid:%d [%s]\n",ctdb_get_vnn(ctdb),srvid,data.dptr); + fflush(stdout); +} + +/* + main program +*/ +int main(int argc, const char *argv[]) +{ + struct ctdb_context *ctdb; + struct ctdb_db_context *ctdb_db; + const char *nlist = NULL; + const char *transport = "tcp"; + const char *myaddress = NULL; + int self_connect=0; + int daemon_mode=0; + char buf[256]; + + struct poptOption popt_options[] = { + POPT_AUTOHELP + { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" }, + { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" }, + { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL }, + { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" }, + { "daemon", 0, POPT_ARG_NONE, &daemon_mode, 0, "spawn a ctdb daemon", "boolean" }, + { "timelimit", 't', POPT_ARG_INT, &timelimit, 0, "timelimit", "integer" }, + { "num-records", 'r', POPT_ARG_INT, &num_records, 0, "num_records", "integer" }, + { "num-msgs", 'n', POPT_ARG_INT, &num_msgs, 0, "num_msgs", "integer" }, + { "num-clients", 0, POPT_ARG_INT, &num_clients, 0, "num_clients", "integer" }, + POPT_TABLEEND + }; + int opt; + const char **extra_argv; + int extra_argc = 0; + int ret, i, j; + poptContext pc; + struct event_context *ev; + pid_t pid; + int srvid; + TDB_DATA data; + + pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST); + + while ((opt = poptGetNextOpt(pc)) != -1) { + switch (opt) { + default: + fprintf(stderr, "Invalid option %s: %s\n", + poptBadOption(pc, 0), poptStrerror(opt)); + exit(1); + } + } + + /* setup the remaining options for the main program to use */ + extra_argv = poptGetArgs(pc); + if (extra_argv) { + extra_argv++; + while (extra_argv[extra_argc]) extra_argc++; + } + + if (nlist == NULL || myaddress == NULL) { + printf("You must provide a node list with --nlist and an address with --listen\n"); + exit(1); + } + + ev = event_context_init(NULL); + + /* initialise ctdb */ + ctdb = ctdb_init(ev); + if (ctdb == NULL) { + printf("Failed to init ctdb\n"); + exit(1); + } + + if (self_connect) { + ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT); + } + if (daemon_mode) { + ctdb_set_flags(ctdb, CTDB_FLAG_DAEMON_MODE); + } + + ret = ctdb_set_transport(ctdb, transport); + if (ret == -1) { + printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what address to listen on */ + ret = ctdb_set_address(ctdb, myaddress); + if (ret == -1) { + printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* tell ctdb what nodes are available */ + ret = ctdb_set_nlist(ctdb, nlist); + if (ret == -1) { + printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* attach to a specific database */ + ctdb_db = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666); + if (!ctdb_db) { + printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb)); + exit(1); + } + + /* start the protocol running */ + ret = ctdb_start(ctdb); + + srvid = -1; + for (i=0;i