From 1cd4339b9a2786aa26691ca4f02fa93ab0958b88 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Wed, 10 Jan 2007 10:52:09 +0000 Subject: r20646: first preparations for cluster enablement. This changes " uint32_t server_id to struct server_id server_id; which allows a server ID to have an node number. The node number will be zero in non-clustered case. This is the most basic hook needed for clustering, and ctdb. (This used to be commit 2365abaa991d57d68c6ebe9be608e01c907102eb) --- source4/lib/messaging/messaging.c | 75 ++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 33 deletions(-) (limited to 'source4/lib/messaging/messaging.c') diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c index 09e04fda9b..691db2b961 100644 --- a/source4/lib/messaging/messaging.c +++ b/source4/lib/messaging/messaging.c @@ -33,12 +33,14 @@ #include "librpc/rpc/dcerpc.h" #include "lib/tdb/include/tdb.h" #include "lib/util/util_tdb.h" +#include "lib/util/util_tdb.h" +#include "cluster/cluster.h" /* change the message version with any incompatible changes in the protocol */ #define MESSAGING_VERSION 1 struct messaging_context { - uint32_t server_id; + struct server_id server_id; struct socket_context *sock; const char *base_path; const char *path; @@ -75,8 +77,8 @@ struct messaging_rec { struct messaging_header { uint32_t version; uint32_t msg_type; - uint32_t from; - uint32_t to; + struct server_id from; + struct server_id to; uint32_t length; } *header; @@ -85,17 +87,17 @@ struct messaging_rec { static void irpc_handler(struct messaging_context *, void *, - uint32_t, uint32_t, DATA_BLOB *); + uint32_t, struct server_id, DATA_BLOB *); /* A useful function for testing the message system. */ static void ping_message(struct messaging_context *msg, void *private, - uint32_t msg_type, uint32_t src, DATA_BLOB *data) + uint32_t msg_type, struct server_id src, DATA_BLOB *data) { - DEBUG(1,("INFO: Received PING message from server %u [%.*s]\n", - (uint_t)src, (int)data->length, + DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n", + (uint_t)src.node, (uint_t)src.id, (int)data->length, data->data?(const char *)data->data:"")); messaging_send(msg, src, MSG_PONG, data); } @@ -114,9 +116,10 @@ static NTSTATUS irpc_uptime(struct irpc_message *msg, /* return the path to a messaging socket */ -static char *messaging_path(struct messaging_context *msg, uint32_t server_id) +static char *messaging_path(struct messaging_context *msg, struct server_id server_id) { - return talloc_asprintf(msg, "%s/msg.%u", msg->base_path, (unsigned)server_id); + return talloc_asprintf(msg, "%s/msg.%u.%u", msg->base_path, + (unsigned)server_id.node, (unsigned)server_id.id); } /* @@ -192,7 +195,7 @@ static void messaging_send_handler(struct messaging_context *msg) } if (!NT_STATUS_IS_OK(status)) { DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n", - rec->header->from, rec->header->to, rec->header->msg_type, + rec->header->from.id, rec->header->to.id, rec->header->msg_type, nt_errstr(status))); } DLIST_REMOVE(msg->pending, rec); @@ -365,7 +368,7 @@ void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void /* Send a message to a particular server */ -NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, +NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, uint32_t msg_type, DATA_BLOB *data) { struct messaging_rec *rec; @@ -420,7 +423,7 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, /* Send a message to a particular server, with the message containing a single pointer */ -NTSTATUS messaging_send_ptr(struct messaging_context *msg, uint32_t server, +NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server, uint32_t msg_type, void *ptr) { DATA_BLOB blob; @@ -447,7 +450,8 @@ static int messaging_destructor(struct messaging_context *msg) /* create the listening socket and setup the dispatcher */ -struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, +struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, + struct server_id server_id, struct event_context *ev) { struct messaging_context *msg; @@ -522,7 +526,10 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, struct event_context *ev) { - return messaging_init(mem_ctx, random() % 0x10000000, ev); + struct server_id id; + ZERO_STRUCT(id); + id.id = random() % 0x10000000; + return messaging_init(mem_ctx, id, ev); } /* a list of registered irpc server functions @@ -687,7 +694,7 @@ failed: handle an incoming irpc message */ static void irpc_handler(struct messaging_context *msg_ctx, void *private, - uint32_t msg_type, uint32_t src, DATA_BLOB *packet) + uint32_t msg_type, struct server_id src, DATA_BLOB *packet) { struct irpc_message *m; NTSTATUS status; @@ -745,7 +752,7 @@ static void irpc_timeout(struct event_context *ev, struct timed_event *te, make a irpc call - async send */ struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx, - uint32_t server_id, + struct server_id server_id, const struct dcerpc_interface_table *table, int callnum, void *r, TALLOC_CTX *ctx) { @@ -829,7 +836,7 @@ NTSTATUS irpc_call_recv(struct irpc_request *irpc) perform a synchronous irpc request */ NTSTATUS irpc_call(struct messaging_context *msg_ctx, - uint32_t server_id, + struct server_id server_id, const struct dcerpc_interface_table *table, int callnum, void *r, TALLOC_CTX *mem_ctx) @@ -873,15 +880,15 @@ NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name) return NT_STATUS_LOCK_NOT_GRANTED; } rec = tdb_fetch_bystring(t->tdb, name); - count = rec.dsize / sizeof(uint32_t); - rec.dptr = (unsigned char *)realloc_p(rec.dptr, uint32_t, count+1); - rec.dsize += sizeof(uint32_t); + count = rec.dsize / sizeof(struct server_id); + rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1); + rec.dsize += sizeof(struct server_id); if (rec.dptr == NULL) { tdb_unlock_bystring(t->tdb, name); talloc_free(t); return NT_STATUS_NO_MEMORY; } - ((uint32_t *)rec.dptr)[count] = msg_ctx->server_id; + ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id; if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) { status = NT_STATUS_INTERNAL_ERROR; } @@ -898,12 +905,13 @@ NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name) /* return a list of server ids for a server name */ -uint32_t *irpc_servers_byname(struct messaging_context *msg_ctx, const char *name) +struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx, + const char *name) { struct tdb_wrap *t; TDB_DATA rec; int count, i; - uint32_t *ret; + struct server_id *ret; t = irpc_namedb_open(msg_ctx); if (t == NULL) { @@ -920,17 +928,17 @@ uint32_t *irpc_servers_byname(struct messaging_context *msg_ctx, const char *nam talloc_free(t); return NULL; } - count = rec.dsize / sizeof(uint32_t); - ret = talloc_array(msg_ctx, uint32_t, count+1); + count = rec.dsize / sizeof(struct server_id); + ret = talloc_array(msg_ctx, struct server_id, count+1); if (ret == NULL) { tdb_unlock_bystring(t->tdb, name); talloc_free(t); return NULL; } for (i=0;itdb, name); talloc_free(t); @@ -946,7 +954,7 @@ void irpc_remove_name(struct messaging_context *msg_ctx, const char *name) struct tdb_wrap *t; TDB_DATA rec; int count, i; - uint32_t *ids; + struct server_id *ids; str_list_remove(msg_ctx->names, name); @@ -960,19 +968,20 @@ void irpc_remove_name(struct messaging_context *msg_ctx, const char *name) return; } rec = tdb_fetch_bystring(t->tdb, name); - count = rec.dsize / sizeof(uint32_t); + count = rec.dsize / sizeof(struct server_id); if (count == 0) { tdb_unlock_bystring(t->tdb, name); talloc_free(t); return; } - ids = (uint32_t *)rec.dptr; + ids = (struct server_id *)rec.dptr; for (i=0;iserver_id) { + if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) { if (i < count-1) { - memmove(ids+i, ids+i+1, count-(i+1)); + memmove(ids+i, ids+i+1, + sizeof(struct server_id) * (count-(i+1))); } - rec.dsize -= sizeof(uint32_t); + rec.dsize -= sizeof(struct server_id); break; } } -- cgit