summaryrefslogtreecommitdiff
path: root/source4/lib/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'source4/lib/messaging')
-rw-r--r--source4/lib/messaging/config.mk3
-rw-r--r--source4/lib/messaging/irpc.h21
-rw-r--r--source4/lib/messaging/messaging.c75
3 files changed, 56 insertions, 43 deletions
diff --git a/source4/lib/messaging/config.mk b/source4/lib/messaging/config.mk
index c4c3e6b2f8..85a5791703 100644
--- a/source4/lib/messaging/config.mk
+++ b/source4/lib/messaging/config.mk
@@ -9,6 +9,7 @@ PUBLIC_DEPENDENCIES = \
DB_WRAP \
NDR_IRPC \
UNIX_PRIVS \
- UTIL_TDB
+ UTIL_TDB \
+ CLUSTER
# End SUBSYSTEM MESSAGING
################################################
diff --git a/source4/lib/messaging/irpc.h b/source4/lib/messaging/irpc.h
index 4e775bfe06..1d704ad943 100644
--- a/source4/lib/messaging/irpc.h
+++ b/source4/lib/messaging/irpc.h
@@ -26,7 +26,7 @@
an incoming irpc message
*/
struct irpc_message {
- uint32_t from;
+ struct server_id from;
void *private;
struct irpc_header header;
struct ndr_pull *ndr;
@@ -77,22 +77,25 @@ struct irpc_request {
};
typedef void (*msg_callback_t)(struct messaging_context *msg, void *private,
- uint32_t msg_type, uint32_t server_id, DATA_BLOB *data);
+ uint32_t msg_type,
+ struct server_id server_id, DATA_BLOB *data);
-struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id,
+struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
+ struct server_id server_id,
struct event_context *ev);
-NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server,
+NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
uint32_t msg_type, DATA_BLOB *data);
NTSTATUS messaging_register(struct messaging_context *msg, void *private,
uint32_t msg_type,
msg_callback_t fn);
NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private,
msg_callback_t fn, uint32_t *msg_type);
-struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id,
+struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
+ struct server_id server_id,
struct event_context *ev);
struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx,
struct event_context *ev);
-NTSTATUS messaging_send_ptr(struct messaging_context *msg, uint32_t server,
+NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server,
uint32_t msg_type, void *ptr);
void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private);
@@ -103,17 +106,17 @@ NTSTATUS irpc_register(struct messaging_context *msg_ctx,
const struct dcerpc_interface_table *table,
int call, irpc_function_t fn, void *private);
struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
- uint32_t server_id,
+ struct server_id server_id,
const struct dcerpc_interface_table *table,
int callnum, void *r, TALLOC_CTX *ctx);
NTSTATUS irpc_call_recv(struct irpc_request *irpc);
NTSTATUS irpc_call(struct messaging_context *msg_ctx,
- uint32_t server_id,
+ struct server_id server_id,
const struct dcerpc_interface_table *table,
int callnum, void *r, TALLOC_CTX *ctx);
NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name);
-uint32_t *irpc_servers_byname(struct messaging_context *msg_ctx, const char *name);
+struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx, const char *name);
void irpc_remove_name(struct messaging_context *msg_ctx, const char *name);
NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status);
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index 09e04fda9b..691db2b961 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -33,12 +33,14 @@
#include "librpc/rpc/dcerpc.h"
#include "lib/tdb/include/tdb.h"
#include "lib/util/util_tdb.h"
+#include "lib/util/util_tdb.h"
+#include "cluster/cluster.h"
/* change the message version with any incompatible changes in the protocol */
#define MESSAGING_VERSION 1
struct messaging_context {
- uint32_t server_id;
+ struct server_id server_id;
struct socket_context *sock;
const char *base_path;
const char *path;
@@ -75,8 +77,8 @@ struct messaging_rec {
struct messaging_header {
uint32_t version;
uint32_t msg_type;
- uint32_t from;
- uint32_t to;
+ struct server_id from;
+ struct server_id to;
uint32_t length;
} *header;
@@ -85,17 +87,17 @@ struct messaging_rec {
static void irpc_handler(struct messaging_context *, void *,
- uint32_t, uint32_t, DATA_BLOB *);
+ uint32_t, struct server_id, DATA_BLOB *);
/*
A useful function for testing the message system.
*/
static void ping_message(struct messaging_context *msg, void *private,
- uint32_t msg_type, uint32_t src, DATA_BLOB *data)
+ uint32_t msg_type, struct server_id src, DATA_BLOB *data)
{
- DEBUG(1,("INFO: Received PING message from server %u [%.*s]\n",
- (uint_t)src, (int)data->length,
+ DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n",
+ (uint_t)src.node, (uint_t)src.id, (int)data->length,
data->data?(const char *)data->data:""));
messaging_send(msg, src, MSG_PONG, data);
}
@@ -114,9 +116,10 @@ static NTSTATUS irpc_uptime(struct irpc_message *msg,
/*
return the path to a messaging socket
*/
-static char *messaging_path(struct messaging_context *msg, uint32_t server_id)
+static char *messaging_path(struct messaging_context *msg, struct server_id server_id)
{
- return talloc_asprintf(msg, "%s/msg.%u", msg->base_path, (unsigned)server_id);
+ return talloc_asprintf(msg, "%s/msg.%u.%u", msg->base_path,
+ (unsigned)server_id.node, (unsigned)server_id.id);
}
/*
@@ -192,7 +195,7 @@ static void messaging_send_handler(struct messaging_context *msg)
}
if (!NT_STATUS_IS_OK(status)) {
DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n",
- rec->header->from, rec->header->to, rec->header->msg_type,
+ rec->header->from.id, rec->header->to.id, rec->header->msg_type,
nt_errstr(status)));
}
DLIST_REMOVE(msg->pending, rec);
@@ -365,7 +368,7 @@ void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void
/*
Send a message to a particular server
*/
-NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server,
+NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
uint32_t msg_type, DATA_BLOB *data)
{
struct messaging_rec *rec;
@@ -420,7 +423,7 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server,
/*
Send a message to a particular server, with the message containing a single pointer
*/
-NTSTATUS messaging_send_ptr(struct messaging_context *msg, uint32_t server,
+NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server,
uint32_t msg_type, void *ptr)
{
DATA_BLOB blob;
@@ -447,7 +450,8 @@ static int messaging_destructor(struct messaging_context *msg)
/*
create the listening socket and setup the dispatcher
*/
-struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id,
+struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
+ struct server_id server_id,
struct event_context *ev)
{
struct messaging_context *msg;
@@ -522,7 +526,10 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx,
struct event_context *ev)
{
- return messaging_init(mem_ctx, random() % 0x10000000, ev);
+ struct server_id id;
+ ZERO_STRUCT(id);
+ id.id = random() % 0x10000000;
+ return messaging_init(mem_ctx, id, ev);
}
/*
a list of registered irpc server functions
@@ -687,7 +694,7 @@ failed:
handle an incoming irpc message
*/
static void irpc_handler(struct messaging_context *msg_ctx, void *private,
- uint32_t msg_type, uint32_t src, DATA_BLOB *packet)
+ uint32_t msg_type, struct server_id src, DATA_BLOB *packet)
{
struct irpc_message *m;
NTSTATUS status;
@@ -745,7 +752,7 @@ static void irpc_timeout(struct event_context *ev, struct timed_event *te,
make a irpc call - async send
*/
struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
- uint32_t server_id,
+ struct server_id server_id,
const struct dcerpc_interface_table *table,
int callnum, void *r, TALLOC_CTX *ctx)
{
@@ -829,7 +836,7 @@ NTSTATUS irpc_call_recv(struct irpc_request *irpc)
perform a synchronous irpc request
*/
NTSTATUS irpc_call(struct messaging_context *msg_ctx,
- uint32_t server_id,
+ struct server_id server_id,
const struct dcerpc_interface_table *table,
int callnum, void *r,
TALLOC_CTX *mem_ctx)
@@ -873,15 +880,15 @@ NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name)
return NT_STATUS_LOCK_NOT_GRANTED;
}
rec = tdb_fetch_bystring(t->tdb, name);
- count = rec.dsize / sizeof(uint32_t);
- rec.dptr = (unsigned char *)realloc_p(rec.dptr, uint32_t, count+1);
- rec.dsize += sizeof(uint32_t);
+ count = rec.dsize / sizeof(struct server_id);
+ rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1);
+ rec.dsize += sizeof(struct server_id);
if (rec.dptr == NULL) {
tdb_unlock_bystring(t->tdb, name);
talloc_free(t);
return NT_STATUS_NO_MEMORY;
}
- ((uint32_t *)rec.dptr)[count] = msg_ctx->server_id;
+ ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id;
if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) {
status = NT_STATUS_INTERNAL_ERROR;
}
@@ -898,12 +905,13 @@ NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name)
/*
return a list of server ids for a server name
*/
-uint32_t *irpc_servers_byname(struct messaging_context *msg_ctx, const char *name)
+struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx,
+ const char *name)
{
struct tdb_wrap *t;
TDB_DATA rec;
int count, i;
- uint32_t *ret;
+ struct server_id *ret;
t = irpc_namedb_open(msg_ctx);
if (t == NULL) {
@@ -920,17 +928,17 @@ uint32_t *irpc_servers_byname(struct messaging_context *msg_ctx, const char *nam
talloc_free(t);
return NULL;
}
- count = rec.dsize / sizeof(uint32_t);
- ret = talloc_array(msg_ctx, uint32_t, count+1);
+ count = rec.dsize / sizeof(struct server_id);
+ ret = talloc_array(msg_ctx, struct server_id, count+1);
if (ret == NULL) {
tdb_unlock_bystring(t->tdb, name);
talloc_free(t);
return NULL;
}
for (i=0;i<count;i++) {
- ret[i] = ((uint32_t *)rec.dptr)[i];
+ ret[i] = ((struct server_id *)rec.dptr)[i];
}
- ret[i] = 0;
+ ret[i] = cluster_id(0);
free(rec.dptr);
tdb_unlock_bystring(t->tdb, name);
talloc_free(t);
@@ -946,7 +954,7 @@ void irpc_remove_name(struct messaging_context *msg_ctx, const char *name)
struct tdb_wrap *t;
TDB_DATA rec;
int count, i;
- uint32_t *ids;
+ struct server_id *ids;
str_list_remove(msg_ctx->names, name);
@@ -960,19 +968,20 @@ void irpc_remove_name(struct messaging_context *msg_ctx, const char *name)
return;
}
rec = tdb_fetch_bystring(t->tdb, name);
- count = rec.dsize / sizeof(uint32_t);
+ count = rec.dsize / sizeof(struct server_id);
if (count == 0) {
tdb_unlock_bystring(t->tdb, name);
talloc_free(t);
return;
}
- ids = (uint32_t *)rec.dptr;
+ ids = (struct server_id *)rec.dptr;
for (i=0;i<count;i++) {
- if (ids[i] == msg_ctx->server_id) {
+ if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) {
if (i < count-1) {
- memmove(ids+i, ids+i+1, count-(i+1));
+ memmove(ids+i, ids+i+1,
+ sizeof(struct server_id) * (count-(i+1)));
}
- rec.dsize -= sizeof(uint32_t);
+ rec.dsize -= sizeof(struct server_id);
break;
}
}