summaryrefslogtreecommitdiff
path: root/source4/cluster/ctdb/common/ctdb.c
diff options
context:
space:
mode:
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb.c')
-rw-r--r--source4/cluster/ctdb/common/ctdb.c136
1 files changed, 105 insertions, 31 deletions
diff --git a/source4/cluster/ctdb/common/ctdb.c b/source4/cluster/ctdb/common/ctdb.c
index 8a8d52f3f1..6bd2fda529 100644
--- a/source4/cluster/ctdb/common/ctdb.c
+++ b/source4/cluster/ctdb/common/ctdb.c
@@ -74,6 +74,22 @@ void ctdb_set_max_lacount(struct ctdb_context *ctdb, unsigned count)
}
/*
+ set the directory for the local databases
+*/
+int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
+{
+ if (dir == NULL) {
+ ctdb->db_directory = talloc_asprintf(ctdb, "ctdb-%u", ctdb_get_vnn(ctdb));
+ } else {
+ ctdb->db_directory = talloc_strdup(ctdb, dir);
+ }
+ if (ctdb->db_directory == NULL) {
+ return -1;
+ }
+ return 0;
+}
+
+/*
add a node to the list of active nodes
*/
static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
@@ -190,65 +206,102 @@ uint32_t ctdb_get_num_nodes(struct ctdb_context *ctdb)
/*
called by the transport layer when a packet comes in
*/
-static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
+void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
{
- struct ctdb_req_header *hdr;
+ struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
+ TALLOC_CTX *tmp_ctx;
+
+ ctdb->status.node_packets_recv++;
+
+ /* place the packet as a child of the tmp_ctx. We then use
+ talloc_free() below to free it. If any of the calls want
+ to keep it, then they will steal it somewhere else, and the
+ talloc_free() will only free the tmp_ctx */
+ tmp_ctx = talloc_new(ctdb);
+ talloc_steal(tmp_ctx, hdr);
if (length < sizeof(*hdr)) {
ctdb_set_error(ctdb, "Bad packet length %d\n", length);
- return;
+ goto done;
}
- hdr = (struct ctdb_req_header *)data;
if (length != hdr->length) {
ctdb_set_error(ctdb, "Bad header length %d expected %d\n",
hdr->length, length);
- return;
+ goto done;
}
if (hdr->ctdb_magic != CTDB_MAGIC) {
ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
- return;
+ goto done;
}
if (hdr->ctdb_version != CTDB_VERSION) {
ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
- return;
+ goto done;
}
+ DEBUG(3,(__location__ " ctdb request %d of type %d length %d from "
+ "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
+ hdr->srcnode, hdr->destnode));
+
switch (hdr->operation) {
case CTDB_REQ_CALL:
+ ctdb->status.count.req_call++;
ctdb_request_call(ctdb, hdr);
break;
case CTDB_REPLY_CALL:
+ ctdb->status.count.reply_call++;
ctdb_reply_call(ctdb, hdr);
break;
case CTDB_REPLY_ERROR:
+ ctdb->status.count.reply_error++;
ctdb_reply_error(ctdb, hdr);
break;
case CTDB_REPLY_REDIRECT:
+ ctdb->status.count.reply_redirect++;
ctdb_reply_redirect(ctdb, hdr);
break;
case CTDB_REQ_DMASTER:
+ ctdb->status.count.req_dmaster++;
ctdb_request_dmaster(ctdb, hdr);
break;
case CTDB_REPLY_DMASTER:
+ ctdb->status.count.reply_dmaster++;
ctdb_reply_dmaster(ctdb, hdr);
break;
case CTDB_REQ_MESSAGE:
+ ctdb->status.count.req_message++;
ctdb_request_message(ctdb, hdr);
break;
+ case CTDB_REQ_FINISHED:
+ ctdb->status.count.req_finished++;
+ ctdb_request_finished(ctdb, hdr);
+ break;
+
default:
- printf("Packet with unknown operation %d\n", hdr->operation);
+ DEBUG(0,("%s: Packet with unknown operation %d\n",
+ __location__, hdr->operation));
break;
}
- talloc_free(hdr);
+
+done:
+ talloc_free(tmp_ctx);
+}
+
+/*
+ called by the transport layer when a packet comes in
+*/
+void ctdb_recv_raw_pkt(void *p, uint8_t *data, uint32_t length)
+{
+ struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
+ ctdb_recv_pkt(ctdb, data, length);
}
/*
@@ -257,8 +310,8 @@ static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t len
static void ctdb_node_dead(struct ctdb_node *node)
{
node->ctdb->num_connected--;
- printf("%s: node %s is dead: %d connected\n",
- node->ctdb->name, node->name, node->ctdb->num_connected);
+ DEBUG(1,("%s: node %s is dead: %d connected\n",
+ node->ctdb->name, node->name, node->ctdb->num_connected));
}
/*
@@ -267,8 +320,8 @@ static void ctdb_node_dead(struct ctdb_node *node)
static void ctdb_node_connected(struct ctdb_node *node)
{
node->ctdb->num_connected++;
- printf("%s: connected to %s - %d connected\n",
- node->ctdb->name, node->name, node->ctdb->num_connected);
+ DEBUG(1,("%s: connected to %s - %d connected\n",
+ node->ctdb->name, node->name, node->ctdb->num_connected));
}
/*
@@ -281,33 +334,62 @@ void ctdb_daemon_connect_wait(struct ctdb_context *ctdb)
expected++;
}
while (ctdb->num_connected != expected) {
+ DEBUG(3,("ctdb_connect_wait: waiting for %d nodes (have %d)\n",
+ expected, ctdb->num_connected));
event_loop_once(ctdb->ev);
}
+ DEBUG(3,("ctdb_connect_wait: got all %d nodes\n", expected));
}
+struct queue_next {
+ struct ctdb_context *ctdb;
+ struct ctdb_req_header *hdr;
+};
+
+
/*
- wait until we're the only node left
-*/
-void ctdb_wait_loop(struct ctdb_context *ctdb)
+ trigered when a deferred packet is due
+ */
+static void queue_next_trigger(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private_data)
{
- int expected = 0;
- if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
- expected++;
+ struct queue_next *q = talloc_get_type(private_data, struct queue_next);
+ ctdb_recv_pkt(q->ctdb, (uint8_t *)q->hdr, q->hdr->length);
+ talloc_free(q);
+}
+
+/*
+ defer a packet, so it is processed on the next event loop
+ this is used for sending packets to ourselves
+ */
+static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct queue_next *q;
+ q = talloc(ctdb, struct queue_next);
+ if (q == NULL) {
+ DEBUG(0,(__location__ " Failed to allocate deferred packet\n"));
+ return;
}
- while (ctdb->num_connected > expected) {
- event_loop_once(ctdb->ev);
+ q->ctdb = ctdb;
+ q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
+ if (q->hdr == NULL) {
+ DEBUG(0,("Error copying deferred packet to self\n"));
+ return;
}
+ event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
}
-
/*
queue a packet or die
*/
void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
struct ctdb_node *node;
+ ctdb->status.node_packets_sent++;
node = ctdb->nodes[hdr->destnode];
- if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
+ if (hdr->destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+ ctdb_defer_packet(ctdb, hdr);
+ } else if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
ctdb_fatal(ctdb, "Unable to queue packet\n");
}
}
@@ -338,11 +420,3 @@ struct ctdb_context *ctdb_init(struct event_context *ev)
return ctdb;
}
-int ctdb_start(struct ctdb_context *ctdb)
-{
- if (ctdb->flags&CTDB_FLAG_DAEMON_MODE) {
- return ctdbd_start(ctdb);
- }
-
- return ctdb->methods->start(ctdb);
-}