summaryrefslogtreecommitdiff
path: root/source4/cluster/ctdb/common
diff options
context:
space:
mode:
Diffstat (limited to 'source4/cluster/ctdb/common')
-rw-r--r--source4/cluster/ctdb/common/cmdline.c109
-rw-r--r--source4/cluster/ctdb/common/ctdb.c421
-rw-r--r--source4/cluster/ctdb/common/ctdb_call.c706
-rw-r--r--source4/cluster/ctdb/common/ctdb_client.c681
-rw-r--r--source4/cluster/ctdb/common/ctdb_daemon.c710
-rw-r--r--source4/cluster/ctdb/common/ctdb_io.c79
-rw-r--r--source4/cluster/ctdb/common/ctdb_lockwait.c137
-rw-r--r--source4/cluster/ctdb/common/ctdb_ltdb.c240
-rw-r--r--source4/cluster/ctdb/common/ctdb_message.c138
-rw-r--r--source4/cluster/ctdb/common/ctdb_util.c178
-rw-r--r--source4/cluster/ctdb/common/system.c385
11 files changed, 718 insertions, 3066 deletions
diff --git a/source4/cluster/ctdb/common/cmdline.c b/source4/cluster/ctdb/common/cmdline.c
index 821e1fe38f..f8fca5e4a0 100644
--- a/source4/cluster/ctdb/common/cmdline.c
+++ b/source4/cluster/ctdb/common/cmdline.c
@@ -3,18 +3,18 @@
Copyright (C) Andrew Tridgell 2007
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 3 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
@@ -28,30 +28,35 @@
*/
static struct {
- const char *nlist;
- const char *transport;
- const char *myaddress;
- int self_connect;
- const char *db_dir;
+ const char *socketname;
int torture;
+ const char *events;
} ctdb_cmdline = {
- .nlist = NULL,
- .transport = "tcp",
- .myaddress = NULL,
- .self_connect = 0,
- .db_dir = NULL,
- .torture = 0
+ .socketname = CTDB_PATH,
+ .torture = 0,
};
+enum {OPT_EVENTSYSTEM=1};
+
+static void ctdb_cmdline_callback(poptContext con,
+ enum poptCallbackReason reason,
+ const struct poptOption *opt,
+ const char *arg, const void *data)
+{
+ switch (opt->val) {
+ case OPT_EVENTSYSTEM:
+ event_set_default_backend(arg);
+ break;
+ }
+}
+
struct poptOption popt_ctdb_cmdline[] = {
- { "nlist", 0, POPT_ARG_STRING, &ctdb_cmdline.nlist, 0, "node list file", "filename" },
- { "listen", 0, POPT_ARG_STRING, &ctdb_cmdline.myaddress, 0, "address to listen on", "address" },
- { "transport", 0, POPT_ARG_STRING, &ctdb_cmdline.transport, 0, "protocol transport", NULL },
- { "self-connect", 0, POPT_ARG_NONE, &ctdb_cmdline.self_connect, 0, "enable self connect", "boolean" },
+ { NULL, 0, POPT_ARG_CALLBACK, (void *)ctdb_cmdline_callback },
+ { "socket", 0, POPT_ARG_STRING, &ctdb_cmdline.socketname, 0, "local socket name", "filename" },
{ "debug", 'd', POPT_ARG_INT, &LogLevel, 0, "debug level"},
- { "dbdir", 0, POPT_ARG_STRING, &ctdb_cmdline.db_dir, 0, "directory for the tdb files", NULL },
{ "torture", 0, POPT_ARG_NONE, &ctdb_cmdline.torture, 0, "enable nastiness in library", NULL },
+ { "events", 0, POPT_ARG_STRING, NULL, OPT_EVENTSYSTEM, "event system", NULL },
{ NULL }
};
@@ -64,11 +69,6 @@ struct ctdb_context *ctdb_cmdline_init(struct event_context *ev)
struct ctdb_context *ctdb;
int ret;
- if (ctdb_cmdline.nlist == NULL || ctdb_cmdline.myaddress == NULL) {
- printf("You must provide a node list with --nlist and an address with --listen\n");
- exit(1);
- }
-
/* initialise ctdb */
ctdb = ctdb_init(ev);
if (ctdb == NULL) {
@@ -76,36 +76,14 @@ struct ctdb_context *ctdb_cmdline_init(struct event_context *ev)
exit(1);
}
- if (ctdb_cmdline.self_connect) {
- ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT);
- }
if (ctdb_cmdline.torture) {
ctdb_set_flags(ctdb, CTDB_FLAG_TORTURE);
}
- ret = ctdb_set_transport(ctdb, ctdb_cmdline.transport);
- if (ret == -1) {
- printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
- exit(1);
- }
-
- /* tell ctdb what address to listen on */
- ret = ctdb_set_address(ctdb, ctdb_cmdline.myaddress);
+ /* tell ctdb the socket address */
+ ret = ctdb_set_socketname(ctdb, ctdb_cmdline.socketname);
if (ret == -1) {
- printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb));
- exit(1);
- }
-
- /* tell ctdb what nodes are available */
- ret = ctdb_set_nlist(ctdb, ctdb_cmdline.nlist);
- if (ret == -1) {
- printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb));
- exit(1);
- }
-
- ret = ctdb_set_tdb_dir(ctdb, ctdb_cmdline.db_dir);
- if (ret == -1) {
- printf("ctdb_set_tdb_dir failed - %s\n", ctdb_errstr(ctdb));
+ printf("ctdb_set_socketname failed - %s\n", ctdb_errstr(ctdb));
exit(1);
}
@@ -116,7 +94,7 @@ struct ctdb_context *ctdb_cmdline_init(struct event_context *ev)
/*
startup a client only ctdb context
*/
-struct ctdb_context *ctdb_cmdline_client(struct event_context *ev, const char *ctdb_socket)
+struct ctdb_context *ctdb_cmdline_client(struct event_context *ev)
{
struct ctdb_context *ctdb;
int ret;
@@ -128,7 +106,12 @@ struct ctdb_context *ctdb_cmdline_client(struct event_context *ev, const char *c
exit(1);
}
- ctdb->daemon.name = talloc_strdup(ctdb, ctdb_socket);
+ /* tell ctdb the socket address */
+ ret = ctdb_set_socketname(ctdb, ctdb_cmdline.socketname);
+ if (ret == -1) {
+ printf("ctdb_set_socketname failed - %s\n", ctdb_errstr(ctdb));
+ exit(1);
+ }
ret = ctdb_socket_connect(ctdb);
if (ret != 0) {
@@ -137,5 +120,13 @@ struct ctdb_context *ctdb_cmdline_client(struct event_context *ev, const char *c
return NULL;
}
+ /* get our vnn */
+ ctdb->vnn = ctdb_ctrl_getvnn(ctdb, timeval_zero(), CTDB_CURRENT_NODE);
+ if (ctdb->vnn == (uint32_t)-1) {
+ DEBUG(0,(__location__ " Failed to get ctdb vnn\n"));
+ talloc_free(ctdb);
+ return NULL;
+ }
+
return ctdb;
}
diff --git a/source4/cluster/ctdb/common/ctdb.c b/source4/cluster/ctdb/common/ctdb.c
deleted file mode 100644
index a802daa7b5..0000000000
--- a/source4/cluster/ctdb/common/ctdb.c
+++ /dev/null
@@ -1,421 +0,0 @@
-/*
- ctdb main protocol code
-
- Copyright (C) Andrew Tridgell 2006
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 3 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "includes.h"
-#include "lib/tdb/include/tdb.h"
-#include "lib/events/events.h"
-#include "lib/util/dlinklist.h"
-#include "system/network.h"
-#include "system/filesys.h"
-#include "../include/ctdb_private.h"
-
-/*
- choose the transport we will use
-*/
-int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
-{
- int ctdb_tcp_init(struct ctdb_context *ctdb);
-#ifdef USE_INFINIBAND
- int ctdb_ibw_init(struct ctdb_context *ctdb);
-#endif /* USE_INFINIBAND */
-
- if (strcmp(transport, "tcp") == 0) {
- return ctdb_tcp_init(ctdb);
- }
-#ifdef USE_INFINIBAND
- if (strcmp(transport, "ib") == 0) {
- return ctdb_ibw_init(ctdb);
- }
-#endif /* USE_INFINIBAND */
-
- ctdb_set_error(ctdb, "Unknown transport '%s'\n", transport);
- return -1;
-}
-
-/*
- set some ctdb flags
-*/
-void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
-{
- ctdb->flags |= flags;
-}
-
-/*
- clear some ctdb flags
-*/
-void ctdb_clear_flags(struct ctdb_context *ctdb, unsigned flags)
-{
- ctdb->flags &= ~flags;
-}
-
-/*
- set max acess count before a dmaster migration
-*/
-void ctdb_set_max_lacount(struct ctdb_context *ctdb, unsigned count)
-{
- ctdb->max_lacount = count;
-}
-
-/*
- set the directory for the local databases
-*/
-int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
-{
- if (dir == NULL) {
- ctdb->db_directory = talloc_asprintf(ctdb, "ctdb-%u", ctdb_get_vnn(ctdb));
- } else {
- ctdb->db_directory = talloc_strdup(ctdb, dir);
- }
- if (ctdb->db_directory == NULL) {
- return -1;
- }
- return 0;
-}
-
-/*
- add a node to the list of active nodes
-*/
-static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
-{
- struct ctdb_node *node, **nodep;
-
- nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
- CTDB_NO_MEMORY(ctdb, nodep);
-
- ctdb->nodes = nodep;
- nodep = &ctdb->nodes[ctdb->num_nodes];
- (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
- CTDB_NO_MEMORY(ctdb, *nodep);
- node = *nodep;
-
- if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
- return -1;
- }
- node->ctdb = ctdb;
- node->name = talloc_asprintf(node, "%s:%u",
- node->address.address,
- node->address.port);
- /* for now we just set the vnn to the line in the file - this
- will change! */
- node->vnn = ctdb->num_nodes;
-
- if (ctdb->methods->add_node(node) != 0) {
- talloc_free(node);
- return -1;
- }
-
- if (ctdb_same_address(&ctdb->address, &node->address)) {
- ctdb->vnn = node->vnn;
- }
-
- ctdb->num_nodes++;
-
- return 0;
-}
-
-/*
- setup the node list from a file
-*/
-int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
-{
- char **lines;
- int nlines;
- int i;
-
- lines = file_lines_load(nlist, &nlines, ctdb);
- if (lines == NULL) {
- ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
- return -1;
- }
-
- for (i=0;i<nlines;i++) {
- if (ctdb_add_node(ctdb, lines[i]) != 0) {
- talloc_free(lines);
- return -1;
- }
- }
-
- talloc_free(lines);
- return 0;
-}
-
-/*
- setup the local node address
-*/
-int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
-{
- if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
- return -1;
- }
-
- ctdb->name = talloc_asprintf(ctdb, "%s:%u",
- ctdb->address.address,
- ctdb->address.port);
- return 0;
-}
-
-/*
- add a node to the list of active nodes
-*/
-int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, int id)
-{
- struct ctdb_registered_call *call;
-
- call = talloc(ctdb_db, struct ctdb_registered_call);
- call->fn = fn;
- call->id = id;
-
- DLIST_ADD(ctdb_db->calls, call);
- return 0;
-}
-
-/*
- return the vnn of this node
-*/
-uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
-{
- return ctdb->vnn;
-}
-
-/*
- return the number of nodes
-*/
-uint32_t ctdb_get_num_nodes(struct ctdb_context *ctdb)
-{
- return ctdb->num_nodes;
-}
-
-
-/*
- called by the transport layer when a packet comes in
-*/
-void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
-{
- struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
- TALLOC_CTX *tmp_ctx;
-
- ctdb->status.node_packets_recv++;
-
- /* place the packet as a child of the tmp_ctx. We then use
- talloc_free() below to free it. If any of the calls want
- to keep it, then they will steal it somewhere else, and the
- talloc_free() will only free the tmp_ctx */
- tmp_ctx = talloc_new(ctdb);
- talloc_steal(tmp_ctx, hdr);
-
- if (length < sizeof(*hdr)) {
- ctdb_set_error(ctdb, "Bad packet length %d\n", length);
- goto done;
- }
- if (length != hdr->length) {
- ctdb_set_error(ctdb, "Bad header length %d expected %d\n",
- hdr->length, length);
- goto done;
- }
-
- if (hdr->ctdb_magic != CTDB_MAGIC) {
- ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
- goto done;
- }
-
- if (hdr->ctdb_version != CTDB_VERSION) {
- ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
- goto done;
- }
-
- DEBUG(3,(__location__ " ctdb request %d of type %d length %d from "
- "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
- hdr->srcnode, hdr->destnode));
-
- switch (hdr->operation) {
- case CTDB_REQ_CALL:
- ctdb->status.count.req_call++;
- ctdb_request_call(ctdb, hdr);
- break;
-
- case CTDB_REPLY_CALL:
- ctdb->status.count.reply_call++;
- ctdb_reply_call(ctdb, hdr);
- break;
-
- case CTDB_REPLY_ERROR:
- ctdb->status.count.reply_error++;
- ctdb_reply_error(ctdb, hdr);
- break;
-
- case CTDB_REPLY_REDIRECT:
- ctdb->status.count.reply_redirect++;
- ctdb_reply_redirect(ctdb, hdr);
- break;
-
- case CTDB_REQ_DMASTER:
- ctdb->status.count.req_dmaster++;
- ctdb_request_dmaster(ctdb, hdr);
- break;
-
- case CTDB_REPLY_DMASTER:
- ctdb->status.count.reply_dmaster++;
- ctdb_reply_dmaster(ctdb, hdr);
- break;
-
- case CTDB_REQ_MESSAGE:
- ctdb->status.count.req_message++;
- ctdb_request_message(ctdb, hdr);
- break;
-
- case CTDB_REQ_FINISHED:
- ctdb->status.count.req_finished++;
- ctdb_request_finished(ctdb, hdr);
- break;
-
- default:
- DEBUG(0,("%s: Packet with unknown operation %d\n",
- __location__, hdr->operation));
- break;
- }
-
-done:
- talloc_free(tmp_ctx);
-}
-
-/*
- called by the transport layer when a packet comes in
-*/
-void ctdb_recv_raw_pkt(void *p, uint8_t *data, uint32_t length)
-{
- struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
- ctdb_recv_pkt(ctdb, data, length);
-}
-
-/*
- called by the transport layer when a node is dead
-*/
-static void ctdb_node_dead(struct ctdb_node *node)
-{
- node->ctdb->num_connected--;
- DEBUG(1,("%s: node %s is dead: %d connected\n",
- node->ctdb->name, node->name, node->ctdb->num_connected));
-}
-
-/*
- called by the transport layer when a node is connected
-*/
-static void ctdb_node_connected(struct ctdb_node *node)
-{
- node->ctdb->num_connected++;
- DEBUG(1,("%s: connected to %s - %d connected\n",
- node->ctdb->name, node->name, node->ctdb->num_connected));
-}
-
-/*
- wait for all nodes to be connected
-*/
-void ctdb_daemon_connect_wait(struct ctdb_context *ctdb)
-{
- int expected = ctdb->num_nodes - 1;
- if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
- expected++;
- }
- while (ctdb->num_connected != expected) {
- DEBUG(3,("ctdb_connect_wait: waiting for %d nodes (have %d)\n",
- expected, ctdb->num_connected));
- event_loop_once(ctdb->ev);
- }
- DEBUG(3,("ctdb_connect_wait: got all %d nodes\n", expected));
-}
-
-struct queue_next {
- struct ctdb_context *ctdb;
- struct ctdb_req_header *hdr;
-};
-
-
-/*
- trigered when a deferred packet is due
- */
-static void queue_next_trigger(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private_data)
-{
- struct queue_next *q = talloc_get_type(private_data, struct queue_next);
- ctdb_recv_pkt(q->ctdb, (uint8_t *)q->hdr, q->hdr->length);
- talloc_free(q);
-}
-
-/*
- defer a packet, so it is processed on the next event loop
- this is used for sending packets to ourselves
- */
-static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct queue_next *q;
- q = talloc(ctdb, struct queue_next);
- if (q == NULL) {
- DEBUG(0,(__location__ " Failed to allocate deferred packet\n"));
- return;
- }
- q->ctdb = ctdb;
- q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
- if (q->hdr == NULL) {
- DEBUG(0,("Error copying deferred packet to self\n"));
- return;
- }
- event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
-}
-
-/*
- queue a packet or die
-*/
-void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct ctdb_node *node;
- ctdb->status.node_packets_sent++;
- node = ctdb->nodes[hdr->destnode];
- if (hdr->destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
- ctdb_defer_packet(ctdb, hdr);
- } else if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
- ctdb_fatal(ctdb, "Unable to queue packet\n");
- }
-}
-
-
-static const struct ctdb_upcalls ctdb_upcalls = {
- .recv_pkt = ctdb_recv_pkt,
- .node_dead = ctdb_node_dead,
- .node_connected = ctdb_node_connected
-};
-
-/*
- initialise the ctdb daemon.
-
- NOTE: In current code the daemon does not fork. This is for testing purposes only
- and to simplify the code.
-*/
-struct ctdb_context *ctdb_init(struct event_context *ev)
-{
- struct ctdb_context *ctdb;
-
- ctdb = talloc_zero(ev, struct ctdb_context);
- ctdb->ev = ev;
- ctdb->upcalls = &ctdb_upcalls;
- ctdb->idr = idr_init(ctdb);
- ctdb->max_lacount = CTDB_DEFAULT_MAX_LACOUNT;
-
- return ctdb;
-}
-
diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c
deleted file mode 100644
index 67ab076308..0000000000
--- a/source4/cluster/ctdb/common/ctdb_call.c
+++ /dev/null
@@ -1,706 +0,0 @@
-/*
- ctdb_call protocol code
-
- Copyright (C) Andrew Tridgell 2006
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 3 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, see <http://www.gnu.org/licenses/>.
-*/
-/*
- see http://wiki.samba.org/index.php/Samba_%26_Clustering for
- protocol design and packet details
-*/
-#include "includes.h"
-#include "lib/events/events.h"
-#include "lib/tdb/include/tdb.h"
-#include "system/network.h"
-#include "system/filesys.h"
-#include "../include/ctdb_private.h"
-
-/*
- find the ctdb_db from a db index
- */
- struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
-{
- struct ctdb_db_context *ctdb_db;
-
- for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
- if (ctdb_db->db_id == id) {
- break;
- }
- }
- return ctdb_db;
-}
-
-
-/*
- local version of ctdb_call
-*/
-int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
- struct ctdb_ltdb_header *header, TDB_DATA *data,
- uint32_t caller)
-{
- struct ctdb_call_info *c;
- struct ctdb_registered_call *fn;
- struct ctdb_context *ctdb = ctdb_db->ctdb;
-
- c = talloc(ctdb, struct ctdb_call_info);
- CTDB_NO_MEMORY(ctdb, c);
-
- c->key = call->key;
- c->call_data = &call->call_data;
- c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
- c->record_data.dsize = data->dsize;
- CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
- c->new_data = NULL;
- c->reply_data = NULL;
- c->status = 0;
-
- for (fn=ctdb_db->calls;fn;fn=fn->next) {
- if (fn->id == call->call_id) break;
- }
- if (fn == NULL) {
- ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
- talloc_free(c);
- return -1;
- }
-
- if (fn->fn(c) != 0) {
- ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
- talloc_free(c);
- return -1;
- }
-
- if (header->laccessor != caller) {
- header->lacount = 0;
- }
- header->laccessor = caller;
- header->lacount++;
-
- /* we need to force the record to be written out if this was a remote access,
- so that the lacount is updated */
- if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
- c->new_data = &c->record_data;
- }
-
- if (c->new_data) {
- if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
- ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
- talloc_free(c);
- return -1;
- }
- }
-
- if (c->reply_data) {
- call->reply_data = *c->reply_data;
- talloc_steal(ctdb, call->reply_data.dptr);
- talloc_set_name_const(call->reply_data.dptr, __location__);
- } else {
- call->reply_data.dptr = NULL;
- call->reply_data.dsize = 0;
- }
- call->status = c->status;
-
- talloc_free(c);
-
- return 0;
-}
-
-/*
- send an error reply
-*/
-static void ctdb_send_error(struct ctdb_context *ctdb,
- struct ctdb_req_header *hdr, uint32_t status,
- const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
-static void ctdb_send_error(struct ctdb_context *ctdb,
- struct ctdb_req_header *hdr, uint32_t status,
- const char *fmt, ...)
-{
- va_list ap;
- struct ctdb_reply_error *r;
- char *msg;
- int msglen, len;
-
- va_start(ap, fmt);
- msg = talloc_vasprintf(ctdb, fmt, ap);
- if (msg == NULL) {
- ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
- }
- va_end(ap);
-
- msglen = strlen(msg)+1;
- len = offsetof(struct ctdb_reply_error, msg);
- r = ctdb->methods->allocate_pkt(msg, len + msglen);
- CTDB_NO_MEMORY_FATAL(ctdb, r);
- talloc_set_name_const(r, "send_error packet");
-
- r->hdr.length = len + msglen;
- r->hdr.ctdb_magic = CTDB_MAGIC;
- r->hdr.ctdb_version = CTDB_VERSION;
- r->hdr.operation = CTDB_REPLY_ERROR;
- r->hdr.destnode = hdr->srcnode;
- r->hdr.srcnode = ctdb->vnn;
- r->hdr.reqid = hdr->reqid;
- r->status = status;
- r->msglen = msglen;
- memcpy(&r->msg[0], msg, msglen);
-
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(msg);
-}
-
-
-/*
- send a redirect reply
-*/
-static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
- struct ctdb_req_call *c,
- struct ctdb_ltdb_header *header)
-{
- struct ctdb_reply_redirect *r;
-
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
- CTDB_NO_MEMORY_FATAL(ctdb, r);
- talloc_set_name_const(r, "send_redirect packet");
- r->hdr.length = sizeof(*r);
- r->hdr.ctdb_magic = CTDB_MAGIC;
- r->hdr.ctdb_version = CTDB_VERSION;
- r->hdr.operation = CTDB_REPLY_REDIRECT;
- r->hdr.destnode = c->hdr.srcnode;
- r->hdr.srcnode = ctdb->vnn;
- r->hdr.reqid = c->hdr.reqid;
- r->dmaster = header->dmaster;
-
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(r);
-}
-
-/*
- send a dmaster request (give another node the dmaster for a record)
-
- This is always sent to the lmaster, which ensures that the lmaster
- always knows who the dmaster is. The lmaster will then send a
- CTDB_REPLY_DMASTER to the new dmaster
-*/
-static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
- struct ctdb_req_call *c,
- struct ctdb_ltdb_header *header,
- TDB_DATA *key, TDB_DATA *data)
-{
- struct ctdb_req_dmaster *r;
- struct ctdb_context *ctdb = ctdb_db->ctdb;
- int len;
-
- len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize;
- r = ctdb->methods->allocate_pkt(ctdb, len);
- CTDB_NO_MEMORY_FATAL(ctdb, r);
- talloc_set_name_const(r, "send_dmaster packet");
- r->hdr.length = len;
- r->hdr.ctdb_magic = CTDB_MAGIC;
- r->hdr.ctdb_version = CTDB_VERSION;
- r->hdr.operation = CTDB_REQ_DMASTER;
- r->hdr.destnode = ctdb_lmaster(ctdb, key);
- r->hdr.srcnode = ctdb->vnn;
- r->hdr.reqid = c->hdr.reqid;
- r->db_id = c->db_id;
- r->dmaster = c->hdr.srcnode;
- r->keylen = key->dsize;
- r->datalen = data->dsize;
- memcpy(&r->data[0], key->dptr, key->dsize);
- memcpy(&r->data[key->dsize], data->dptr, data->dsize);
-
- /* XXX - probably not necessary when lmaster==dmaster
- update the ltdb to record the new dmaster */
- header->dmaster = r->hdr.destnode;
- ctdb_ltdb_store(ctdb_db, *key, header, *data);
-
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(r);
-}
-
-
-/*
- called when a CTDB_REQ_DMASTER packet comes in
-
- this comes into the lmaster for a record when the current dmaster
- wants to give up the dmaster role and give it to someone else
-*/
-void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
- struct ctdb_reply_dmaster *r;
- TDB_DATA key, data, data2;
- struct ctdb_ltdb_header header;
- struct ctdb_db_context *ctdb_db;
- int ret, len;
- TALLOC_CTX *tmp_ctx;
-
- key.dptr = c->data;
- key.dsize = c->keylen;
- data.dptr = c->data + c->keylen;
- data.dsize = c->datalen;
-
- ctdb_db = find_ctdb_db(ctdb, c->db_id);
- if (!ctdb_db) {
- ctdb_send_error(ctdb, hdr, -1,
- "Unknown database in request. db_id==0x%08x",
- c->db_id);
- return;
- }
-
- /* fetch the current record */
- ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
- ctdb_recv_raw_pkt, ctdb);
- if (ret == -1) {
- ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
- return;
- }
- if (ret == -2) {
- DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n"));
- return;
- }
-
- /* its a protocol error if the sending node is not the current dmaster */
- if (header.dmaster != hdr->srcnode &&
- hdr->srcnode != ctdb_lmaster(ctdb_db->ctdb, &key)) {
- ctdb_fatal(ctdb, "dmaster request from non-master");
- return;
- }
-
- header.dmaster = c->dmaster;
- ret = ctdb_ltdb_store(ctdb_db, key, &header, data);
- ctdb_ltdb_unlock(ctdb_db, key);
- if (ret != 0) {
- ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
- return;
- }
-
- /* put the packet on a temporary context, allowing us to safely free
- it below even if ctdb_reply_dmaster() has freed it already */
- tmp_ctx = talloc_new(ctdb);
-
- /* send the CTDB_REPLY_DMASTER */
- len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize;
- r = ctdb->methods->allocate_pkt(tmp_ctx, len);
- CTDB_NO_MEMORY_FATAL(ctdb, r);
-
- talloc_set_name_const(r, "reply_dmaster packet");
- r->hdr.length = len;
- r->hdr.ctdb_magic = CTDB_MAGIC;
- r->hdr.ctdb_version = CTDB_VERSION;
- r->hdr.operation = CTDB_REPLY_DMASTER;
- r->hdr.destnode = c->dmaster;
- r->hdr.srcnode = ctdb->vnn;
- r->hdr.reqid = hdr->reqid;
- r->datalen = data.dsize;
- memcpy(&r->data[0], data.dptr, data.dsize);
-
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(tmp_ctx);
-}
-
-
-/*
- called when a CTDB_REQ_CALL packet comes in
-*/
-void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
- TDB_DATA data;
- struct ctdb_reply_call *r;
- int ret, len;
- struct ctdb_ltdb_header header;
- struct ctdb_call call;
- struct ctdb_db_context *ctdb_db;
-
- ctdb_db = find_ctdb_db(ctdb, c->db_id);
- if (!ctdb_db) {
- ctdb_send_error(ctdb, hdr, -1,
- "Unknown database in request. db_id==0x%08x",
- c->db_id);
- return;
- }
-
- call.call_id = c->callid;
- call.key.dptr = c->data;
- call.key.dsize = c->keylen;
- call.call_data.dptr = c->data + c->keylen;
- call.call_data.dsize = c->calldatalen;
-
- /* determine if we are the dmaster for this key. This also
- fetches the record data (if any), thus avoiding a 2nd fetch of the data
- if the call will be answered locally */
-
- ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data,
- ctdb_recv_raw_pkt, ctdb);
- if (ret == -1) {
- ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
- return;
- }
- if (ret == -2) {
- DEBUG(2,(__location__ " deferred ctdb_request_call\n"));
- return;
- }
-
- /* if we are not the dmaster, then send a redirect to the
- requesting node */
- if (header.dmaster != ctdb->vnn) {
- ctdb_call_send_redirect(ctdb, c, &header);
- talloc_free(data.dptr);
- ctdb_ltdb_unlock(ctdb_db, call.key);
- return;
- }
-
- /* if this nodes has done enough consecutive calls on the same record
- then give them the record
- or if the node requested an immediate migration
- */
- if ( (header.laccessor == c->hdr.srcnode
- && header.lacount >= ctdb->max_lacount)
- || c->flags&CTDB_IMMEDIATE_MIGRATION ) {
- ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data);
- talloc_free(data.dptr);
- ctdb_ltdb_unlock(ctdb_db, call.key);
- return;
- }
-
- ctdb_call_local(ctdb_db, &call, &header, &data, c->hdr.srcnode);
-
- ctdb_ltdb_unlock(ctdb_db, call.key);
-
- len = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
- r = ctdb->methods->allocate_pkt(ctdb, len);
- CTDB_NO_MEMORY_FATAL(ctdb, r);
- talloc_set_name_const(r, "reply_call packet");
- r->hdr.length = len;
- r->hdr.ctdb_magic = CTDB_MAGIC;
- r->hdr.ctdb_version = CTDB_VERSION;
- r->hdr.operation = CTDB_REPLY_CALL;
- r->hdr.destnode = hdr->srcnode;
- r->hdr.srcnode = hdr->destnode;
- r->hdr.reqid = hdr->reqid;
- r->status = call.status;
- r->datalen = call.reply_data.dsize;
- if (call.reply_data.dsize) {
- memcpy(&r->data[0], call.reply_data.dptr, call.reply_data.dsize);
- talloc_free(call.reply_data.dptr);
- }
-
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(r);
-}
-
-/*
- called when a CTDB_REPLY_CALL packet comes in
-
- This packet comes in response to a CTDB_REQ_CALL request packet. It
- contains any reply data from the call
-*/
-void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
- struct ctdb_call_state *state;
-
- state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
- if (state == NULL) {
- DEBUG(0, ("reqid %d not found\n", hdr->reqid));
- return;
- }
-
- state->call.reply_data.dptr = c->data;
- state->call.reply_data.dsize = c->datalen;
- state->call.status = c->status;
-
- talloc_steal(state, c);
-
- state->state = CTDB_CALL_DONE;
- if (state->async.fn) {
- state->async.fn(state);
- }
-}
-
-/*
- called when a CTDB_REPLY_DMASTER packet comes in
-
- This packet comes in from the lmaster response to a CTDB_REQ_CALL
- request packet. It means that the current dmaster wants to give us
- the dmaster role
-*/
-void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
- struct ctdb_call_state *state;
- struct ctdb_db_context *ctdb_db;
- TDB_DATA data;
- int ret;
-
- state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
- if (state == NULL) {
- return;
- }
-
- ctdb_db = state->ctdb_db;
-
- ret = ctdb_ltdb_lock_requeue(ctdb_db, state->call.key, hdr,
- ctdb_recv_raw_pkt, ctdb);
- if (ret == -2) {
- return;
- }
- if (ret != 0) {
- DEBUG(0,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
- return;
- }
-
- data.dptr = c->data;
- data.dsize = c->datalen;
-
- talloc_steal(state, c);
-
- /* we're now the dmaster - update our local ltdb with new header
- and data */
- state->header.dmaster = ctdb->vnn;
-
- if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) {
- ctdb_ltdb_unlock(ctdb_db, state->call.key);
- ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
- return;
- }
-
- ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
-
- ctdb_ltdb_unlock(ctdb_db, state->call.key);
-
- talloc_steal(state, state->call.reply_data.dptr);
-
- state->state = CTDB_CALL_DONE;
- if (state->async.fn) {
- state->async.fn(state);
- }
-}
-
-
-/*
- called when a CTDB_REPLY_ERROR packet comes in
-*/
-void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
- struct ctdb_call_state *state;
-
- state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
- if (state == NULL) return;
-
- talloc_steal(state, c);
-
- state->state = CTDB_CALL_ERROR;
- state->errmsg = (char *)c->msg;
- if (state->async.fn) {
- state->async.fn(state);
- }
-}
-
-
-/*
- called when a CTDB_REPLY_REDIRECT packet comes in
-
- This packet arrives when we have sent a CTDB_REQ_CALL request and
- the node that received it is not the dmaster for the given key. We
- are given a hint as to what node to try next.
-*/
-void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
- struct ctdb_call_state *state;
-
- state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
- if (state == NULL) return;
-
- talloc_steal(state, c);
-
- /* don't allow for too many redirects */
- if (state->redirect_count++ == CTDB_MAX_REDIRECT) {
- c->dmaster = ctdb_lmaster(ctdb, &state->call.key);
- }
-
- /* send it off again */
- state->node = ctdb->nodes[c->dmaster];
- state->c->hdr.destnode = c->dmaster;
-
- ctdb_queue_packet(ctdb, &state->c->hdr);
-}
-
-/*
- destroy a ctdb_call
-*/
-static int ctdb_call_destructor(struct ctdb_call_state *state)
-{
- idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
- return 0;
-}
-
-
-/*
- called when a ctdb_call times out
-*/
-void ctdb_call_timeout(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private_data)
-{
- struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
- state->state = CTDB_CALL_ERROR;
- ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out",
- state->c->hdr.reqid);
- if (state->async.fn) {
- state->async.fn(state);
- }
-}
-
-/*
- this allows the caller to setup a async.fn
-*/
-static void call_local_trigger(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private_data)
-{
- struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
- if (state->async.fn) {
- state->async.fn(state);
- }
-}
-
-
-/*
- construct an event driven local ctdb_call
-
- this is used so that locally processed ctdb_call requests are processed
- in an event driven manner
-*/
-struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
- struct ctdb_call *call,
- struct ctdb_ltdb_header *header,
- TDB_DATA *data)
-{
- struct ctdb_call_state *state;
- struct ctdb_context *ctdb = ctdb_db->ctdb;
- int ret;
-
- state = talloc_zero(ctdb_db, struct ctdb_call_state);
- CTDB_NO_MEMORY_NULL(ctdb, state);
-
- talloc_steal(state, data->dptr);
-
- state->state = CTDB_CALL_DONE;
- state->node = ctdb->nodes[ctdb->vnn];
- state->call = *call;
- state->ctdb_db = ctdb_db;
-
- ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
- talloc_steal(state, state->call.reply_data.dptr);
-
- event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
-
- return state;
-}
-
-
-/*
- make a remote ctdb call - async send. Called in daemon context.
-
- This constructs a ctdb_call request and queues it for processing.
- This call never blocks.
-*/
-struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
- struct ctdb_call *call,
- struct ctdb_ltdb_header *header)
-{
- uint32_t len;
- struct ctdb_call_state *state;
- struct ctdb_context *ctdb = ctdb_db->ctdb;
-
- state = talloc_zero(ctdb_db, struct ctdb_call_state);
- CTDB_NO_MEMORY_NULL(ctdb, state);
-
- len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
- state->c = ctdb->methods->allocate_pkt(state, len);
- CTDB_NO_MEMORY_NULL(ctdb, state->c);
- talloc_set_name_const(state->c, "req_call packet");
-
- state->c->hdr.length = len;
- state->c->hdr.ctdb_magic = CTDB_MAGIC;
- state->c->hdr.ctdb_version = CTDB_VERSION;
- state->c->hdr.operation = CTDB_REQ_CALL;
- state->c->hdr.destnode = header->dmaster;
- state->c->hdr.srcnode = ctdb->vnn;
- /* this limits us to 16k outstanding messages - not unreasonable */
- state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
- state->c->flags = call->flags;
- state->c->db_id = ctdb_db->db_id;
- state->c->callid = call->call_id;
- state->c->keylen = call->key.dsize;
- state->c->calldatalen = call->call_data.dsize;
- memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
- memcpy(&state->c->data[call->key.dsize],
- call->call_data.dptr, call->call_data.dsize);
- state->call = *call;
- state->call.call_data.dptr = &state->c->data[call->key.dsize];
- state->call.key.dptr = &state->c->data[0];
-
- state->node = ctdb->nodes[header->dmaster];
- state->state = CTDB_CALL_WAIT;
- state->header = *header;
- state->ctdb_db = ctdb_db;
-
- talloc_set_destructor(state, ctdb_call_destructor);
-
- ctdb_queue_packet(ctdb, &state->c->hdr);
-
- event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
- ctdb_call_timeout, state);
- return state;
-}
-
-/*
- make a remote ctdb call - async recv - called in daemon context
-
- This is called when the program wants to wait for a ctdb_call to complete and get the
- results. This call will block unless the call has already completed.
-*/
-int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
-{
- while (state->state < CTDB_CALL_DONE) {
- event_loop_once(state->node->ctdb->ev);
- }
- if (state->state != CTDB_CALL_DONE) {
- ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
- talloc_free(state);
- return -1;
- }
-
- if (state->call.reply_data.dsize) {
- call->reply_data.dptr = talloc_memdup(state->node->ctdb,
- state->call.reply_data.dptr,
- state->call.reply_data.dsize);
- call->reply_data.dsize = state->call.reply_data.dsize;
- } else {
- call->reply_data.dptr = NULL;
- call->reply_data.dsize = 0;
- }
- call->status = state->call.status;
- talloc_free(state);
- return 0;
-}
-
-
diff --git a/source4/cluster/ctdb/common/ctdb_client.c b/source4/cluster/ctdb/common/ctdb_client.c
deleted file mode 100644
index e3806f517d..0000000000
--- a/source4/cluster/ctdb/common/ctdb_client.c
+++ /dev/null
@@ -1,681 +0,0 @@
-/*
- ctdb daemon code
-
- Copyright (C) Andrew Tridgell 2007
- Copyright (C) Ronnie Sahlberg 2007
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 3 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "includes.h"
-#include "db_wrap.h"
-#include "lib/tdb/include/tdb.h"
-#include "lib/events/events.h"
-#include "lib/util/dlinklist.h"
-#include "system/network.h"
-#include "system/filesys.h"
-#include "../include/ctdb.h"
-#include "../include/ctdb_private.h"
-
-/*
- queue a packet for sending from client to daemon
-*/
-static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
-}
-
-
-/*
- handle a connect wait reply packet
- */
-static void ctdb_reply_connect_wait(struct ctdb_context *ctdb,
- struct ctdb_req_header *hdr)
-{
- struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr;
- ctdb->num_connected = r->num_connected;
-}
-
-/*
- state of a in-progress ctdb call in client
-*/
-struct ctdb_client_call_state {
- enum call_state state;
- uint32_t reqid;
- struct ctdb_db_context *ctdb_db;
- struct ctdb_call call;
-};
-
-/*
- called when a CTDB_REPLY_CALL packet comes in in the client
-
- This packet comes in response to a CTDB_REQ_CALL request packet. It
- contains any reply data from the call
-*/
-static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
- struct ctdb_client_call_state *state;
-
- state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
- if (state == NULL) {
- DEBUG(0, ("reqid %d not found\n", hdr->reqid));
- return;
- }
-
- state->call.reply_data.dptr = c->data;
- state->call.reply_data.dsize = c->datalen;
- state->call.status = c->status;
-
- talloc_steal(state, c);
-
- state->state = CTDB_CALL_DONE;
-}
-
-static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
-
-/*
- this is called in the client, when data comes in from the daemon
- */
-static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
-{
- struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
- struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
- TALLOC_CTX *tmp_ctx;
-
- /* place the packet as a child of a tmp_ctx. We then use
- talloc_free() below to free it. If any of the calls want
- to keep it, then they will steal it somewhere else, and the
- talloc_free() will be a no-op */
- tmp_ctx = talloc_new(ctdb);
- talloc_steal(tmp_ctx, hdr);
-
- if (cnt == 0) {
- DEBUG(2,("Daemon has exited - shutting down client\n"));
- exit(0);
- }
-
- if (cnt < sizeof(*hdr)) {
- DEBUG(0,("Bad packet length %d in client\n", cnt));
- goto done;
- }
- if (cnt != hdr->length) {
- ctdb_set_error(ctdb, "Bad header length %d expected %d in client\n",
- hdr->length, cnt);
- goto done;
- }
-
- if (hdr->ctdb_magic != CTDB_MAGIC) {
- ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
- goto done;
- }
-
- if (hdr->ctdb_version != CTDB_VERSION) {
- ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
- goto done;
- }
-
- switch (hdr->operation) {
- case CTDB_REPLY_CALL:
- ctdb_client_reply_call(ctdb, hdr);
- break;
-
- case CTDB_REQ_MESSAGE:
- ctdb_request_message(ctdb, hdr);
- break;
-
- case CTDB_REPLY_CONNECT_WAIT:
- ctdb_reply_connect_wait(ctdb, hdr);
- break;
-
- case CTDB_REPLY_STATUS:
- ctdb_reply_status(ctdb, hdr);
- break;
-
- default:
- DEBUG(0,("bogus operation code:%d\n",hdr->operation));
- }
-
-done:
- talloc_free(tmp_ctx);
-}
-
-/*
- connect to a unix domain socket
-*/
-int ctdb_socket_connect(struct ctdb_context *ctdb)
-{
- struct sockaddr_un addr;
-
- memset(&addr, 0, sizeof(addr));
- addr.sun_family = AF_UNIX;
- strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
-
- ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
- if (ctdb->daemon.sd == -1) {
- return -1;
- }
-
- if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
- close(ctdb->daemon.sd);
- ctdb->daemon.sd = -1;
- return -1;
- }
-
- ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
- CTDB_DS_ALIGNMENT,
- ctdb_client_read_cb, ctdb);
- return 0;
-}
-
-
-struct ctdb_record_handle {
- struct ctdb_db_context *ctdb_db;
- TDB_DATA key;
- TDB_DATA *data;
- struct ctdb_ltdb_header header;
-};
-
-
-/*
- make a recv call to the local ctdb daemon - called from client context
-
- This is called when the program wants to wait for a ctdb_call to complete and get the
- results. This call will block unless the call has already completed.
-*/
-int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
-{
- while (state->state < CTDB_CALL_DONE) {
- event_loop_once(state->ctdb_db->ctdb->ev);
- }
- if (state->state != CTDB_CALL_DONE) {
- DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
- talloc_free(state);
- return -1;
- }
-
- if (state->call.reply_data.dsize) {
- call->reply_data.dptr = talloc_memdup(state->ctdb_db,
- state->call.reply_data.dptr,
- state->call.reply_data.dsize);
- call->reply_data.dsize = state->call.reply_data.dsize;
- } else {
- call->reply_data.dptr = NULL;
- call->reply_data.dsize = 0;
- }
- call->status = state->call.status;
- talloc_free(state);
-
- return 0;
-}
-
-
-
-
-/*
- destroy a ctdb_call in client
-*/
-static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
-{
- idr_remove(state->ctdb_db->ctdb->idr, state->reqid);
- return 0;
-}
-
-/*
- construct an event driven local ctdb_call
-
- this is used so that locally processed ctdb_call requests are processed
- in an event driven manner
-*/
-static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
- struct ctdb_call *call,
- struct ctdb_ltdb_header *header,
- TDB_DATA *data)
-{
- struct ctdb_client_call_state *state;
- struct ctdb_context *ctdb = ctdb_db->ctdb;
- int ret;
-
- state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
- CTDB_NO_MEMORY_NULL(ctdb, state);
-
- talloc_steal(state, data->dptr);
-
- state->state = CTDB_CALL_DONE;
- state->call = *call;
- state->ctdb_db = ctdb_db;
-
- ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
- talloc_steal(state, state->call.reply_data.dptr);
-
- return state;
-}
-
-/*
- make a ctdb call to the local daemon - async send. Called from client context.
-
- This constructs a ctdb_call request and queues it for processing.
- This call never blocks.
-*/
-struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
- struct ctdb_call *call)
-{
- struct ctdb_client_call_state *state;
- struct ctdb_context *ctdb = ctdb_db->ctdb;
- struct ctdb_ltdb_header header;
- TDB_DATA data;
- int ret;
- size_t len;
- struct ctdb_req_call *c;
-
- /* if the domain socket is not yet open, open it */
- if (ctdb->daemon.sd==-1) {
- ctdb_socket_connect(ctdb);
- }
-
- ret = ctdb_ltdb_lock(ctdb_db, call->key);
- if (ret != 0) {
- DEBUG(0,(__location__ " Failed to get chainlock\n"));
- return NULL;
- }
-
- ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
- if (ret != 0) {
- ctdb_ltdb_unlock(ctdb_db, call->key);
- DEBUG(0,(__location__ " Failed to fetch record\n"));
- return NULL;
- }
-
- if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
- state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
- talloc_free(data.dptr);
- ctdb_ltdb_unlock(ctdb_db, call->key);
- return state;
- }
-
- ctdb_ltdb_unlock(ctdb_db, call->key);
- talloc_free(data.dptr);
-
- state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
- if (state == NULL) {
- DEBUG(0, (__location__ " failed to allocate state\n"));
- return NULL;
- }
-
- len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
- c = ctdbd_allocate_pkt(state, len);
- if (c == NULL) {
- DEBUG(0, (__location__ " failed to allocate packet\n"));
- return NULL;
- }
- talloc_set_name_const(c, "ctdb client req_call packet");
- memset(c, 0, offsetof(struct ctdb_req_call, data));
-
- c->hdr.length = len;
- c->hdr.ctdb_magic = CTDB_MAGIC;
- c->hdr.ctdb_version = CTDB_VERSION;
- c->hdr.operation = CTDB_REQ_CALL;
- /* this limits us to 16k outstanding messages - not unreasonable */
- c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
- c->flags = call->flags;
- c->db_id = ctdb_db->db_id;
- c->callid = call->call_id;
- c->keylen = call->key.dsize;
- c->calldatalen = call->call_data.dsize;
- memcpy(&c->data[0], call->key.dptr, call->key.dsize);
- memcpy(&c->data[call->key.dsize],
- call->call_data.dptr, call->call_data.dsize);
- state->call = *call;
- state->call.call_data.dptr = &c->data[call->key.dsize];
- state->call.key.dptr = &c->data[0];
-
- state->state = CTDB_CALL_WAIT;
- state->ctdb_db = ctdb_db;
- state->reqid = c->hdr.reqid;
-
- talloc_set_destructor(state, ctdb_client_call_destructor);
-
- ctdb_client_queue_pkt(ctdb, &c->hdr);
-
- return state;
-}
-
-
-/*
- full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
-*/
-int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
-{
- struct ctdb_client_call_state *state;
-
- state = ctdb_call_send(ctdb_db, call);
- return ctdb_call_recv(state, call);
-}
-
-
-/*
- tell the daemon what messaging srvid we will use, and register the message
- handler function in the client
-*/
-int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
- ctdb_message_fn_t handler,
- void *private_data)
-
-{
- struct ctdb_req_register c;
- int res;
-
- /* if the domain socket is not yet open, open it */
- if (ctdb->daemon.sd==-1) {
- ctdb_socket_connect(ctdb);
- }
-
- ZERO_STRUCT(c);
-
- c.hdr.length = sizeof(c);
- c.hdr.ctdb_magic = CTDB_MAGIC;
- c.hdr.ctdb_version = CTDB_VERSION;
- c.hdr.operation = CTDB_REQ_REGISTER;
- c.srvid = srvid;
-
- res = ctdb_client_queue_pkt(ctdb, &c.hdr);
- if (res != 0) {
- return res;
- }
-
- /* also need to register the handler with our ctdb structure */
- return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
-}
-
-
-/*
- send a message - from client context
- */
-int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
- uint32_t srvid, TDB_DATA data)
-{
- struct ctdb_req_message *r;
- int len, res;
-
- len = offsetof(struct ctdb_req_message, data) + data.dsize;
- r = ctdb->methods->allocate_pkt(ctdb, len);
- CTDB_NO_MEMORY(ctdb, r);
- talloc_set_name_const(r, "req_message packet");
-
- r->hdr.length = len;
- r->hdr.ctdb_magic = CTDB_MAGIC;
- r->hdr.ctdb_version = CTDB_VERSION;
- r->hdr.operation = CTDB_REQ_MESSAGE;
- r->hdr.destnode = vnn;
- r->hdr.srcnode = ctdb->vnn;
- r->hdr.reqid = 0;
- r->srvid = srvid;
- r->datalen = data.dsize;
- memcpy(&r->data[0], data.dptr, data.dsize);
-
- res = ctdb_client_queue_pkt(ctdb, &r->hdr);
- if (res != 0) {
- return res;
- }
-
- talloc_free(r);
- return 0;
-}
-
-/*
- wait for all nodes to be connected - from client
- */
-void ctdb_connect_wait(struct ctdb_context *ctdb)
-{
- struct ctdb_req_connect_wait r;
- int res;
-
- ZERO_STRUCT(r);
-
- r.hdr.length = sizeof(r);
- r.hdr.ctdb_magic = CTDB_MAGIC;
- r.hdr.ctdb_version = CTDB_VERSION;
- r.hdr.operation = CTDB_REQ_CONNECT_WAIT;
-
- DEBUG(3,("ctdb_connect_wait: sending to ctdbd\n"));
-
- /* if the domain socket is not yet open, open it */
- if (ctdb->daemon.sd==-1) {
- ctdb_socket_connect(ctdb);
- }
-
- res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length);
- if (res != 0) {
- DEBUG(0,(__location__ " Failed to queue a connect wait request\n"));
- return;
- }
-
- DEBUG(3,("ctdb_connect_wait: waiting\n"));
-
- /* now we can go into the normal wait routine, as the reply packet
- will update the ctdb->num_connected variable */
- ctdb_daemon_connect_wait(ctdb);
-}
-
-/*
- cancel a ctdb_fetch_lock operation, releasing the lock
- */
-static int fetch_lock_destructor(struct ctdb_record_handle *h)
-{
- ctdb_ltdb_unlock(h->ctdb_db, h->key);
- return 0;
-}
-
-/*
- force the migration of a record to this node
- */
-static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
-{
- struct ctdb_call call;
- ZERO_STRUCT(call);
- call.call_id = CTDB_NULL_FUNC;
- call.key = key;
- call.flags = CTDB_IMMEDIATE_MIGRATION;
- return ctdb_call(ctdb_db, &call);
-}
-
-/*
- get a lock on a record, and return the records data. Blocks until it gets the lock
- */
-struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
- TDB_DATA key, TDB_DATA *data)
-{
- int ret;
- struct ctdb_record_handle *h;
-
- /*
- procedure is as follows:
-
- 1) get the chain lock.
- 2) check if we are dmaster
- 3) if we are the dmaster then return handle
- 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
- reply from ctdbd
- 5) when we get the reply, goto (1)
- */
-
- h = talloc_zero(mem_ctx, struct ctdb_record_handle);
- if (h == NULL) {
- return NULL;
- }
-
- h->ctdb_db = ctdb_db;
- h->key = key;
- h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
- if (h->key.dptr == NULL) {
- talloc_free(h);
- return NULL;
- }
- h->data = data;
-
- DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", key.dsize, key.dsize,
- (const char *)key.dptr));
-
-again:
- /* step 1 - get the chain lock */
- ret = ctdb_ltdb_lock(ctdb_db, key);
- if (ret != 0) {
- DEBUG(0, (__location__ " failed to lock ltdb record\n"));
- talloc_free(h);
- return NULL;
- }
-
- DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
-
- talloc_set_destructor(h, fetch_lock_destructor);
-
- ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
- if (ret != 0) {
- ctdb_ltdb_unlock(ctdb_db, key);
- talloc_free(h);
- return NULL;
- }
-
- /* when torturing, ensure we test the remote path */
- if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
- random() % 5 == 0) {
- h->header.dmaster = (uint32_t)-1;
- }
-
-
- DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
-
- if (h->header.dmaster != ctdb_db->ctdb->vnn) {
- ctdb_ltdb_unlock(ctdb_db, key);
- ret = ctdb_client_force_migration(ctdb_db, key);
- if (ret != 0) {
- DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
- talloc_free(h);
- return NULL;
- }
- goto again;
- }
-
- DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
- return h;
-}
-
-/*
- store some data to the record that was locked with ctdb_fetch_lock()
-*/
-int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
-{
- return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
-}
-
-/*
- wait until we're the only node left.
- this function never returns
-*/
-void ctdb_shutdown(struct ctdb_context *ctdb)
-{
- struct ctdb_req_shutdown r;
- int len;
-
- /* if the domain socket is not yet open, open it */
- if (ctdb->daemon.sd==-1) {
- ctdb_socket_connect(ctdb);
- }
-
- len = sizeof(struct ctdb_req_shutdown);
- ZERO_STRUCT(r);
- r.hdr.length = len;
- r.hdr.ctdb_magic = CTDB_MAGIC;
- r.hdr.ctdb_version = CTDB_VERSION;
- r.hdr.operation = CTDB_REQ_SHUTDOWN;
- r.hdr.reqid = 0;
-
- ctdb_client_queue_pkt(ctdb, &(r.hdr));
-
- /* this event loop will terminate once we receive the reply */
- while (1) {
- event_loop_once(ctdb->ev);
- }
-}
-
-enum ctdb_status_states {CTDB_STATUS_WAIT, CTDB_STATUS_DONE};
-
-struct ctdb_status_state {
- uint32_t reqid;
- struct ctdb_status *status;
- enum ctdb_status_states state;
-};
-
-/*
- handle a ctdb_reply_status reply
- */
-static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct ctdb_reply_status *r = (struct ctdb_reply_status *)hdr;
- struct ctdb_status_state *state;
-
- state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_status_state);
- if (state == NULL) {
- DEBUG(0, ("reqid %d not found\n", hdr->reqid));
- return;
- }
-
- *state->status = r->status;
- state->state = CTDB_STATUS_DONE;
-}
-
-/*
- wait until we're the only node left.
- this function never returns
-*/
-int ctdb_status(struct ctdb_context *ctdb, struct ctdb_status *status)
-{
- struct ctdb_req_status r;
- int ret;
- struct ctdb_status_state *state;
-
- /* if the domain socket is not yet open, open it */
- if (ctdb->daemon.sd==-1) {
- ctdb_socket_connect(ctdb);
- }
-
- state = talloc(ctdb, struct ctdb_status_state);
- CTDB_NO_MEMORY(ctdb, state);
-
- state->reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
- state->status = status;
- state->state = CTDB_STATUS_WAIT;
-
- ZERO_STRUCT(r);
- r.hdr.length = sizeof(r);
- r.hdr.ctdb_magic = CTDB_MAGIC;
- r.hdr.ctdb_version = CTDB_VERSION;
- r.hdr.operation = CTDB_REQ_STATUS;
- r.hdr.reqid = state->reqid;
-
- ret = ctdb_client_queue_pkt(ctdb, &(r.hdr));
- if (ret != 0) {
- talloc_free(state);
- return -1;
- }
-
- while (state->state == CTDB_STATUS_WAIT) {
- event_loop_once(ctdb->ev);
- }
-
- talloc_free(state);
-
- return 0;
-}
-
diff --git a/source4/cluster/ctdb/common/ctdb_daemon.c b/source4/cluster/ctdb/common/ctdb_daemon.c
deleted file mode 100644
index f1de3933e7..0000000000
--- a/source4/cluster/ctdb/common/ctdb_daemon.c
+++ /dev/null
@@ -1,710 +0,0 @@
-/*
- ctdb daemon code
-
- Copyright (C) Andrew Tridgell 2006
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 3 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "includes.h"
-#include "db_wrap.h"
-#include "lib/tdb/include/tdb.h"
-#include "lib/events/events.h"
-#include "lib/util/dlinklist.h"
-#include "system/network.h"
-#include "system/filesys.h"
-#include "system/wait.h"
-#include "../include/ctdb.h"
-#include "../include/ctdb_private.h"
-
-/*
- structure describing a connected client in the daemon
- */
-struct ctdb_client {
- struct ctdb_context *ctdb;
- int fd;
- struct ctdb_queue *queue;
-};
-
-
-
-static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
-
-static void ctdb_main_loop(struct ctdb_context *ctdb)
-{
- ctdb->methods->start(ctdb);
-
- /* go into a wait loop to allow other nodes to complete */
- event_loop_wait(ctdb->ev);
-
- DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
- exit(1);
-}
-
-
-static void set_non_blocking(int fd)
-{
- unsigned v;
- v = fcntl(fd, F_GETFL, 0);
- fcntl(fd, F_SETFL, v | O_NONBLOCK);
-}
-
-static void block_signal(int signum)
-{
- struct sigaction act;
-
- memset(&act, 0, sizeof(act));
-
- act.sa_handler = SIG_IGN;
- sigemptyset(&act.sa_mask);
- sigaddset(&act.sa_mask, signum);
- sigaction(signum, &act, NULL);
-}
-
-
-/*
- send a packet to a client
- */
-static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
-{
- client->ctdb->status.client_packets_sent++;
- return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
-}
-
-/*
- message handler for when we are in daemon mode. This redirects the message
- to the right client
- */
-static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
- TDB_DATA data, void *private_data)
-{
- struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
- struct ctdb_req_message *r;
- int len;
-
- /* construct a message to send to the client containing the data */
- len = offsetof(struct ctdb_req_message, data) + data.dsize;
- r = ctdbd_allocate_pkt(ctdb, len);
-
- talloc_set_name_const(r, "req_message packet");
-
- memset(r, 0, offsetof(struct ctdb_req_message, data));
-
- r->hdr.length = len;
- r->hdr.ctdb_magic = CTDB_MAGIC;
- r->hdr.ctdb_version = CTDB_VERSION;
- r->hdr.operation = CTDB_REQ_MESSAGE;
- r->srvid = srvid;
- r->datalen = data.dsize;
- memcpy(&r->data[0], data.dptr, data.dsize);
-
- daemon_queue_send(client, &r->hdr);
-
- talloc_free(r);
-}
-
-
-/*
- this is called when the ctdb daemon received a ctdb request to
- set the srvid from the client
- */
-static void daemon_request_register_message_handler(struct ctdb_client *client,
- struct ctdb_req_register *c)
-{
- int res;
- res = ctdb_register_message_handler(client->ctdb, client,
- c->srvid, daemon_message_handler,
- client);
- if (res != 0) {
- DEBUG(0,(__location__ " Failed to register handler %u in daemon\n",
- c->srvid));
- } else {
- DEBUG(2,(__location__ " Registered message handler for srvid=%u\n",
- c->srvid));
- }
-}
-
-
-/*
- called when the daemon gets a shutdown request from a client
- */
-static void daemon_request_shutdown(struct ctdb_client *client,
- struct ctdb_req_shutdown *f)
-{
- struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context);
- int len;
- uint32_t node;
-
- /* we dont send to ourself so we can already count one daemon as
- exiting */
- ctdb->num_finished++;
-
-
- /* loop over all nodes of the cluster */
- for (node=0; node<ctdb->num_nodes;node++) {
- struct ctdb_req_finished *rf;
-
- /* dont send a message to ourself */
- if (ctdb->vnn == node) {
- continue;
- }
-
- len = sizeof(struct ctdb_req_finished);
- rf = ctdb->methods->allocate_pkt(ctdb, len);
- CTDB_NO_MEMORY_FATAL(ctdb, rf);
- talloc_set_name_const(rf, "ctdb_req_finished packet");
-
- ZERO_STRUCT(*rf);
- rf->hdr.length = len;
- rf->hdr.ctdb_magic = CTDB_MAGIC;
- rf->hdr.ctdb_version = CTDB_VERSION;
- rf->hdr.operation = CTDB_REQ_FINISHED;
- rf->hdr.destnode = node;
- rf->hdr.srcnode = ctdb->vnn;
- rf->hdr.reqid = 0;
-
- ctdb_queue_packet(ctdb, &(rf->hdr));
-
- talloc_free(rf);
- }
-
- /* wait until all nodes have are prepared to shutdown */
- while (ctdb->num_finished != ctdb->num_nodes) {
- event_loop_once(ctdb->ev);
- }
-
- /* all daemons have requested to finish - we now exit */
- DEBUG(1,("All daemons finished - exiting\n"));
- _exit(0);
-}
-
-
-
-/*
- called when the daemon gets a connect wait request from a client
- */
-static void daemon_request_connect_wait(struct ctdb_client *client,
- struct ctdb_req_connect_wait *c)
-{
- struct ctdb_reply_connect_wait r;
- int res;
-
- /* first wait - in the daemon */
- ctdb_daemon_connect_wait(client->ctdb);
-
- /* now send the reply */
- ZERO_STRUCT(r);
-
- r.hdr.length = sizeof(r);
- r.hdr.ctdb_magic = CTDB_MAGIC;
- r.hdr.ctdb_version = CTDB_VERSION;
- r.hdr.operation = CTDB_REPLY_CONNECT_WAIT;
- r.vnn = ctdb_get_vnn(client->ctdb);
- r.num_connected = client->ctdb->num_connected;
-
- res = daemon_queue_send(client, &r.hdr);
- if (res != 0) {
- DEBUG(0,(__location__ " Failed to queue a connect wait response\n"));
- return;
- }
-}
-
-
-/*
- called when the daemon gets a status request from a client
- */
-static void daemon_request_status(struct ctdb_client *client,
- struct ctdb_req_status *c)
-{
- struct ctdb_reply_status r;
- int res;
-
- /* now send the reply */
- ZERO_STRUCT(r);
-
- r.hdr.length = sizeof(r);
- r.hdr.ctdb_magic = CTDB_MAGIC;
- r.hdr.ctdb_version = CTDB_VERSION;
- r.hdr.operation = CTDB_REPLY_STATUS;
- r.hdr.reqid = c->hdr.reqid;
- r.status = client->ctdb->status;
-
- res = daemon_queue_send(client, &r.hdr);
- if (res != 0) {
- DEBUG(0,(__location__ " Failed to queue a connect wait response\n"));
- return;
- }
-}
-
-/*
- destroy a ctdb_client
-*/
-static int ctdb_client_destructor(struct ctdb_client *client)
-{
- close(client->fd);
- client->fd = -1;
- return 0;
-}
-
-
-/*
- this is called when the ctdb daemon received a ctdb request message
- from a local client over the unix domain socket
- */
-static void daemon_request_message_from_client(struct ctdb_client *client,
- struct ctdb_req_message *c)
-{
- TDB_DATA data;
- int res;
-
- /* maybe the message is for another client on this node */
- if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
- ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
- return;
- }
-
- /* its for a remote node */
- data.dptr = &c->data[0];
- data.dsize = c->datalen;
- res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
- c->srvid, data);
- if (res != 0) {
- DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
- c->hdr.destnode));
- }
-}
-
-
-struct daemon_call_state {
- struct ctdb_client *client;
- uint32_t reqid;
- struct ctdb_call *call;
- struct timeval start_time;
-};
-
-/*
- complete a call from a client
-*/
-static void daemon_call_from_client_callback(struct ctdb_call_state *state)
-{
- struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
- struct daemon_call_state);
- struct ctdb_reply_call *r;
- int res;
- uint32_t length;
- struct ctdb_client *client = dstate->client;
-
- talloc_steal(client, dstate);
- talloc_steal(dstate, dstate->call);
-
- res = ctdb_daemon_call_recv(state, dstate->call);
- if (res != 0) {
- DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
- client->ctdb->status.pending_calls--;
- ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
- return;
- }
-
- length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
- r = ctdbd_allocate_pkt(dstate, length);
- if (r == NULL) {
- DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
- client->ctdb->status.pending_calls--;
- ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
- return;
- }
- memset(r, 0, offsetof(struct ctdb_reply_call, data));
- r->hdr.length = length;
- r->hdr.ctdb_magic = CTDB_MAGIC;
- r->hdr.ctdb_version = CTDB_VERSION;
- r->hdr.operation = CTDB_REPLY_CALL;
- r->hdr.reqid = dstate->reqid;
- r->datalen = dstate->call->reply_data.dsize;
- memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
-
- res = daemon_queue_send(client, &r->hdr);
- if (res != 0) {
- DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n"));
- }
- ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
- talloc_free(dstate);
- client->ctdb->status.pending_calls--;
-}
-
-
-/*
- this is called when the ctdb daemon received a ctdb request call
- from a local client over the unix domain socket
- */
-static void daemon_request_call_from_client(struct ctdb_client *client,
- struct ctdb_req_call *c)
-{
- struct ctdb_call_state *state;
- struct ctdb_db_context *ctdb_db;
- struct daemon_call_state *dstate;
- struct ctdb_call *call;
- struct ctdb_ltdb_header header;
- TDB_DATA key, data;
- int ret;
- struct ctdb_context *ctdb = client->ctdb;
-
- ctdb->status.total_calls++;
- ctdb->status.pending_calls++;
-
- ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
- if (!ctdb_db) {
- DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
- c->db_id));
- ctdb->status.pending_calls--;
- return;
- }
-
- key.dptr = c->data;
- key.dsize = c->keylen;
-
- ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
- (struct ctdb_req_header *)c, &data,
- daemon_incoming_packet, client);
- if (ret == -2) {
- /* will retry later */
- ctdb->status.pending_calls--;
- return;
- }
-
- if (ret != 0) {
- DEBUG(0,(__location__ " Unable to fetch record\n"));
- ctdb->status.pending_calls--;
- return;
- }
-
- dstate = talloc(client, struct daemon_call_state);
- if (dstate == NULL) {
- ctdb_ltdb_unlock(ctdb_db, key);
- DEBUG(0,(__location__ " Unable to allocate dstate\n"));
- ctdb->status.pending_calls--;
- return;
- }
- dstate->start_time = timeval_current();
- dstate->client = client;
- dstate->reqid = c->hdr.reqid;
- talloc_steal(dstate, data.dptr);
-
- call = dstate->call = talloc_zero(dstate, struct ctdb_call);
- if (call == NULL) {
- ctdb_ltdb_unlock(ctdb_db, key);
- DEBUG(0,(__location__ " Unable to allocate call\n"));
- ctdb->status.pending_calls--;
- ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
- return;
- }
-
- call->call_id = c->callid;
- call->key = key;
- call->call_data.dptr = c->data + c->keylen;
- call->call_data.dsize = c->calldatalen;
- call->flags = c->flags;
-
- if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
- state = ctdb_call_local_send(ctdb_db, call, &header, &data);
- } else {
- state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
- }
-
- ctdb_ltdb_unlock(ctdb_db, key);
-
- if (state == NULL) {
- DEBUG(0,(__location__ " Unable to setup call send\n"));
- ctdb->status.pending_calls--;
- ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
- return;
- }
- talloc_steal(state, dstate);
- talloc_steal(client, state);
-
- state->async.fn = daemon_call_from_client_callback;
- state->async.private_data = dstate;
-}
-
-/* data contains a packet from the client */
-static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
-{
- struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
- struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
- TALLOC_CTX *tmp_ctx;
- struct ctdb_context *ctdb = client->ctdb;
-
- /* place the packet as a child of a tmp_ctx. We then use
- talloc_free() below to free it. If any of the calls want
- to keep it, then they will steal it somewhere else, and the
- talloc_free() will be a no-op */
- tmp_ctx = talloc_new(client);
- talloc_steal(tmp_ctx, hdr);
-
- if (hdr->ctdb_magic != CTDB_MAGIC) {
- ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
- goto done;
- }
-
- if (hdr->ctdb_version != CTDB_VERSION) {
- ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
- goto done;
- }
-
- switch (hdr->operation) {
- case CTDB_REQ_CALL:
- ctdb->status.client.req_call++;
- daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
- break;
-
- case CTDB_REQ_REGISTER:
- ctdb->status.client.req_register++;
- daemon_request_register_message_handler(client,
- (struct ctdb_req_register *)hdr);
- break;
- case CTDB_REQ_MESSAGE:
- ctdb->status.client.req_message++;
- daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
- break;
-
- case CTDB_REQ_CONNECT_WAIT:
- ctdb->status.client.req_connect_wait++;
- daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
- break;
-
- case CTDB_REQ_SHUTDOWN:
- ctdb->status.client.req_shutdown++;
- daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
- break;
-
- case CTDB_REQ_STATUS:
- ctdb->status.client.req_status++;
- daemon_request_status(client, (struct ctdb_req_status *)hdr);
- break;
-
- default:
- DEBUG(0,(__location__ " daemon: unrecognized operation %d\n",
- hdr->operation));
- }
-
-done:
- talloc_free(tmp_ctx);
-}
-
-/*
- called when the daemon gets a incoming packet
- */
-static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
-{
- struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
- struct ctdb_req_header *hdr;
-
- if (cnt == 0) {
- talloc_free(client);
- return;
- }
-
- client->ctdb->status.client_packets_recv++;
-
- if (cnt < sizeof(*hdr)) {
- ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt);
- return;
- }
- hdr = (struct ctdb_req_header *)data;
- if (cnt != hdr->length) {
- ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n in daemon",
- hdr->length, cnt);
- return;
- }
-
- if (hdr->ctdb_magic != CTDB_MAGIC) {
- ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
- return;
- }
-
- if (hdr->ctdb_version != CTDB_VERSION) {
- ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
- return;
- }
-
- DEBUG(3,(__location__ " client request %d of type %d length %d from "
- "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
- hdr->srcnode, hdr->destnode));
-
- /* it is the responsibility of the incoming packet function to free 'data' */
- daemon_incoming_packet(client, data, cnt);
-}
-
-static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private_data)
-{
- struct sockaddr_in addr;
- socklen_t len;
- int fd;
- struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
- struct ctdb_client *client;
-
- memset(&addr, 0, sizeof(addr));
- len = sizeof(addr);
- fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
- if (fd == -1) {
- return;
- }
- set_non_blocking(fd);
-
- client = talloc_zero(ctdb, struct ctdb_client);
- client->ctdb = ctdb;
- client->fd = fd;
-
- client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
- ctdb_daemon_read_cb, client);
-
- talloc_set_destructor(client, ctdb_client_destructor);
-}
-
-
-
-static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private_data)
-{
- int *fd = private_data;
- int cnt;
- char buf;
-
- /* XXX this is a good place to try doing some cleaning up before exiting */
- cnt = read(*fd, &buf, 1);
- if (cnt==0) {
- DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n"));
- exit(1);
- } else {
- DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n"));
- exit(1);
- }
-}
-
-
-
-/*
- create a unix domain socket and bind it
- return a file descriptor open on the socket
-*/
-static int ux_socket_bind(struct ctdb_context *ctdb)
-{
- struct sockaddr_un addr;
-
- ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
- if (ctdb->daemon.sd == -1) {
- ctdb->daemon.sd = -1;
- return -1;
- }
-
- set_non_blocking(ctdb->daemon.sd);
-
- memset(&addr, 0, sizeof(addr));
- addr.sun_family = AF_UNIX;
- strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
-
- if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
- close(ctdb->daemon.sd);
- ctdb->daemon.sd = -1;
- return -1;
- }
- listen(ctdb->daemon.sd, 1);
-
- return 0;
-}
-
-/*
- delete the socket on exit - called on destruction of autofree context
- */
-static int unlink_destructor(const char *name)
-{
- unlink(name);
- return 0;
-}
-
-/*
- start the protocol going
-*/
-int ctdb_start(struct ctdb_context *ctdb)
-{
- pid_t pid;
- static int fd[2];
- int res;
- struct fd_event *fde;
- const char *domain_socket_name;
-
- /* generate a name to use for our local socket */
- ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
- /* get rid of any old sockets */
- unlink(ctdb->daemon.name);
-
- /* create a unix domain stream socket to listen to */
- res = ux_socket_bind(ctdb);
- if (res!=0) {
- DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
- exit(10);
- }
-
- res = pipe(&fd[0]);
- if (res) {
- DEBUG(0,(__location__ " Failed to open pipe for CTDB\n"));
- exit(1);
- }
- pid = fork();
- if (pid==-1) {
- DEBUG(0,(__location__ " Failed to fork CTDB daemon\n"));
- exit(1);
- }
-
- if (pid) {
- close(fd[0]);
- close(ctdb->daemon.sd);
- ctdb->daemon.sd = -1;
- return 0;
- }
-
- block_signal(SIGPIPE);
-
- /* ensure the socket is deleted on exit of the daemon */
- domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
- talloc_set_destructor(domain_socket_name, unlink_destructor);
-
- close(fd[1]);
-
- ctdb->ev = event_context_init(NULL);
- fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
- fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
- ctdb_main_loop(ctdb);
-
- return 0;
-}
-
-/*
- allocate a packet for use in client<->daemon communication
- */
-void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len)
-{
- int size;
-
- size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
- return talloc_size(mem_ctx, size);
-}
-
-/*
- called when a CTDB_REQ_FINISHED packet comes in
-*/
-void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- ctdb->num_finished++;
-}
diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c
index 517fbbd842..3cc522b58a 100644
--- a/source4/cluster/ctdb/common/ctdb_io.c
+++ b/source4/cluster/ctdb/common/ctdb_io.c
@@ -6,18 +6,18 @@
Copyright (C) Andrew Tridgell 2006
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 3 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
@@ -39,6 +39,7 @@ struct ctdb_queue_pkt {
struct ctdb_queue_pkt *next, *prev;
uint8_t *data;
uint32_t length;
+ uint32_t full_length;
};
struct ctdb_queue {
@@ -63,8 +64,10 @@ static void queue_io_read(struct ctdb_queue *queue)
ssize_t nread;
uint8_t *data, *data_base;
- if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 ||
- num_ready == 0) {
+ if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
+ return;
+ }
+ if (num_ready == 0) {
/* the descriptor has been closed */
goto failed;
}
@@ -74,11 +77,14 @@ static void queue_io_read(struct ctdb_queue *queue)
num_ready + queue->partial.length);
if (queue->partial.data == NULL) {
+ DEBUG(0,("read error alloc failed for %u\n",
+ num_ready + queue->partial.length));
goto failed;
}
nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
if (nread <= 0) {
+ DEBUG(0,("read error nread=%d\n", (int)nread));
goto failed;
}
@@ -103,8 +109,14 @@ static void queue_io_read(struct ctdb_queue *queue)
uint8_t *d2;
uint32_t len;
len = *(uint32_t *)data;
+ if (len == 0) {
+ /* bad packet! treat as EOF */
+ DEBUG(0,("Invalid packet of length 0\n"));
+ goto failed;
+ }
d2 = talloc_memdup(queue, data, len);
if (d2 == NULL) {
+ DEBUG(0,("read error memdup failed for %u\n", len));
/* sigh */
goto failed;
}
@@ -121,6 +133,8 @@ static void queue_io_read(struct ctdb_queue *queue)
} else {
queue->partial.data = talloc_memdup(queue, data, nread);
if (queue->partial.data == NULL) {
+ DEBUG(0,("read error memdup partial failed for %u\n",
+ (unsigned)nread));
goto failed;
}
queue->partial.length = nread;
@@ -154,13 +168,23 @@ static void queue_io_write(struct ctdb_queue *queue)
while (queue->out_queue) {
struct ctdb_queue_pkt *pkt = queue->out_queue;
ssize_t n;
-
- n = write(queue->fd, pkt->data, pkt->length);
+ if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
+ n = write(queue->fd, pkt->data, 1);
+ } else {
+ n = write(queue->fd, pkt->data, pkt->length);
+ }
if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ if (pkt->length != pkt->full_length) {
+ /* partial packet sent - we have to drop it */
+ DLIST_REMOVE(queue->out_queue, pkt);
+ talloc_free(pkt);
+ }
+ talloc_free(queue->fde);
+ queue->fde = NULL;
+ queue->fd = -1;
event_add_timed(queue->ctdb->ev, queue, timeval_zero(),
queue_dead, queue);
- EVENT_FD_NOT_WRITEABLE(queue->fde);
return;
}
if (n <= 0) return;
@@ -200,21 +224,31 @@ static void queue_io_handler(struct event_context *ev, struct fd_event *fde,
int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
{
struct ctdb_queue_pkt *pkt;
- uint32_t length2;
+ uint32_t length2, full_length;
- /* enforce the length and alignment rules from the tcp packet allocator */
- length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
- *(uint32_t *)data = length2;
+ if (queue->alignment) {
+ /* enforce the length and alignment rules from the tcp packet allocator */
+ length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
+ *(uint32_t *)data = length2;
+ } else {
+ length2 = length;
+ }
if (length2 != length) {
memset(data+length, 0, length2-length);
}
+
+ full_length = length2;
/* if the queue is empty then try an immediate write, avoiding
queue overhead. This relies on non-blocking sockets */
- if (queue->out_queue == NULL && queue->fd != -1) {
+ if (queue->out_queue == NULL && queue->fd != -1 &&
+ !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
ssize_t n = write(queue->fd, data, length2);
if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ talloc_free(queue->fde);
+ queue->fde = NULL;
+ queue->fd = -1;
event_add_timed(queue->ctdb->ev, queue, timeval_zero(),
queue_dead, queue);
/* yes, we report success, as the dead node is
@@ -235,6 +269,7 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
CTDB_NO_MEMORY(queue->ctdb, pkt->data);
pkt->length = length2;
+ pkt->full_length = full_length;
if (queue->out_queue == NULL && queue->fd != -1) {
EVENT_FD_WRITEABLE(queue->fde);
@@ -256,7 +291,7 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
queue->fde = NULL;
if (fd != -1) {
- queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
+ queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
queue_io_handler, queue);
if (queue->fde == NULL) {
return -1;
diff --git a/source4/cluster/ctdb/common/ctdb_lockwait.c b/source4/cluster/ctdb/common/ctdb_lockwait.c
deleted file mode 100644
index 2134cb95e9..0000000000
--- a/source4/cluster/ctdb/common/ctdb_lockwait.c
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- wait for a tdb chain lock
-
- Copyright (C) Andrew Tridgell 2006
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 3 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "includes.h"
-#include "lib/events/events.h"
-#include "system/filesys.h"
-#include "system/wait.h"
-#include "db_wrap.h"
-#include "lib/tdb/include/tdb.h"
-#include "../include/ctdb_private.h"
-
-
-struct lockwait_handle {
- struct ctdb_context *ctdb;
- struct fd_event *fde;
- int fd[2];
- pid_t child;
- void *private_data;
- void (*callback)(void *);
- struct timeval start_time;
-};
-
-static void lockwait_handler(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private_data)
-{
- struct lockwait_handle *h = talloc_get_type(private_data,
- struct lockwait_handle);
- void (*callback)(void *) = h->callback;
- void *p = h->private_data;
- pid_t child = h->child;
- talloc_set_destructor(h, NULL);
- close(h->fd[0]);
- ctdb_latency(&h->ctdb->status.max_lockwait_latency, h->start_time);
- h->ctdb->status.pending_lockwait_calls--;
- talloc_free(h);
- callback(p);
- waitpid(child, NULL, 0);
-}
-
-static int lockwait_destructor(struct lockwait_handle *h)
-{
- h->ctdb->status.pending_lockwait_calls--;
- close(h->fd[0]);
- kill(h->child, SIGKILL);
- waitpid(h->child, NULL, 0);
- return 0;
-}
-
-/*
- setup a non-blocking chainlock on a tdb record. If this function
- returns NULL then it could not get the chainlock. Otherwise it
- returns a opaque handle, and will call callback() once it has
- managed to get the chainlock. You can cancel it by using talloc_free
- on the returned handle.
-
- It is the callers responsibility to unlock the chainlock once
- acquired
- */
-struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
- TDB_DATA key,
- void (*callback)(void *private_data),
- void *private_data)
-{
- struct lockwait_handle *result;
- int ret;
-
- ctdb_db->ctdb->status.lockwait_calls++;
- ctdb_db->ctdb->status.pending_lockwait_calls++;
-
- if (!(result = talloc_zero(ctdb_db, struct lockwait_handle))) {
- ctdb_db->ctdb->status.pending_lockwait_calls--;
- return NULL;
- }
-
- ret = pipe(result->fd);
-
- if (ret != 0) {
- talloc_free(result);
- ctdb_db->ctdb->status.pending_lockwait_calls--;
- return NULL;
- }
-
- result->child = fork();
-
- if (result->child == (pid_t)-1) {
- close(result->fd[0]);
- close(result->fd[1]);
- talloc_free(result);
- ctdb_db->ctdb->status.pending_lockwait_calls--;
- return NULL;
- }
-
- result->callback = callback;
- result->private_data = private_data;
- result->ctdb = ctdb_db->ctdb;
-
- if (result->child == 0) {
- close(result->fd[0]);
- /*
- * Do we need a tdb_reopen here?
- */
- tdb_chainlock(ctdb_db->ltdb->tdb, key);
- _exit(0);
- }
-
- close(result->fd[1]);
- talloc_set_destructor(result, lockwait_destructor);
-
- result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
- EVENT_FD_READ, lockwait_handler,
- (void *)result);
- if (result->fde == NULL) {
- talloc_free(result);
- ctdb_db->ctdb->status.pending_lockwait_calls--;
- return NULL;
- }
-
- result->start_time = timeval_current();
-
- return result;
-}
diff --git a/source4/cluster/ctdb/common/ctdb_ltdb.c b/source4/cluster/ctdb/common/ctdb_ltdb.c
index feb7207fcc..7db1523ca1 100644
--- a/source4/cluster/ctdb/common/ctdb_ltdb.c
+++ b/source4/cluster/ctdb/common/ctdb_ltdb.c
@@ -3,18 +3,18 @@
Copyright (C) Andrew Tridgell 2006
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 3 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
@@ -42,87 +42,16 @@ struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *na
/*
- this is the dummy null procedure that all databases support
-*/
-static int ctdb_null_func(struct ctdb_call_info *call)
-{
- return 0;
-}
-
-
-/*
- attach to a specific database
-*/
-struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
- int open_flags, mode_t mode)
-{
- struct ctdb_db_context *ctdb_db, *tmp_db;
- TDB_DATA data;
- int ret;
-
- ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
- CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
-
- ctdb_db->ctdb = ctdb;
- ctdb_db->db_name = talloc_strdup(ctdb_db, name);
- CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
-
- data.dptr = discard_const(name);
- data.dsize = strlen(name);
- ctdb_db->db_id = ctdb_hash(&data);
-
- for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
- if (tmp_db->db_id == ctdb_db->db_id) {
- ctdb_set_error(ctdb, "CTDB database hash collission '%s' : '%s'",
- name, tmp_db->db_name);
- talloc_free(ctdb_db);
- return NULL;
- }
- }
-
- if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
- DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n",
- ctdb->db_directory));
- talloc_free(ctdb_db);
- return NULL;
- }
-
- /* add the node id to the database name, so when we run on loopback
- we don't conflict in the local filesystem */
- name = talloc_asprintf(ctdb_db, "%s/%s", ctdb->db_directory, name);
-
- /* when we have a separate daemon this will need to be a real
- file, not a TDB_INTERNAL, so the parent can access it to
- for ltdb bypass */
- ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_CLEAR_IF_FIRST, open_flags, mode);
- if (ctdb_db->ltdb == NULL) {
- ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
- talloc_free(ctdb_db);
- return NULL;
- }
-
-
- /*
- all databases support the "null" function. we need this in
- order to do forced migration of records
- */
- ret = ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
- if (ret != 0) {
- talloc_free(ctdb_db);
- return NULL;
- }
-
- DLIST_ADD(ctdb->db_list, ctdb_db);
-
- return ctdb_db;
-}
-
-/*
return the lmaster given a key
*/
uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
{
- return ctdb_hash(key) % ctdb->num_nodes;
+ uint32_t idx, lmaster;
+
+ idx = ctdb_hash(key) % ctdb->vnn_map->size;
+ lmaster = ctdb->vnn_map->map[idx];
+
+ return lmaster;
}
@@ -158,6 +87,12 @@ int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
TDB_DATA d2;
/* return an initial header */
if (rec.dptr) free(rec.dptr);
+ if (ctdb->vnn_map == NULL) {
+ /* called from the client */
+ ZERO_STRUCTP(data);
+ header->dmaster = (uint32_t)-1;
+ return -1;
+ }
ltdb_initial_header(ctdb_db, key, header);
ZERO_STRUCT(d2);
if (data) {
@@ -197,6 +132,17 @@ int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
TDB_DATA rec;
int ret;
+ if (ctdb->flags & CTDB_FLAG_TORTURE) {
+ struct ctdb_ltdb_header *h2;
+ rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
+ h2 = (struct ctdb_ltdb_header *)rec.dptr;
+ if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
+ DEBUG(0,("RSN regression! %llu %llu\n",
+ (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
+ }
+ if (rec.dptr) free(rec.dptr);
+ }
+
rec.dsize = sizeof(*header) + data.dsize;
rec.dptr = talloc_size(ctdb, rec.dsize);
CTDB_NO_MEMORY(ctdb, rec.dptr);
@@ -224,121 +170,9 @@ int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
*/
int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
{
- return tdb_chainunlock(ctdb_db->ltdb->tdb, key);
-}
-
-struct lock_fetch_state {
- struct ctdb_context *ctdb;
- void (*recv_pkt)(void *, uint8_t *, uint32_t);
- void *recv_context;
- struct ctdb_req_header *hdr;
-};
-
-/*
- called when we should retry the operation
- */
-static void lock_fetch_callback(void *p)
-{
- struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
- state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
- talloc_free(state);
- DEBUG(2,(__location__ " PACKET REQUEUED\n"));
-}
-
-
-/*
- do a non-blocking ltdb_lock, deferring this ctdb request until we
- have the chainlock
-
- It does the following:
-
- 1) tries to get the chainlock. If it succeeds, then it returns 0
-
- 2) if it fails to get a chainlock immediately then it sets up a
- non-blocking chainlock via ctdb_lockwait, and when it gets the
- chainlock it re-submits this ctdb request to the main packet
- receive function
-
- This effectively queues all ctdb requests that cannot be
- immediately satisfied until it can get the lock. This means that
- the main ctdb daemon will not block waiting for a chainlock held by
- a client
-
- There are 3 possible return values:
-
- 0: means that it got the lock immediately.
- -1: means that it failed to get the lock, and won't retry
- -2: means that it failed to get the lock immediately, but will retry
- */
-int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
- TDB_DATA key, struct ctdb_req_header *hdr,
- void (*recv_pkt)(void *, uint8_t *, uint32_t ),
- void *recv_context)
-{
- int ret;
- struct tdb_context *tdb = ctdb_db->ltdb->tdb;
- struct lockwait_handle *h;
- struct lock_fetch_state *state;
-
- ret = tdb_chainlock_nonblock(tdb, key);
-
- if (ret != 0 &&
- !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
- /* a hard failure - don't try again */
- return -1;
- }
-
- /* when torturing, ensure we test the contended path */
- if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
- random() % 5 == 0) {
- ret = -1;
- tdb_chainunlock(tdb, key);
- }
-
- /* first the non-contended path */
- if (ret == 0) {
- return 0;
- }
-
- state = talloc(ctdb_db, struct lock_fetch_state);
- state->ctdb = ctdb_db->ctdb;
- state->hdr = hdr;
- state->recv_pkt = recv_pkt;
- state->recv_context = recv_context;
-
- /* now the contended path */
- h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
- if (h == NULL) {
- tdb_chainunlock(tdb, key);
- return -1;
- }
-
- /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
- so it won't be freed yet */
- talloc_steal(state, hdr);
- talloc_steal(state, h);
-
- /* now tell the caller than we will retry asynchronously */
- return -2;
-}
-
-/*
- a varient of ctdb_ltdb_lock_requeue that also fetches the record
- */
-int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
- TDB_DATA key, struct ctdb_ltdb_header *header,
- struct ctdb_req_header *hdr, TDB_DATA *data,
- void (*recv_pkt)(void *, uint8_t *, uint32_t ),
- void *recv_context)
-{
- int ret;
-
- ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context);
- if (ret == 0) {
- ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
- if (ret != 0) {
- ctdb_ltdb_unlock(ctdb_db, key);
- }
+ int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+ if (ret != 0) {
+ DEBUG(0,("tdb_chainunlock failed\n"));
}
return ret;
}
diff --git a/source4/cluster/ctdb/common/ctdb_message.c b/source4/cluster/ctdb/common/ctdb_message.c
index a477cc59b6..1aea28fd35 100644
--- a/source4/cluster/ctdb/common/ctdb_message.c
+++ b/source4/cluster/ctdb/common/ctdb_message.c
@@ -3,18 +3,18 @@
Copyright (C) Andrew Tridgell 2007
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 3 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
/*
see http://wiki.samba.org/index.php/Samba_%26_Clustering for
@@ -31,27 +31,19 @@
/*
this dispatches the messages to the registered ctdb message handler
*/
-static int ctdb_dispatch_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_DATA data)
+int ctdb_dispatch_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
{
struct ctdb_message_list *ml;
- /* XXX we need a must faster way of finding the matching srvid
- - maybe a tree? */
for (ml=ctdb->message_list;ml;ml=ml->next) {
- if (ml->srvid == srvid || ml->srvid == CTDB_SRVID_ALL) break;
- }
- if (ml == NULL) {
- DEBUG(1,(__location__ " daemon vnn:%d no msg handler for srvid=%u\n",
- ctdb_get_vnn(ctdb), srvid));
- /* no registered message handler */
- return -1;
+ if (ml->srvid == srvid || ml->srvid == CTDB_SRVID_ALL) {
+ ml->message_handler(ctdb, srvid, data, ml->message_private);
+ }
}
- ml->message_handler(ctdb, srvid, data, ml->message_private);
return 0;
}
-
/*
called when a CTDB_REQ_MESSAGE packet comes in
*/
@@ -66,87 +58,6 @@ void ctdb_request_message(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
ctdb_dispatch_message(ctdb, c->srvid, data);
}
-/*
- this local messaging handler is ugly, but is needed to prevent
- recursion in ctdb_send_message() when the destination node is the
- same as the source node
- */
-struct ctdb_local_message {
- struct ctdb_context *ctdb;
- uint32_t srvid;
- TDB_DATA data;
-};
-
-static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private_data)
-{
- struct ctdb_local_message *m = talloc_get_type(private_data,
- struct ctdb_local_message);
- int res;
-
- res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
- if (res != 0) {
- DEBUG(0, (__location__ " Failed to dispatch message for srvid=%u\n", m->srvid));
- }
- talloc_free(m);
-}
-
-static int ctdb_local_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_DATA data)
-{
- struct ctdb_local_message *m;
- m = talloc(ctdb, struct ctdb_local_message);
- CTDB_NO_MEMORY(ctdb, m);
-
- m->ctdb = ctdb;
- m->srvid = srvid;
- m->data = data;
- m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
- if (m->data.dptr == NULL) {
- talloc_free(m);
- return -1;
- }
-
- /* this needs to be done as an event to prevent recursion */
- event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
- return 0;
-}
-
-/*
- send a ctdb message
-*/
-int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
- uint32_t srvid, TDB_DATA data)
-{
- struct ctdb_req_message *r;
- int len;
-
- /* see if this is a message to ourselves */
- if (vnn == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
- return ctdb_local_message(ctdb, srvid, data);
- }
-
- len = offsetof(struct ctdb_req_message, data) + data.dsize;
- r = ctdb->methods->allocate_pkt(ctdb, len);
- CTDB_NO_MEMORY(ctdb, r);
- talloc_set_name_const(r, "req_message packet");
-
- r->hdr.length = len;
- r->hdr.ctdb_magic = CTDB_MAGIC;
- r->hdr.ctdb_version = CTDB_VERSION;
- r->hdr.operation = CTDB_REQ_MESSAGE;
- r->hdr.destnode = vnn;
- r->hdr.srcnode = ctdb->vnn;
- r->hdr.reqid = 0;
- r->srvid = srvid;
- r->datalen = data.dsize;
- memcpy(&r->data[0], data.dptr, data.dsize);
-
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(r);
- return 0;
-}
-
/*
when a client goes away, we need to remove its srvid handler from the list
@@ -162,7 +73,7 @@ static int message_handler_destructor(struct ctdb_message_list *m)
*/
int ctdb_register_message_handler(struct ctdb_context *ctdb,
TALLOC_CTX *mem_ctx,
- uint32_t srvid,
+ uint64_t srvid,
ctdb_message_fn_t handler,
void *private_data)
{
@@ -182,3 +93,20 @@ int ctdb_register_message_handler(struct ctdb_context *ctdb,
return 0;
}
+
+
+/*
+ setup handler for receipt of ctdb messages from ctdb_send_message()
+*/
+int ctdb_deregister_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
+{
+ struct ctdb_message_list *m;
+
+ for (m=ctdb->message_list;m;m=m->next) {
+ if (m->srvid == srvid && m->message_private == private_data) {
+ talloc_free(m);
+ return 0;
+ }
+ }
+ return -1;
+}
diff --git a/source4/cluster/ctdb/common/ctdb_util.c b/source4/cluster/ctdb/common/ctdb_util.c
index 1448e3340a..f8f7cb5150 100644
--- a/source4/cluster/ctdb/common/ctdb_util.c
+++ b/source4/cluster/ctdb/common/ctdb_util.c
@@ -3,18 +3,18 @@
Copyright (C) Andrew Tridgell 2006
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 3 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
@@ -51,10 +51,9 @@ void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...)
/*
a fatal internal error occurred - no hope for recovery
*/
-_NORETURN_ void ctdb_fatal(struct ctdb_context *ctdb, const char *msg)
+void ctdb_fatal(struct ctdb_context *ctdb, const char *msg)
{
DEBUG(0,("ctdb fatal error: %s\n", msg));
- fprintf(stderr, "ctdb fatal error: '%s'\n", msg);
abort();
}
@@ -65,15 +64,18 @@ int ctdb_parse_address(struct ctdb_context *ctdb,
TALLOC_CTX *mem_ctx, const char *str,
struct ctdb_address *address)
{
- char *p;
- p = strchr(str, ':');
- if (p == NULL) {
- ctdb_set_error(ctdb, "Badly formed node '%s'\n", str);
- return -1;
- }
+ struct servent *se;
+
+ setservent(0);
+ se = getservbyname("ctdb", "tcp");
+ endservent();
- address->address = talloc_strndup(mem_ctx, str, p-str);
- address->port = strtoul(p+1, NULL, 0);
+ address->address = talloc_strdup(mem_ctx, str);
+ if (se == NULL) {
+ address->port = CTDB_PORT;
+ } else {
+ address->port = ntohs(se->s_port);
+ }
return 0;
}
@@ -105,7 +107,7 @@ uint32_t ctdb_hash(const TDB_DATA *key)
/*
a type checking varient of idr_find
*/
-void *_idr_find_type(struct idr_context *idp, int id, const char *type, const char *location)
+static void *_idr_find_type(struct idr_context *idp, int id, const char *type, const char *location)
{
void *p = idr_find(idp, id);
if (p && talloc_check_name(p, type) == NULL) {
@@ -127,3 +129,135 @@ void ctdb_latency(double *latency, struct timeval t)
*latency = l;
}
}
+
+uint32_t ctdb_reqid_new(struct ctdb_context *ctdb, void *state)
+{
+ uint32_t id;
+
+ id = ctdb->idr_cnt++ & 0xFFFF;
+ id |= (idr_get_new(ctdb->idr, state, 0xFFFF)<<16);
+ return id;
+}
+
+void *_ctdb_reqid_find(struct ctdb_context *ctdb, uint32_t reqid, const char *type, const char *location)
+{
+ void *p;
+
+ p = _idr_find_type(ctdb->idr, (reqid>>16)&0xFFFF, type, location);
+ if (p == NULL) {
+ DEBUG(0, ("Could not find idr:%u\n",reqid));
+ }
+
+ return p;
+}
+
+
+void ctdb_reqid_remove(struct ctdb_context *ctdb, uint32_t reqid)
+{
+ int ret;
+
+ ret = idr_remove(ctdb->idr, (reqid>>16)&0xFFFF);
+ if (ret != 0) {
+ DEBUG(0, ("Removing idr that does not exist\n"));
+ }
+}
+
+
+/*
+ form a ctdb_rec_data record from a key/data pair
+ */
+struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid, TDB_DATA key, TDB_DATA data)
+{
+ size_t length;
+ struct ctdb_rec_data *d;
+
+ length = offsetof(struct ctdb_rec_data, data) + key.dsize + data.dsize;
+ d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
+ if (d == NULL) {
+ return NULL;
+ }
+ d->length = length;
+ d->reqid = reqid;
+ d->keylen = key.dsize;
+ d->datalen = data.dsize;
+ memcpy(&d->data[0], key.dptr, key.dsize);
+ memcpy(&d->data[key.dsize], data.dptr, data.dsize);
+ return d;
+}
+
+#if HAVE_SCHED_H
+#include <sched.h>
+#endif
+
+/*
+ if possible, make this task real time
+ */
+void ctdb_set_realtime(bool enable)
+{
+#if HAVE_SCHED_SETSCHEDULER
+ struct sched_param p;
+ p.__sched_priority = 1;
+
+ if (enable) {
+ if (sched_setscheduler(getpid(), SCHED_FIFO, &p) == -1) {
+ DEBUG(0,("Unable to set scheduler to SCHED_FIFO (%s)\n", strerror(errno)));
+ } else {
+ DEBUG(0,("Set scheduler to SCHED_FIFO\n"));
+ }
+ } else {
+ sched_setscheduler(getpid(), SCHED_OTHER, &p);
+ }
+#endif
+}
+
+void set_nonblocking(int fd)
+{
+ unsigned v;
+ v = fcntl(fd, F_GETFL, 0);
+ fcntl(fd, F_SETFL, v | O_NONBLOCK);
+}
+
+void set_close_on_exec(int fd)
+{
+ unsigned v;
+ v = fcntl(fd, F_GETFD, 0);
+ fcntl(fd, F_SETFD, v | FD_CLOEXEC);
+}
+
+
+/*
+ parse a ip:port pair
+ */
+bool parse_ip_port(const char *s, struct sockaddr_in *ip)
+{
+ const char *p;
+ char *endp = NULL;
+ unsigned port;
+ char buf[16];
+
+ ip->sin_family = AF_INET;
+
+ p = strchr(s, ':');
+ if (p == NULL) {
+ return false;
+ }
+
+ if (p - s > 15) {
+ return false;
+ }
+
+ port = strtoul(p+1, &endp, 10);
+ if (endp == NULL || *endp != 0) {
+ /* trailing garbage */
+ return false;
+ }
+ ip->sin_port = htons(port);
+
+ strlcpy(buf, s, 1+p-s);
+
+ if (inet_aton(buf, &ip->sin_addr) == 0) {
+ return false;
+ }
+
+ return true;
+}
diff --git a/source4/cluster/ctdb/common/system.c b/source4/cluster/ctdb/common/system.c
new file mode 100644
index 0000000000..1e536f5e8a
--- /dev/null
+++ b/source4/cluster/ctdb/common/system.c
@@ -0,0 +1,385 @@
+/*
+ ctdb recovery code
+
+ Copyright (C) Ronnie Sahlberg 2007
+ Copyright (C) Andrew Tridgell 2007
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "../include/ctdb_private.h"
+#include "lib/events/events.h"
+#include <net/ethernet.h>
+#include <net/if_arp.h>
+
+
+
+/*
+ send gratuitous arp reply after we have taken over an ip address
+
+ saddr is the address we are trying to claim
+ iface is the interface name we will be using to claim the address
+ */
+int ctdb_sys_send_arp(const struct sockaddr_in *saddr, const char *iface)
+{
+ int s, ret;
+ struct sockaddr sa;
+ struct ether_header *eh;
+ struct arphdr *ah;
+ struct ifreq if_hwaddr;
+ unsigned char buffer[64]; /*minimum eth frame size */
+ char *ptr;
+
+ /* for now, we only handle AF_INET addresses */
+ if (saddr->sin_family != AF_INET) {
+ DEBUG(0,(__location__ " not an ipv4 address (family is %u)\n", saddr->sin_family));
+ return -1;
+ }
+
+ s = socket(AF_INET, SOCK_PACKET, htons(ETHERTYPE_ARP));
+ if (s == -1){
+ DEBUG(0,(__location__ " failed to open raw socket\n"));
+ return -1;
+ }
+
+ /* get the mac address */
+ strcpy(if_hwaddr.ifr_name, iface);
+ ret = ioctl(s, SIOCGIFHWADDR, &if_hwaddr);
+ if ( ret < 0 ) {
+ close(s);
+ DEBUG(0,(__location__ " ioctl failed\n"));
+ return -1;
+ }
+ if (ARPHRD_LOOPBACK == if_hwaddr.ifr_hwaddr.sa_family) {
+ DEBUG(3,("Ignoring loopback arp request\n"));
+ close(s);
+ return 0;
+ }
+ if (if_hwaddr.ifr_hwaddr.sa_family != AF_LOCAL) {
+ close(s);
+ errno = EINVAL;
+ DEBUG(0,(__location__ " not an ethernet address family (0x%x)\n",
+ if_hwaddr.ifr_hwaddr.sa_family));
+ return -1;
+ }
+
+
+ memset(buffer, 0 , 64);
+ eh = (struct ether_header *)buffer;
+ memset(eh->ether_dhost, 0xff, ETH_ALEN);
+ memcpy(eh->ether_shost, if_hwaddr.ifr_hwaddr.sa_data, ETH_ALEN);
+ eh->ether_type = htons(ETHERTYPE_ARP);
+
+ ah = (struct arphdr *)&buffer[sizeof(struct ether_header)];
+ ah->ar_hrd = htons(ARPHRD_ETHER);
+ ah->ar_pro = htons(ETH_P_IP);
+ ah->ar_hln = ETH_ALEN;
+ ah->ar_pln = 4;
+
+ /* send a gratious arp */
+ ah->ar_op = htons(ARPOP_REQUEST);
+ ptr = (char *)&ah[1];
+ memcpy(ptr, if_hwaddr.ifr_hwaddr.sa_data, ETH_ALEN);
+ ptr+=ETH_ALEN;
+ memcpy(ptr, &saddr->sin_addr, 4);
+ ptr+=4;
+ memset(ptr, 0, ETH_ALEN);
+ ptr+=ETH_ALEN;
+ memcpy(ptr, &saddr->sin_addr, 4);
+ ptr+=4;
+
+ strncpy(sa.sa_data, iface, sizeof(sa.sa_data));
+ ret = sendto(s, buffer, 64, 0, &sa, sizeof(sa));
+ if (ret < 0 ){
+ close(s);
+ DEBUG(0,(__location__ " failed sendto\n"));
+ return -1;
+ }
+
+ /* send unsolicited arp reply broadcast */
+ ah->ar_op = htons(ARPOP_REPLY);
+ ptr = (char *)&ah[1];
+ memcpy(ptr, if_hwaddr.ifr_hwaddr.sa_data, ETH_ALEN);
+ ptr+=ETH_ALEN;
+ memcpy(ptr, &saddr->sin_addr, 4);
+ ptr+=4;
+ memcpy(ptr, if_hwaddr.ifr_hwaddr.sa_data, ETH_ALEN);
+ ptr+=ETH_ALEN;
+ memcpy(ptr, &saddr->sin_addr, 4);
+ ptr+=4;
+
+ strncpy(sa.sa_data, iface, sizeof(sa.sa_data));
+ ret = sendto(s, buffer, 64, 0, &sa, sizeof(sa));
+ if (ret < 0 ){
+ DEBUG(0,(__location__ " failed sendto\n"));
+ return -1;
+ }
+
+ close(s);
+ return 0;
+}
+
+
+/*
+ uint16 checksum for n bytes
+ */
+static uint32_t uint16_checksum(uint16_t *data, size_t n)
+{
+ uint32_t sum=0;
+ while (n>=2) {
+ sum += (uint32_t)ntohs(*data);
+ data++;
+ n -= 2;
+ }
+ if (n == 1) {
+ sum += (uint32_t)ntohs(*(uint8_t *)data);
+ }
+ return sum;
+}
+
+/*
+ simple TCP checksum - assumes data is multiple of 2 bytes long
+ */
+static uint16_t tcp_checksum(uint16_t *data, size_t n, struct iphdr *ip)
+{
+ uint32_t sum = uint16_checksum(data, n);
+ uint16_t sum2;
+ sum += uint16_checksum((uint16_t *)&ip->saddr, sizeof(ip->saddr));
+ sum += uint16_checksum((uint16_t *)&ip->daddr, sizeof(ip->daddr));
+ sum += ip->protocol + n;
+ sum = (sum & 0xFFFF) + (sum >> 16);
+ sum = (sum & 0xFFFF) + (sum >> 16);
+ sum2 = htons(sum);
+ sum2 = ~sum2;
+ if (sum2 == 0) {
+ return 0xFFFF;
+ }
+ return sum2;
+}
+
+/*
+ Send tcp segment from the specified IP/port to the specified
+ destination IP/port.
+
+ This is used to trigger the receiving host into sending its own ACK,
+ which should trigger early detection of TCP reset by the client
+ after IP takeover
+
+ This can also be used to send RST segments (if rst is true) and also
+ if correct seq and ack numbers are provided.
+ */
+int ctdb_sys_send_tcp(const struct sockaddr_in *dest,
+ const struct sockaddr_in *src,
+ uint32_t seq, uint32_t ack, int rst)
+{
+ int s, ret;
+ uint32_t one = 1;
+ struct {
+ struct iphdr ip;
+ struct tcphdr tcp;
+ } pkt;
+
+ /* for now, we only handle AF_INET addresses */
+ if (src->sin_family != AF_INET || dest->sin_family != AF_INET) {
+ DEBUG(0,(__location__ " not an ipv4 address\n"));
+ return -1;
+ }
+
+ s = socket(AF_INET, SOCK_RAW, htons(IPPROTO_RAW));
+ if (s == -1) {
+ DEBUG(0,(__location__ " failed to open raw socket (%s)\n",
+ strerror(errno)));
+ return -1;
+ }
+
+ ret = setsockopt(s, SOL_IP, IP_HDRINCL, &one, sizeof(one));
+ if (ret != 0) {
+ DEBUG(0,(__location__ " failed to setup IP headers (%s)\n",
+ strerror(errno)));
+ close(s);
+ return -1;
+ }
+
+ ZERO_STRUCT(pkt);
+ pkt.ip.version = 4;
+ pkt.ip.ihl = sizeof(pkt.ip)/4;
+ pkt.ip.tot_len = htons(sizeof(pkt));
+ pkt.ip.ttl = 255;
+ pkt.ip.protocol = IPPROTO_TCP;
+ pkt.ip.saddr = src->sin_addr.s_addr;
+ pkt.ip.daddr = dest->sin_addr.s_addr;
+ pkt.ip.check = 0;
+
+ pkt.tcp.source = src->sin_port;
+ pkt.tcp.dest = dest->sin_port;
+ pkt.tcp.seq = seq;
+ pkt.tcp.ack_seq = ack;
+ pkt.tcp.ack = 1;
+ if (rst) {
+ pkt.tcp.rst = 1;
+ }
+ pkt.tcp.doff = sizeof(pkt.tcp)/4;
+ pkt.tcp.window = htons(1234);
+ pkt.tcp.check = tcp_checksum((uint16_t *)&pkt.tcp, sizeof(pkt.tcp), &pkt.ip);
+
+ ret = sendto(s, &pkt, sizeof(pkt), 0, dest, sizeof(*dest));
+ if (ret != sizeof(pkt)) {
+ DEBUG(0,(__location__ " failed sendto (%s)\n", strerror(errno)));
+ close(s);
+ return -1;
+ }
+
+ close(s);
+ return 0;
+}
+
+
+/*
+ see if we currently have an interface with the given IP
+
+ we try to bind to it, and if that fails then we don't have that IP
+ on an interface
+ */
+bool ctdb_sys_have_ip(const char *ip)
+{
+ struct sockaddr_in sin;
+ int s;
+ int ret;
+
+ sin.sin_port = 0;
+ inet_aton(ip, &sin.sin_addr);
+ sin.sin_family = AF_INET;
+ s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (s == -1) {
+ return false;
+ }
+ ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
+ close(s);
+ return ret == 0;
+}
+
+static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te,
+ struct timeval yt, void *p)
+{
+ uint32_t *timed_out = (uint32_t *)p;
+ (*timed_out) = 1;
+}
+
+/* This function is used to kill (RST) the specified tcp connection.
+
+ This function is not asynchronous and will block until the operation
+ was successful or it timesout.
+ */
+int ctdb_sys_kill_tcp(struct event_context *ev,
+ const struct sockaddr_in *dst,
+ const struct sockaddr_in *src)
+{
+ int s, ret;
+ uint32_t timedout;
+ TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+#define RCVPKTSIZE 100
+ char pkt[RCVPKTSIZE];
+ struct ether_header *eth;
+ struct iphdr *ip;
+ struct tcphdr *tcp;
+
+ /* Open a socket to capture all traffic */
+ s=socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
+ if (s == -1){
+ DEBUG(0,(__location__ " failed to open raw socket\n"));
+ return -1;
+ }
+
+ /* We wait for up to 1 second for the ACK coming back */
+ timedout = 0;
+ event_add_timed(ev, tmp_ctx, timeval_current_ofs(1, 0), ctdb_wait_handler, &timedout);
+
+ /* Send a tickle ack to probe what the real seq/ack numbers are */
+ ctdb_sys_send_tcp(dst, src, 0, 0, 0);
+
+ /* Wait until we either time out or we succeeds in sending the RST */
+ while (timedout==0) {
+ event_loop_once(ev);
+
+ ret = recv(s, pkt, RCVPKTSIZE, MSG_TRUNC);
+ if (ret < sizeof(*eth)+sizeof(*ip)) {
+ continue;
+ }
+
+ /* Ethernet */
+ eth = (struct ether_header *)pkt;
+ /* We only want IP packets */
+ if (ntohs(eth->ether_type) != ETHERTYPE_IP) {
+ continue;
+ }
+
+ /* IP */
+ ip = (struct iphdr *)(eth+1);
+ /* We only want IPv4 packets */
+ if (ip->version != 4) {
+ continue;
+ }
+ /* Dont look at fragments */
+ if ((ntohs(ip->frag_off)&0x1fff) != 0) {
+ continue;
+ }
+ /* we only want TCP */
+ if (ip->protocol != IPPROTO_TCP) {
+ continue;
+ }
+
+ /* We only want packets sent from the guy we tickled */
+ if (ip->saddr != dst->sin_addr.s_addr) {
+ continue;
+ }
+ /* We only want packets sent to us */
+ if (ip->daddr != src->sin_addr.s_addr) {
+ continue;
+ }
+
+ /* make sure its not a short packet */
+ if (offsetof(struct tcphdr, ack_seq) + 4 +
+ (ip->ihl*4) + sizeof(*eth) > ret) {
+ continue;
+ }
+
+ /* TCP */
+ tcp = (struct tcphdr *)((ip->ihl*4) + (char *)ip);
+
+ /* We only want replies from the port we tickled */
+ if (tcp->source != dst->sin_port) {
+ continue;
+ }
+ if (tcp->dest != src->sin_port) {
+ continue;
+ }
+
+ ctdb_sys_send_tcp(dst, src, tcp->ack_seq, tcp->seq, 1);
+
+ close(s);
+ talloc_free(tmp_ctx);
+
+ return 0;
+ }
+
+ close(s);
+ talloc_free(tmp_ctx);
+ DEBUG(0,(__location__ " timedout waiting for tickle ack reply\n"));
+
+ return -1;
+}