summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--source4/cluster/cluster.c29
-rw-r--r--source4/cluster/cluster.h15
-rw-r--r--source4/cluster/cluster_private.h7
-rw-r--r--source4/cluster/ctdb/brlock_ctdb.c6
-rw-r--r--source4/cluster/ctdb/ctdb_cluster.c38
-rw-r--r--source4/cluster/local.c32
-rw-r--r--source4/lib/messaging/messaging.c44
7 files changed, 161 insertions, 10 deletions
diff --git a/source4/cluster/cluster.c b/source4/cluster/cluster.c
index 16de01de07..ea800d2f62 100644
--- a/source4/cluster/cluster.c
+++ b/source4/cluster/cluster.c
@@ -33,11 +33,11 @@ void cluster_set_ops(struct cluster_ops *new_ops)
}
/*
- not a nice abstraction :(
+ an ugly way of getting at the backend handle (eg. ctdb context) via the cluster API
*/
-void *cluster_private(void)
+void *cluster_backend_handle(void)
{
- return ops->private;
+ return ops->backend_handle(ops);
}
/* by default use the local ops */
@@ -46,8 +46,6 @@ static void cluster_init(void)
if (ops == NULL) cluster_local_init();
}
-
-
/*
server a server_id for the local node
*/
@@ -76,3 +74,24 @@ struct tdb_wrap *cluster_tdb_tmp_open(TALLOC_CTX *mem_ctx, const char *dbname, i
cluster_init();
return ops->cluster_tdb_tmp_open(ops, mem_ctx, dbname, flags);
}
+
+
+/*
+ register a callback function for a messaging endpoint
+*/
+NTSTATUS cluster_message_init(struct messaging_context *msg, struct server_id server,
+ void (*handler)(struct messaging_context *,
+ struct server_id, uint32_t, DATA_BLOB))
+{
+ cluster_init();
+ return ops->message_init(ops, msg, server, handler);
+}
+
+/*
+ send a message to another node in the cluster
+*/
+NTSTATUS cluster_message_send(struct server_id server, uint32_t msg_type, DATA_BLOB *data)
+{
+ cluster_init();
+ return ops->message_send(ops, server, msg_type, data);
+}
diff --git a/source4/cluster/cluster.h b/source4/cluster/cluster.h
index f56a8e9bb6..6f076e7f78 100644
--- a/source4/cluster/cluster.h
+++ b/source4/cluster/cluster.h
@@ -28,10 +28,23 @@
*/
#define cluster_id_equal(id1, id2) ((id1)->id == (id2)->id && (id1)->node == (id2)->node)
+/*
+ test for same cluster node
+*/
+#define cluster_node_equal(id1, id2) ((id1)->node == (id2)->node)
+
+struct messaging_context;
+typedef void (*cluster_message_fn_t)(struct messaging_context *,
+ struct server_id, uint32_t, DATA_BLOB);
+
/* prototypes */
struct server_id cluster_id(uint32_t id);
const char *cluster_id_string(TALLOC_CTX *mem_ctx, struct server_id id);
struct tdb_wrap *cluster_tdb_tmp_open(TALLOC_CTX *mem_ctx, const char *dbname, int flags);
-void *cluster_private(void);
+void *cluster_backend_handle(void);
+
+NTSTATUS cluster_message_init(struct messaging_context *msg, struct server_id server,
+ cluster_message_fn_t handler);
+NTSTATUS cluster_message_send(struct server_id server, uint32_t msg_type, DATA_BLOB *data);
#endif
diff --git a/source4/cluster/cluster_private.h b/source4/cluster/cluster_private.h
index 12d57badd0..2dc749d890 100644
--- a/source4/cluster/cluster_private.h
+++ b/source4/cluster/cluster_private.h
@@ -29,6 +29,13 @@ struct cluster_ops {
TALLOC_CTX *, struct server_id );
struct tdb_wrap *(*cluster_tdb_tmp_open)(struct cluster_ops *,
TALLOC_CTX *, const char *, int);
+ void *(*backend_handle)(struct cluster_ops *);
+ NTSTATUS (*message_init)(struct cluster_ops *ops,
+ struct messaging_context *msg, struct server_id server,
+ cluster_message_fn_t handler);
+ NTSTATUS (*message_send)(struct cluster_ops *ops,
+ struct server_id server, uint32_t msg_type,
+ DATA_BLOB *data);
void *private; /* backend state */
};
diff --git a/source4/cluster/ctdb/brlock_ctdb.c b/source4/cluster/ctdb/brlock_ctdb.c
index ffc5facbb4..04f617beb9 100644
--- a/source4/cluster/ctdb/brlock_ctdb.c
+++ b/source4/cluster/ctdb/brlock_ctdb.c
@@ -105,7 +105,8 @@ static void show_locks(const char *op, struct lock_struct *locks, int count)
static struct brl_context *brl_ctdb_init(TALLOC_CTX *mem_ctx, struct server_id server,
struct messaging_context *messaging_ctx)
{
- struct ctdb_context *ctdb = talloc_get_type(cluster_private(), struct ctdb_context);
+ struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(),
+ struct ctdb_context);
struct brl_context *brl;
brl = talloc(mem_ctx, struct brl_context);
@@ -911,7 +912,8 @@ static const struct brlock_ops brlock_tdb_ops = {
void brl_ctdb_init_ops(void)
{
- struct ctdb_context *ctdb = talloc_get_type(cluster_private(), struct ctdb_context);
+ struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(),
+ struct ctdb_context);
brl_set_ops(&brlock_tdb_ops);
diff --git a/source4/cluster/ctdb/ctdb_cluster.c b/source4/cluster/ctdb/ctdb_cluster.c
index 1d8d3a5252..464cb8ecba 100644
--- a/source4/cluster/ctdb/ctdb_cluster.c
+++ b/source4/cluster/ctdb/ctdb_cluster.c
@@ -39,7 +39,8 @@ struct cluster_state {
*/
static struct server_id ctdb_id(struct cluster_ops *ops, uint32_t id)
{
- struct ctdb_context *ctdb = ops->private;
+ struct cluster_state *state = ops->private;
+ struct ctdb_context *ctdb = state->ctdb;
struct server_id server_id;
server_id.node = ctdb_get_vnn(ctdb);
server_id.id = id;
@@ -81,10 +82,43 @@ static struct tdb_wrap *ctdb_tdb_tmp_open(struct cluster_ops *ops,
return w;
}
+/*
+ get at the ctdb handle
+*/
+static void *ctdb_backend_handle(struct cluster_ops *ops)
+{
+ struct cluster_state *state = ops->private;
+ return (void *)state->ctdb;
+}
+
+/*
+ setup a handler for ctdb messages
+*/
+static NTSTATUS ctdb_message_init(struct cluster_ops *ops,
+ struct messaging_context *msg,
+ struct server_id server,
+ cluster_message_fn_t handler)
+{
+ return NT_STATUS_OK;
+}
+
+/*
+ send a ctdb message to another node
+*/
+static NTSTATUS ctdb_message_send(struct cluster_ops *ops,
+ struct server_id server, uint32_t msg_type,
+ DATA_BLOB *data)
+{
+ return NT_STATUS_INVALID_DEVICE_REQUEST;
+}
+
static struct cluster_ops cluster_ctdb_ops = {
.cluster_id = ctdb_id,
.cluster_id_string = ctdb_id_string,
.cluster_tdb_tmp_open = ctdb_tdb_tmp_open,
+ .backend_handle = ctdb_backend_handle,
+ .message_init = ctdb_message_init,
+ .message_send = ctdb_message_send,
.private = NULL
};
@@ -114,7 +148,7 @@ void cluster_ctdb_init(struct event_context *ev)
state->ctdb = ctdb_init(ev);
if (state->ctdb == NULL) goto failed;
- cluster_ctdb_ops.private = state->ctdb;
+ cluster_ctdb_ops.private = state;
ret = ctdb_set_transport(state->ctdb, transport);
if (ret == -1) {
diff --git a/source4/cluster/local.c b/source4/cluster/local.c
index 28a0576acc..338bac500a 100644
--- a/source4/cluster/local.c
+++ b/source4/cluster/local.c
@@ -65,10 +65,42 @@ static struct tdb_wrap *local_tdb_tmp_open(struct cluster_ops *ops,
return w;
}
+/*
+ dummy backend handle function
+*/
+static void *local_backend_handle(struct cluster_ops *ops)
+{
+ return NULL;
+}
+
+/*
+ dummy message init function - not needed as all messages are local
+*/
+static NTSTATUS local_message_init(struct cluster_ops *ops,
+ struct messaging_context *msg,
+ struct server_id server,
+ cluster_message_fn_t handler)
+{
+ return NT_STATUS_OK;
+}
+
+/*
+ dummy message send
+*/
+static NTSTATUS local_message_send(struct cluster_ops *ops,
+ struct server_id server, uint32_t msg_type,
+ DATA_BLOB *data)
+{
+ return NT_STATUS_INVALID_DEVICE_REQUEST;
+}
+
static struct cluster_ops cluster_local_ops = {
.cluster_id = local_id,
.cluster_id_string = local_id_string,
.cluster_tdb_tmp_open = local_tdb_tmp_open,
+ .backend_handle = local_backend_handle,
+ .message_init = local_message_init,
+ .message_send = local_message_send,
.private = NULL
};
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index 691db2b961..a043937733 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -152,6 +152,37 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
rec->header->length = 0;
}
+/*
+ handler for messages that arrive from other nodes in the cluster
+*/
+static void cluster_message_handler(struct messaging_context *msg, struct server_id from,
+ uint32_t msg_type, DATA_BLOB packet)
+{
+ struct messaging_rec *rec;
+
+ rec = talloc(msg, struct messaging_rec);
+ if (rec == NULL) {
+ smb_panic("Unable to allocate messaging_rec");
+ }
+
+ talloc_steal(rec, packet.data);
+ rec->msg = msg;
+ rec->path = msg->path;
+ rec->header = (struct messaging_header *)packet.data;
+ rec->packet = packet;
+
+ if (packet.length != sizeof(*rec->header) + rec->header->length) {
+ DEBUG(0,("messaging: bad message header size %d should be %d\n",
+ rec->header->length, (int)(packet.length - sizeof(*rec->header))));
+ talloc_free(rec);
+ return;
+ }
+
+ messaging_dispatch(msg, rec);
+ talloc_free(rec);
+}
+
+
/*
try to send the message
@@ -375,6 +406,12 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
NTSTATUS status;
size_t dlength = data?data->length:0;
+ if (!cluster_node_equal(&msg->server_id, &server)) {
+ /* the destination is on another node - dispatch via
+ the cluster layer */
+ return cluster_message_send(server, msg_type, data);
+ }
+
rec = talloc(msg, struct messaging_rec);
if (rec == NULL) {
return NT_STATUS_NO_MEMORY;
@@ -464,6 +501,13 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
return NULL;
}
+ /* setup a handler for messages from other cluster nodes, if appropriate */
+ status = cluster_message_init(msg, server_id, cluster_message_handler);
+ if (!NT_STATUS_IS_OK(status)) {
+ talloc_free(msg);
+ return NULL;
+ }
+
if (ev == NULL) {
ev = event_context_init(msg);
}