diff options
Diffstat (limited to 'source4/lib/messaging')
-rw-r--r-- | source4/lib/messaging/config.mk | 3 | ||||
-rw-r--r-- | source4/lib/messaging/irpc.h | 21 | ||||
-rw-r--r-- | source4/lib/messaging/messaging.c | 75 |
3 files changed, 56 insertions, 43 deletions
diff --git a/source4/lib/messaging/config.mk b/source4/lib/messaging/config.mk index c4c3e6b2f8..85a5791703 100644 --- a/source4/lib/messaging/config.mk +++ b/source4/lib/messaging/config.mk @@ -9,6 +9,7 @@ PUBLIC_DEPENDENCIES = \ DB_WRAP \ NDR_IRPC \ UNIX_PRIVS \ - UTIL_TDB + UTIL_TDB \ + CLUSTER # End SUBSYSTEM MESSAGING ################################################ diff --git a/source4/lib/messaging/irpc.h b/source4/lib/messaging/irpc.h index 4e775bfe06..1d704ad943 100644 --- a/source4/lib/messaging/irpc.h +++ b/source4/lib/messaging/irpc.h @@ -26,7 +26,7 @@ an incoming irpc message */ struct irpc_message { - uint32_t from; + struct server_id from; void *private; struct irpc_header header; struct ndr_pull *ndr; @@ -77,22 +77,25 @@ struct irpc_request { }; typedef void (*msg_callback_t)(struct messaging_context *msg, void *private, - uint32_t msg_type, uint32_t server_id, DATA_BLOB *data); + uint32_t msg_type, + struct server_id server_id, DATA_BLOB *data); -struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, +struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, + struct server_id server_id, struct event_context *ev); -NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, +NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, uint32_t msg_type, DATA_BLOB *data); NTSTATUS messaging_register(struct messaging_context *msg, void *private, uint32_t msg_type, msg_callback_t fn); NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private, msg_callback_t fn, uint32_t *msg_type); -struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, +struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, + struct server_id server_id, struct event_context *ev); struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, struct event_context *ev); -NTSTATUS messaging_send_ptr(struct messaging_context *msg, uint32_t server, +NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server, uint32_t msg_type, void *ptr); void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private); @@ -103,17 +106,17 @@ NTSTATUS irpc_register(struct messaging_context *msg_ctx, const struct dcerpc_interface_table *table, int call, irpc_function_t fn, void *private); struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx, - uint32_t server_id, + struct server_id server_id, const struct dcerpc_interface_table *table, int callnum, void *r, TALLOC_CTX *ctx); NTSTATUS irpc_call_recv(struct irpc_request *irpc); NTSTATUS irpc_call(struct messaging_context *msg_ctx, - uint32_t server_id, + struct server_id server_id, const struct dcerpc_interface_table *table, int callnum, void *r, TALLOC_CTX *ctx); NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name); -uint32_t *irpc_servers_byname(struct messaging_context *msg_ctx, const char *name); +struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx, const char *name); void irpc_remove_name(struct messaging_context *msg_ctx, const char *name); NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status); diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c index 09e04fda9b..691db2b961 100644 --- a/source4/lib/messaging/messaging.c +++ b/source4/lib/messaging/messaging.c @@ -33,12 +33,14 @@ #include "librpc/rpc/dcerpc.h" #include "lib/tdb/include/tdb.h" #include "lib/util/util_tdb.h" +#include "lib/util/util_tdb.h" +#include "cluster/cluster.h" /* change the message version with any incompatible changes in the protocol */ #define MESSAGING_VERSION 1 struct messaging_context { - uint32_t server_id; + struct server_id server_id; struct socket_context *sock; const char *base_path; const char *path; @@ -75,8 +77,8 @@ struct messaging_rec { struct messaging_header { uint32_t version; uint32_t msg_type; - uint32_t from; - uint32_t to; + struct server_id from; + struct server_id to; uint32_t length; } *header; @@ -85,17 +87,17 @@ struct messaging_rec { static void irpc_handler(struct messaging_context *, void *, - uint32_t, uint32_t, DATA_BLOB *); + uint32_t, struct server_id, DATA_BLOB *); /* A useful function for testing the message system. */ static void ping_message(struct messaging_context *msg, void *private, - uint32_t msg_type, uint32_t src, DATA_BLOB *data) + uint32_t msg_type, struct server_id src, DATA_BLOB *data) { - DEBUG(1,("INFO: Received PING message from server %u [%.*s]\n", - (uint_t)src, (int)data->length, + DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n", + (uint_t)src.node, (uint_t)src.id, (int)data->length, data->data?(const char *)data->data:"")); messaging_send(msg, src, MSG_PONG, data); } @@ -114,9 +116,10 @@ static NTSTATUS irpc_uptime(struct irpc_message *msg, /* return the path to a messaging socket */ -static char *messaging_path(struct messaging_context *msg, uint32_t server_id) +static char *messaging_path(struct messaging_context *msg, struct server_id server_id) { - return talloc_asprintf(msg, "%s/msg.%u", msg->base_path, (unsigned)server_id); + return talloc_asprintf(msg, "%s/msg.%u.%u", msg->base_path, + (unsigned)server_id.node, (unsigned)server_id.id); } /* @@ -192,7 +195,7 @@ static void messaging_send_handler(struct messaging_context *msg) } if (!NT_STATUS_IS_OK(status)) { DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n", - rec->header->from, rec->header->to, rec->header->msg_type, + rec->header->from.id, rec->header->to.id, rec->header->msg_type, nt_errstr(status))); } DLIST_REMOVE(msg->pending, rec); @@ -365,7 +368,7 @@ void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void /* Send a message to a particular server */ -NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, +NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, uint32_t msg_type, DATA_BLOB *data) { struct messaging_rec *rec; @@ -420,7 +423,7 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, /* Send a message to a particular server, with the message containing a single pointer */ -NTSTATUS messaging_send_ptr(struct messaging_context *msg, uint32_t server, +NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server, uint32_t msg_type, void *ptr) { DATA_BLOB blob; @@ -447,7 +450,8 @@ static int messaging_destructor(struct messaging_context *msg) /* create the listening socket and setup the dispatcher */ -struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, +struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, + struct server_id server_id, struct event_context *ev) { struct messaging_context *msg; @@ -522,7 +526,10 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, struct event_context *ev) { - return messaging_init(mem_ctx, random() % 0x10000000, ev); + struct server_id id; + ZERO_STRUCT(id); + id.id = random() % 0x10000000; + return messaging_init(mem_ctx, id, ev); } /* a list of registered irpc server functions @@ -687,7 +694,7 @@ failed: handle an incoming irpc message */ static void irpc_handler(struct messaging_context *msg_ctx, void *private, - uint32_t msg_type, uint32_t src, DATA_BLOB *packet) + uint32_t msg_type, struct server_id src, DATA_BLOB *packet) { struct irpc_message *m; NTSTATUS status; @@ -745,7 +752,7 @@ static void irpc_timeout(struct event_context *ev, struct timed_event *te, make a irpc call - async send */ struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx, - uint32_t server_id, + struct server_id server_id, const struct dcerpc_interface_table *table, int callnum, void *r, TALLOC_CTX *ctx) { @@ -829,7 +836,7 @@ NTSTATUS irpc_call_recv(struct irpc_request *irpc) perform a synchronous irpc request */ NTSTATUS irpc_call(struct messaging_context *msg_ctx, - uint32_t server_id, + struct server_id server_id, const struct dcerpc_interface_table *table, int callnum, void *r, TALLOC_CTX *mem_ctx) @@ -873,15 +880,15 @@ NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name) return NT_STATUS_LOCK_NOT_GRANTED; } rec = tdb_fetch_bystring(t->tdb, name); - count = rec.dsize / sizeof(uint32_t); - rec.dptr = (unsigned char *)realloc_p(rec.dptr, uint32_t, count+1); - rec.dsize += sizeof(uint32_t); + count = rec.dsize / sizeof(struct server_id); + rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1); + rec.dsize += sizeof(struct server_id); if (rec.dptr == NULL) { tdb_unlock_bystring(t->tdb, name); talloc_free(t); return NT_STATUS_NO_MEMORY; } - ((uint32_t *)rec.dptr)[count] = msg_ctx->server_id; + ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id; if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) { status = NT_STATUS_INTERNAL_ERROR; } @@ -898,12 +905,13 @@ NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name) /* return a list of server ids for a server name */ -uint32_t *irpc_servers_byname(struct messaging_context *msg_ctx, const char *name) +struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx, + const char *name) { struct tdb_wrap *t; TDB_DATA rec; int count, i; - uint32_t *ret; + struct server_id *ret; t = irpc_namedb_open(msg_ctx); if (t == NULL) { @@ -920,17 +928,17 @@ uint32_t *irpc_servers_byname(struct messaging_context *msg_ctx, const char *nam talloc_free(t); return NULL; } - count = rec.dsize / sizeof(uint32_t); - ret = talloc_array(msg_ctx, uint32_t, count+1); + count = rec.dsize / sizeof(struct server_id); + ret = talloc_array(msg_ctx, struct server_id, count+1); if (ret == NULL) { tdb_unlock_bystring(t->tdb, name); talloc_free(t); return NULL; } for (i=0;i<count;i++) { - ret[i] = ((uint32_t *)rec.dptr)[i]; + ret[i] = ((struct server_id *)rec.dptr)[i]; } - ret[i] = 0; + ret[i] = cluster_id(0); free(rec.dptr); tdb_unlock_bystring(t->tdb, name); talloc_free(t); @@ -946,7 +954,7 @@ void irpc_remove_name(struct messaging_context *msg_ctx, const char *name) struct tdb_wrap *t; TDB_DATA rec; int count, i; - uint32_t *ids; + struct server_id *ids; str_list_remove(msg_ctx->names, name); @@ -960,19 +968,20 @@ void irpc_remove_name(struct messaging_context *msg_ctx, const char *name) return; } rec = tdb_fetch_bystring(t->tdb, name); - count = rec.dsize / sizeof(uint32_t); + count = rec.dsize / sizeof(struct server_id); if (count == 0) { tdb_unlock_bystring(t->tdb, name); talloc_free(t); return; } - ids = (uint32_t *)rec.dptr; + ids = (struct server_id *)rec.dptr; for (i=0;i<count;i++) { - if (ids[i] == msg_ctx->server_id) { + if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) { if (i < count-1) { - memmove(ids+i, ids+i+1, count-(i+1)); + memmove(ids+i, ids+i+1, + sizeof(struct server_id) * (count-(i+1))); } - rec.dsize -= sizeof(uint32_t); + rec.dsize -= sizeof(struct server_id); break; } } |