summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--source4/cluster/cluster.c3
-rw-r--r--source4/cluster/cluster.h3
-rw-r--r--source4/cluster/ctdb/ctdb_cluster.c78
-rw-r--r--source4/lib/messaging/messaging.c18
4 files changed, 88 insertions, 14 deletions
diff --git a/source4/cluster/cluster.c b/source4/cluster/cluster.c
index ea800d2f62..4be52b8233 100644
--- a/source4/cluster/cluster.c
+++ b/source4/cluster/cluster.c
@@ -80,8 +80,7 @@ struct tdb_wrap *cluster_tdb_tmp_open(TALLOC_CTX *mem_ctx, const char *dbname, i
register a callback function for a messaging endpoint
*/
NTSTATUS cluster_message_init(struct messaging_context *msg, struct server_id server,
- void (*handler)(struct messaging_context *,
- struct server_id, uint32_t, DATA_BLOB))
+ cluster_message_fn_t handler)
{
cluster_init();
return ops->message_init(ops, msg, server, handler);
diff --git a/source4/cluster/cluster.h b/source4/cluster/cluster.h
index 6f076e7f78..d182bf5526 100644
--- a/source4/cluster/cluster.h
+++ b/source4/cluster/cluster.h
@@ -34,8 +34,7 @@
#define cluster_node_equal(id1, id2) ((id1)->node == (id2)->node)
struct messaging_context;
-typedef void (*cluster_message_fn_t)(struct messaging_context *,
- struct server_id, uint32_t, DATA_BLOB);
+typedef void (*cluster_message_fn_t)(struct messaging_context *, DATA_BLOB);
/* prototypes */
struct server_id cluster_id(uint32_t id);
diff --git a/source4/cluster/ctdb/ctdb_cluster.c b/source4/cluster/ctdb/ctdb_cluster.c
index 464cb8ecba..95adbafadf 100644
--- a/source4/cluster/ctdb/ctdb_cluster.c
+++ b/source4/cluster/ctdb/ctdb_cluster.c
@@ -28,12 +28,25 @@
#include "lib/tdb/include/tdb.h"
#include "include/ctdb.h"
#include "db_wrap.h"
+#include "lib/util/dlinklist.h"
+
+/* a linked list of messaging handlers, allowing incoming messages
+ to be directed to the right messaging context */
+struct cluster_messaging_list {
+ struct cluster_messaging_list *next, *prev;
+ struct cluster_state *state;
+ struct messaging_context *msg;
+ struct server_id server;
+ cluster_message_fn_t handler;
+};
struct cluster_state {
struct ctdb_context *ctdb;
+ struct cluster_messaging_list *list;
};
+
/*
return a server_id for a ctdb node
*/
@@ -92,6 +105,33 @@ static void *ctdb_backend_handle(struct cluster_ops *ops)
}
/*
+ dispatch incoming ctdb messages
+*/
+static void ctdb_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
+ TDB_DATA data, void *private)
+{
+ struct cluster_state *state = talloc_get_type(private, struct cluster_state);
+ struct cluster_messaging_list *m;
+ for (m=state->list;m;m=m->next) {
+ if (srvid == m->server.id) {
+ DATA_BLOB bdata;
+ bdata.data = data.dptr;
+ bdata.length = data.dsize;
+ m->handler(m->msg, bdata);
+ }
+ }
+}
+
+/*
+ destroy a element of messaging list (when messaging context goes away)
+*/
+static int cluster_messaging_destructor(struct cluster_messaging_list *m)
+{
+ DLIST_REMOVE(m->state->list, m);
+ return 0;
+}
+
+/*
setup a handler for ctdb messages
*/
static NTSTATUS ctdb_message_init(struct cluster_ops *ops,
@@ -99,6 +139,19 @@ static NTSTATUS ctdb_message_init(struct cluster_ops *ops,
struct server_id server,
cluster_message_fn_t handler)
{
+ struct cluster_state *state = ops->private;
+ struct cluster_messaging_list *m;
+ m = talloc(msg, struct cluster_messaging_list);
+ NT_STATUS_HAVE_NO_MEMORY(m);
+
+ m->state = state;
+ m->msg = msg;
+ m->server = server;
+ m->handler = handler;
+ DLIST_ADD(state->list, m);
+
+ talloc_set_destructor(m, cluster_messaging_destructor);
+
return NT_STATUS_OK;
}
@@ -109,7 +162,19 @@ static NTSTATUS ctdb_message_send(struct cluster_ops *ops,
struct server_id server, uint32_t msg_type,
DATA_BLOB *data)
{
- return NT_STATUS_INVALID_DEVICE_REQUEST;
+ struct cluster_state *state = ops->private;
+ struct ctdb_context *ctdb = state->ctdb;
+ TDB_DATA tdata;
+ int ret;
+
+ tdata.dptr = data->data;
+ tdata.dsize = data->length;
+
+ ret = ctdb_send_message(ctdb, server.node, server.id, msg_type, tdata);
+ if (ret != 0) {
+ return NT_STATUS_INTERNAL_DB_CORRUPTION;
+ }
+ return NT_STATUS_OK;
}
static struct cluster_ops cluster_ctdb_ops = {
@@ -148,6 +213,8 @@ void cluster_ctdb_init(struct event_context *ev)
state->ctdb = ctdb_init(ev);
if (state->ctdb == NULL) goto failed;
+ state->list = NULL;
+
cluster_ctdb_ops.private = state;
ret = ctdb_set_transport(state->ctdb, transport);
@@ -181,6 +248,14 @@ void cluster_ctdb_init(struct event_context *ev)
goto failed;
}
+ /* setup messaging handler */
+ ret = ctdb_set_message_handler(state->ctdb, ctdb_message_handler, state);
+ if (ret == -1) {
+ DEBUG(0,("ctdb_set_message_handler failed - %s\n",
+ ctdb_errstr(state->ctdb)));
+ goto failed;
+ }
+
ret = ctdb_attach(state->ctdb, "cluster.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
if (ret == -1) {
DEBUG(0,("ctdb_attach failed - %s\n", ctdb_errstr(state->ctdb)));
@@ -199,6 +274,7 @@ void cluster_ctdb_init(struct event_context *ev)
ctdb_connect_wait(state->ctdb);
cluster_set_ops(&cluster_ctdb_ops);
+
return;
failed:
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index a043937733..03bfb6b571 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -155,8 +155,7 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
/*
handler for messages that arrive from other nodes in the cluster
*/
-static void cluster_message_handler(struct messaging_context *msg, struct server_id from,
- uint32_t msg_type, DATA_BLOB packet)
+static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB packet)
{
struct messaging_rec *rec;
@@ -165,7 +164,6 @@ static void cluster_message_handler(struct messaging_context *msg, struct server
smb_panic("Unable to allocate messaging_rec");
}
- talloc_steal(rec, packet.data);
rec->msg = msg;
rec->path = msg->path;
rec->header = (struct messaging_header *)packet.data;
@@ -406,12 +404,6 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
NTSTATUS status;
size_t dlength = data?data->length:0;
- if (!cluster_node_equal(&msg->server_id, &server)) {
- /* the destination is on another node - dispatch via
- the cluster layer */
- return cluster_message_send(server, msg_type, data);
- }
-
rec = talloc(msg, struct messaging_rec);
if (rec == NULL) {
return NT_STATUS_NO_MEMORY;
@@ -435,6 +427,14 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
data->data, dlength);
}
+ if (!cluster_node_equal(&msg->server_id, &server)) {
+ /* the destination is on another node - dispatch via
+ the cluster layer */
+ status = cluster_message_send(server, msg_type, &rec->packet);
+ talloc_free(rec);
+ return status;
+ }
+
rec->path = messaging_path(msg, server);
talloc_steal(rec, rec->path);