diff options
-rw-r--r-- | source4/cluster/cluster.c | 29 | ||||
-rw-r--r-- | source4/cluster/cluster.h | 15 | ||||
-rw-r--r-- | source4/cluster/cluster_private.h | 7 | ||||
-rw-r--r-- | source4/cluster/ctdb/brlock_ctdb.c | 6 | ||||
-rw-r--r-- | source4/cluster/ctdb/ctdb_cluster.c | 38 | ||||
-rw-r--r-- | source4/cluster/local.c | 32 | ||||
-rw-r--r-- | source4/lib/messaging/messaging.c | 44 |
7 files changed, 161 insertions, 10 deletions
diff --git a/source4/cluster/cluster.c b/source4/cluster/cluster.c index 16de01de07..ea800d2f62 100644 --- a/source4/cluster/cluster.c +++ b/source4/cluster/cluster.c @@ -33,11 +33,11 @@ void cluster_set_ops(struct cluster_ops *new_ops) } /* - not a nice abstraction :( + an ugly way of getting at the backend handle (eg. ctdb context) via the cluster API */ -void *cluster_private(void) +void *cluster_backend_handle(void) { - return ops->private; + return ops->backend_handle(ops); } /* by default use the local ops */ @@ -46,8 +46,6 @@ static void cluster_init(void) if (ops == NULL) cluster_local_init(); } - - /* server a server_id for the local node */ @@ -76,3 +74,24 @@ struct tdb_wrap *cluster_tdb_tmp_open(TALLOC_CTX *mem_ctx, const char *dbname, i cluster_init(); return ops->cluster_tdb_tmp_open(ops, mem_ctx, dbname, flags); } + + +/* + register a callback function for a messaging endpoint +*/ +NTSTATUS cluster_message_init(struct messaging_context *msg, struct server_id server, + void (*handler)(struct messaging_context *, + struct server_id, uint32_t, DATA_BLOB)) +{ + cluster_init(); + return ops->message_init(ops, msg, server, handler); +} + +/* + send a message to another node in the cluster +*/ +NTSTATUS cluster_message_send(struct server_id server, uint32_t msg_type, DATA_BLOB *data) +{ + cluster_init(); + return ops->message_send(ops, server, msg_type, data); +} diff --git a/source4/cluster/cluster.h b/source4/cluster/cluster.h index f56a8e9bb6..6f076e7f78 100644 --- a/source4/cluster/cluster.h +++ b/source4/cluster/cluster.h @@ -28,10 +28,23 @@ */ #define cluster_id_equal(id1, id2) ((id1)->id == (id2)->id && (id1)->node == (id2)->node) +/* + test for same cluster node +*/ +#define cluster_node_equal(id1, id2) ((id1)->node == (id2)->node) + +struct messaging_context; +typedef void (*cluster_message_fn_t)(struct messaging_context *, + struct server_id, uint32_t, DATA_BLOB); + /* prototypes */ struct server_id cluster_id(uint32_t id); const char *cluster_id_string(TALLOC_CTX *mem_ctx, struct server_id id); struct tdb_wrap *cluster_tdb_tmp_open(TALLOC_CTX *mem_ctx, const char *dbname, int flags); -void *cluster_private(void); +void *cluster_backend_handle(void); + +NTSTATUS cluster_message_init(struct messaging_context *msg, struct server_id server, + cluster_message_fn_t handler); +NTSTATUS cluster_message_send(struct server_id server, uint32_t msg_type, DATA_BLOB *data); #endif diff --git a/source4/cluster/cluster_private.h b/source4/cluster/cluster_private.h index 12d57badd0..2dc749d890 100644 --- a/source4/cluster/cluster_private.h +++ b/source4/cluster/cluster_private.h @@ -29,6 +29,13 @@ struct cluster_ops { TALLOC_CTX *, struct server_id ); struct tdb_wrap *(*cluster_tdb_tmp_open)(struct cluster_ops *, TALLOC_CTX *, const char *, int); + void *(*backend_handle)(struct cluster_ops *); + NTSTATUS (*message_init)(struct cluster_ops *ops, + struct messaging_context *msg, struct server_id server, + cluster_message_fn_t handler); + NTSTATUS (*message_send)(struct cluster_ops *ops, + struct server_id server, uint32_t msg_type, + DATA_BLOB *data); void *private; /* backend state */ }; diff --git a/source4/cluster/ctdb/brlock_ctdb.c b/source4/cluster/ctdb/brlock_ctdb.c index ffc5facbb4..04f617beb9 100644 --- a/source4/cluster/ctdb/brlock_ctdb.c +++ b/source4/cluster/ctdb/brlock_ctdb.c @@ -105,7 +105,8 @@ static void show_locks(const char *op, struct lock_struct *locks, int count) static struct brl_context *brl_ctdb_init(TALLOC_CTX *mem_ctx, struct server_id server, struct messaging_context *messaging_ctx) { - struct ctdb_context *ctdb = talloc_get_type(cluster_private(), struct ctdb_context); + struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(), + struct ctdb_context); struct brl_context *brl; brl = talloc(mem_ctx, struct brl_context); @@ -911,7 +912,8 @@ static const struct brlock_ops brlock_tdb_ops = { void brl_ctdb_init_ops(void) { - struct ctdb_context *ctdb = talloc_get_type(cluster_private(), struct ctdb_context); + struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(), + struct ctdb_context); brl_set_ops(&brlock_tdb_ops); diff --git a/source4/cluster/ctdb/ctdb_cluster.c b/source4/cluster/ctdb/ctdb_cluster.c index 1d8d3a5252..464cb8ecba 100644 --- a/source4/cluster/ctdb/ctdb_cluster.c +++ b/source4/cluster/ctdb/ctdb_cluster.c @@ -39,7 +39,8 @@ struct cluster_state { */ static struct server_id ctdb_id(struct cluster_ops *ops, uint32_t id) { - struct ctdb_context *ctdb = ops->private; + struct cluster_state *state = ops->private; + struct ctdb_context *ctdb = state->ctdb; struct server_id server_id; server_id.node = ctdb_get_vnn(ctdb); server_id.id = id; @@ -81,10 +82,43 @@ static struct tdb_wrap *ctdb_tdb_tmp_open(struct cluster_ops *ops, return w; } +/* + get at the ctdb handle +*/ +static void *ctdb_backend_handle(struct cluster_ops *ops) +{ + struct cluster_state *state = ops->private; + return (void *)state->ctdb; +} + +/* + setup a handler for ctdb messages +*/ +static NTSTATUS ctdb_message_init(struct cluster_ops *ops, + struct messaging_context *msg, + struct server_id server, + cluster_message_fn_t handler) +{ + return NT_STATUS_OK; +} + +/* + send a ctdb message to another node +*/ +static NTSTATUS ctdb_message_send(struct cluster_ops *ops, + struct server_id server, uint32_t msg_type, + DATA_BLOB *data) +{ + return NT_STATUS_INVALID_DEVICE_REQUEST; +} + static struct cluster_ops cluster_ctdb_ops = { .cluster_id = ctdb_id, .cluster_id_string = ctdb_id_string, .cluster_tdb_tmp_open = ctdb_tdb_tmp_open, + .backend_handle = ctdb_backend_handle, + .message_init = ctdb_message_init, + .message_send = ctdb_message_send, .private = NULL }; @@ -114,7 +148,7 @@ void cluster_ctdb_init(struct event_context *ev) state->ctdb = ctdb_init(ev); if (state->ctdb == NULL) goto failed; - cluster_ctdb_ops.private = state->ctdb; + cluster_ctdb_ops.private = state; ret = ctdb_set_transport(state->ctdb, transport); if (ret == -1) { diff --git a/source4/cluster/local.c b/source4/cluster/local.c index 28a0576acc..338bac500a 100644 --- a/source4/cluster/local.c +++ b/source4/cluster/local.c @@ -65,10 +65,42 @@ static struct tdb_wrap *local_tdb_tmp_open(struct cluster_ops *ops, return w; } +/* + dummy backend handle function +*/ +static void *local_backend_handle(struct cluster_ops *ops) +{ + return NULL; +} + +/* + dummy message init function - not needed as all messages are local +*/ +static NTSTATUS local_message_init(struct cluster_ops *ops, + struct messaging_context *msg, + struct server_id server, + cluster_message_fn_t handler) +{ + return NT_STATUS_OK; +} + +/* + dummy message send +*/ +static NTSTATUS local_message_send(struct cluster_ops *ops, + struct server_id server, uint32_t msg_type, + DATA_BLOB *data) +{ + return NT_STATUS_INVALID_DEVICE_REQUEST; +} + static struct cluster_ops cluster_local_ops = { .cluster_id = local_id, .cluster_id_string = local_id_string, .cluster_tdb_tmp_open = local_tdb_tmp_open, + .backend_handle = local_backend_handle, + .message_init = local_message_init, + .message_send = local_message_send, .private = NULL }; diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c index 691db2b961..a043937733 100644 --- a/source4/lib/messaging/messaging.c +++ b/source4/lib/messaging/messaging.c @@ -152,6 +152,37 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r rec->header->length = 0; } +/* + handler for messages that arrive from other nodes in the cluster +*/ +static void cluster_message_handler(struct messaging_context *msg, struct server_id from, + uint32_t msg_type, DATA_BLOB packet) +{ + struct messaging_rec *rec; + + rec = talloc(msg, struct messaging_rec); + if (rec == NULL) { + smb_panic("Unable to allocate messaging_rec"); + } + + talloc_steal(rec, packet.data); + rec->msg = msg; + rec->path = msg->path; + rec->header = (struct messaging_header *)packet.data; + rec->packet = packet; + + if (packet.length != sizeof(*rec->header) + rec->header->length) { + DEBUG(0,("messaging: bad message header size %d should be %d\n", + rec->header->length, (int)(packet.length - sizeof(*rec->header)))); + talloc_free(rec); + return; + } + + messaging_dispatch(msg, rec); + talloc_free(rec); +} + + /* try to send the message @@ -375,6 +406,12 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, NTSTATUS status; size_t dlength = data?data->length:0; + if (!cluster_node_equal(&msg->server_id, &server)) { + /* the destination is on another node - dispatch via + the cluster layer */ + return cluster_message_send(server, msg_type, data); + } + rec = talloc(msg, struct messaging_rec); if (rec == NULL) { return NT_STATUS_NO_MEMORY; @@ -464,6 +501,13 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, return NULL; } + /* setup a handler for messages from other cluster nodes, if appropriate */ + status = cluster_message_init(msg, server_id, cluster_message_handler); + if (!NT_STATUS_IS_OK(status)) { + talloc_free(msg); + return NULL; + } + if (ev == NULL) { ev = event_context_init(msg); } |