diff options
-rw-r--r-- | source4/cluster/cluster.c | 3 | ||||
-rw-r--r-- | source4/cluster/cluster.h | 3 | ||||
-rw-r--r-- | source4/cluster/ctdb/ctdb_cluster.c | 78 | ||||
-rw-r--r-- | source4/lib/messaging/messaging.c | 18 |
4 files changed, 88 insertions, 14 deletions
diff --git a/source4/cluster/cluster.c b/source4/cluster/cluster.c index ea800d2f62..4be52b8233 100644 --- a/source4/cluster/cluster.c +++ b/source4/cluster/cluster.c @@ -80,8 +80,7 @@ struct tdb_wrap *cluster_tdb_tmp_open(TALLOC_CTX *mem_ctx, const char *dbname, i register a callback function for a messaging endpoint */ NTSTATUS cluster_message_init(struct messaging_context *msg, struct server_id server, - void (*handler)(struct messaging_context *, - struct server_id, uint32_t, DATA_BLOB)) + cluster_message_fn_t handler) { cluster_init(); return ops->message_init(ops, msg, server, handler); diff --git a/source4/cluster/cluster.h b/source4/cluster/cluster.h index 6f076e7f78..d182bf5526 100644 --- a/source4/cluster/cluster.h +++ b/source4/cluster/cluster.h @@ -34,8 +34,7 @@ #define cluster_node_equal(id1, id2) ((id1)->node == (id2)->node) struct messaging_context; -typedef void (*cluster_message_fn_t)(struct messaging_context *, - struct server_id, uint32_t, DATA_BLOB); +typedef void (*cluster_message_fn_t)(struct messaging_context *, DATA_BLOB); /* prototypes */ struct server_id cluster_id(uint32_t id); diff --git a/source4/cluster/ctdb/ctdb_cluster.c b/source4/cluster/ctdb/ctdb_cluster.c index 464cb8ecba..95adbafadf 100644 --- a/source4/cluster/ctdb/ctdb_cluster.c +++ b/source4/cluster/ctdb/ctdb_cluster.c @@ -28,12 +28,25 @@ #include "lib/tdb/include/tdb.h" #include "include/ctdb.h" #include "db_wrap.h" +#include "lib/util/dlinklist.h" + +/* a linked list of messaging handlers, allowing incoming messages + to be directed to the right messaging context */ +struct cluster_messaging_list { + struct cluster_messaging_list *next, *prev; + struct cluster_state *state; + struct messaging_context *msg; + struct server_id server; + cluster_message_fn_t handler; +}; struct cluster_state { struct ctdb_context *ctdb; + struct cluster_messaging_list *list; }; + /* return a server_id for a ctdb node */ @@ -92,6 +105,33 @@ static void *ctdb_backend_handle(struct cluster_ops *ops) } /* + dispatch incoming ctdb messages +*/ +static void ctdb_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + TDB_DATA data, void *private) +{ + struct cluster_state *state = talloc_get_type(private, struct cluster_state); + struct cluster_messaging_list *m; + for (m=state->list;m;m=m->next) { + if (srvid == m->server.id) { + DATA_BLOB bdata; + bdata.data = data.dptr; + bdata.length = data.dsize; + m->handler(m->msg, bdata); + } + } +} + +/* + destroy a element of messaging list (when messaging context goes away) +*/ +static int cluster_messaging_destructor(struct cluster_messaging_list *m) +{ + DLIST_REMOVE(m->state->list, m); + return 0; +} + +/* setup a handler for ctdb messages */ static NTSTATUS ctdb_message_init(struct cluster_ops *ops, @@ -99,6 +139,19 @@ static NTSTATUS ctdb_message_init(struct cluster_ops *ops, struct server_id server, cluster_message_fn_t handler) { + struct cluster_state *state = ops->private; + struct cluster_messaging_list *m; + m = talloc(msg, struct cluster_messaging_list); + NT_STATUS_HAVE_NO_MEMORY(m); + + m->state = state; + m->msg = msg; + m->server = server; + m->handler = handler; + DLIST_ADD(state->list, m); + + talloc_set_destructor(m, cluster_messaging_destructor); + return NT_STATUS_OK; } @@ -109,7 +162,19 @@ static NTSTATUS ctdb_message_send(struct cluster_ops *ops, struct server_id server, uint32_t msg_type, DATA_BLOB *data) { - return NT_STATUS_INVALID_DEVICE_REQUEST; + struct cluster_state *state = ops->private; + struct ctdb_context *ctdb = state->ctdb; + TDB_DATA tdata; + int ret; + + tdata.dptr = data->data; + tdata.dsize = data->length; + + ret = ctdb_send_message(ctdb, server.node, server.id, msg_type, tdata); + if (ret != 0) { + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + return NT_STATUS_OK; } static struct cluster_ops cluster_ctdb_ops = { @@ -148,6 +213,8 @@ void cluster_ctdb_init(struct event_context *ev) state->ctdb = ctdb_init(ev); if (state->ctdb == NULL) goto failed; + state->list = NULL; + cluster_ctdb_ops.private = state; ret = ctdb_set_transport(state->ctdb, transport); @@ -181,6 +248,14 @@ void cluster_ctdb_init(struct event_context *ev) goto failed; } + /* setup messaging handler */ + ret = ctdb_set_message_handler(state->ctdb, ctdb_message_handler, state); + if (ret == -1) { + DEBUG(0,("ctdb_set_message_handler failed - %s\n", + ctdb_errstr(state->ctdb))); + goto failed; + } + ret = ctdb_attach(state->ctdb, "cluster.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666); if (ret == -1) { DEBUG(0,("ctdb_attach failed - %s\n", ctdb_errstr(state->ctdb))); @@ -199,6 +274,7 @@ void cluster_ctdb_init(struct event_context *ev) ctdb_connect_wait(state->ctdb); cluster_set_ops(&cluster_ctdb_ops); + return; failed: diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c index a043937733..03bfb6b571 100644 --- a/source4/lib/messaging/messaging.c +++ b/source4/lib/messaging/messaging.c @@ -155,8 +155,7 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r /* handler for messages that arrive from other nodes in the cluster */ -static void cluster_message_handler(struct messaging_context *msg, struct server_id from, - uint32_t msg_type, DATA_BLOB packet) +static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB packet) { struct messaging_rec *rec; @@ -165,7 +164,6 @@ static void cluster_message_handler(struct messaging_context *msg, struct server smb_panic("Unable to allocate messaging_rec"); } - talloc_steal(rec, packet.data); rec->msg = msg; rec->path = msg->path; rec->header = (struct messaging_header *)packet.data; @@ -406,12 +404,6 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, NTSTATUS status; size_t dlength = data?data->length:0; - if (!cluster_node_equal(&msg->server_id, &server)) { - /* the destination is on another node - dispatch via - the cluster layer */ - return cluster_message_send(server, msg_type, data); - } - rec = talloc(msg, struct messaging_rec); if (rec == NULL) { return NT_STATUS_NO_MEMORY; @@ -435,6 +427,14 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, data->data, dlength); } + if (!cluster_node_equal(&msg->server_id, &server)) { + /* the destination is on another node - dispatch via + the cluster layer */ + status = cluster_message_send(server, msg_type, &rec->packet); + talloc_free(rec); + return status; + } + rec->path = messaging_path(msg, server); talloc_steal(rec, rec->path); |