summaryrefslogtreecommitdiff
path: root/source3/lib/messages.c
diff options
context:
space:
mode:
authorVolker Lendecke <vlendec@samba.org>2007-05-21 22:17:13 +0000
committerGerald (Jerry) Carter <jerry@samba.org>2007-10-10 12:22:17 -0500
commitac3f08ddbe0b484375624db0e35999a8584b57f4 (patch)
tree477347104c60dc6ae205257d654b1d89c7903f35 /source3/lib/messages.c
parentf96242d9331a5fcdc65445d0d0ea7177c7ddc6e0 (diff)
downloadsamba-ac3f08ddbe0b484375624db0e35999a8584b57f4.tar.gz
samba-ac3f08ddbe0b484375624db0e35999a8584b57f4.tar.bz2
samba-ac3f08ddbe0b484375624db0e35999a8584b57f4.zip
r23055: Rewrite messages.c to use auto-generated marshalling in the tdb. I'm
doing this because for the clustering the marshalling is needed in more than one place, so I wanted a decent routine to marshall a message_rec struct which was not there before. Tridge, this seems about the same speed as it used to be before, the librpc/ndr overhead in my tests was under the noise. Volker (This used to be commit eaefd00563173dfabb7716c5695ac0a2f7139bb6)
Diffstat (limited to 'source3/lib/messages.c')
-rw-r--r--source3/lib/messages.c597
1 files changed, 267 insertions, 330 deletions
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 95f4aba4e7..6932369b21 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -47,54 +47,30 @@
*/
#include "includes.h"
+#include "librpc/gen_ndr/messaging.h"
+#include "librpc/gen_ndr/ndr_messaging.h"
/* the locking database handle */
-static TDB_CONTEXT *tdb;
static int received_signal;
/* change the message version with any incompatible changes in the protocol */
-#define MESSAGE_VERSION 1
+#define MESSAGE_VERSION 2
-struct message_rec {
- int msg_version;
- int msg_type;
- struct server_id dest;
- struct server_id src;
- size_t len;
-};
-
-/* we have a linked list of dispatch handlers */
-static struct dispatch_fns {
- struct dispatch_fns *next, *prev;
- int msg_type;
- void (*fn)(int msg_type, struct server_id pid, void *buf, size_t len,
- void *private_data);
+struct messaging_callback {
+ struct messaging_callback *prev, *next;
+ uint32 msg_type;
+ void (*fn)(struct messaging_context *msg, void *private_data,
+ uint32_t msg_type,
+ struct server_id server_id, DATA_BLOB *data);
void *private_data;
-} *dispatch_fns;
-
-static void message_register(int msg_type,
- void (*fn)(int msg_type, struct server_id pid,
- void *buf, size_t len,
- void *private_data),
- void *private_data);
-
-/****************************************************************************
- Free global objects.
-****************************************************************************/
+};
-void gfree_messages(void)
-{
- struct dispatch_fns *dfn, *next;
-
- /* delete the dispatch_fns list */
- dfn = dispatch_fns;
- while( dfn ) {
- next = dfn->next;
- DLIST_REMOVE(dispatch_fns, dfn);
- SAFE_FREE(dfn);
- dfn = next;
- }
-}
+struct messaging_context {
+ TDB_CONTEXT *tdb;
+ struct server_id id;
+ struct event_context *event_ctx;
+ struct messaging_callback *callbacks;
+};
/****************************************************************************
Notifications come in as signals.
@@ -106,21 +82,25 @@ static void sig_usr1(void)
sys_select_signal(SIGUSR1);
}
-static NTSTATUS message_send_pid(struct server_id pid, int msg_type,
- const void *buf, size_t len);
+static NTSTATUS messaging_tdb_send(TDB_CONTEXT *msg_tdb,
+ struct server_id pid, int msg_type,
+ const void *buf, size_t len);
/****************************************************************************
A useful function for testing the message system.
****************************************************************************/
-static void ping_message(int msg_type, struct server_id src,
- void *buf, size_t len, void *private_data)
+static void ping_message(struct messaging_context *msg_ctx,
+ void *private_data,
+ uint32_t msg_type,
+ struct server_id src,
+ DATA_BLOB *data)
{
- const char *msg = buf ? (const char *)buf : "none";
+ const char *msg = data->data ? (const char *)data->data : "none";
DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
procid_str_static(&src), msg));
- message_send_pid(src, MSG_PONG, buf, len);
+ messaging_send(msg_ctx, src, MSG_PONG, data);
}
/****************************************************************************
@@ -131,24 +111,21 @@ static BOOL message_init(struct messaging_context *msg_ctx)
{
sec_init();
- if (tdb)
- return True;
-
- tdb = tdb_open_log(lock_path("messages.tdb"),
- 0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT,
- O_RDWR|O_CREAT,0600);
+ msg_ctx->tdb = tdb_open_log(lock_path("messages.tdb"),
+ 0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT,
+ O_RDWR|O_CREAT,0600);
- if (!tdb) {
+ if (!msg_ctx->tdb) {
DEBUG(0,("ERROR: Failed to initialise messages database\n"));
return False;
}
/* Activate the per-hashchain freelist */
- tdb_set_max_dead(tdb, 5);
+ tdb_set_max_dead(msg_ctx->tdb, 5);
CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
- message_register(MSG_PING, ping_message, NULL);
+ messaging_register(msg_ctx, NULL, MSG_PING, ping_message);
/* Register some debugging related messages */
@@ -175,6 +152,99 @@ static TDB_DATA message_key_pid(struct server_id pid)
return kbuf;
}
+/*
+ Fetch the messaging array for a process
+ */
+
+static NTSTATUS messaging_tdb_fetch(TDB_CONTEXT *msg_tdb,
+ TDB_DATA key,
+ TALLOC_CTX *mem_ctx,
+ struct messaging_array **presult)
+{
+ struct messaging_array *result;
+ TDB_DATA data;
+ DATA_BLOB blob;
+ NTSTATUS status;
+
+ if (!(result = TALLOC_ZERO_P(mem_ctx, struct messaging_array))) {
+ return NT_STATUS_NO_MEMORY;
+ }
+
+ data = tdb_fetch(msg_tdb, key);
+
+ if (data.dptr == NULL) {
+ *presult = result;
+ return NT_STATUS_OK;
+ }
+
+ blob = data_blob_const(data.dptr, data.dsize);
+
+ status = ndr_pull_struct_blob(
+ &blob, result, result,
+ (ndr_pull_flags_fn_t)ndr_pull_messaging_array);
+
+ SAFE_FREE(data.dptr);
+
+ if (!NT_STATUS_IS_OK(status)) {
+ TALLOC_FREE(result);
+ return status;
+ }
+
+ if (DEBUGLEVEL >= 10) {
+ DEBUG(10, ("messaging_tdb_fetch:\n"));
+ NDR_PRINT_DEBUG(messaging_array, result);
+ }
+
+ *presult = result;
+ return NT_STATUS_OK;
+}
+
+/*
+ Store a messaging array for a pid
+*/
+
+static NTSTATUS messaging_tdb_store(TDB_CONTEXT *msg_tdb,
+ TDB_DATA key,
+ struct messaging_array *array)
+{
+ TDB_DATA data;
+ DATA_BLOB blob;
+ NTSTATUS status;
+ TALLOC_CTX *mem_ctx;
+ int ret;
+
+ if (array->num_messages == 0) {
+ tdb_delete(msg_tdb, key);
+ return NT_STATUS_OK;
+ }
+
+ if (!(mem_ctx = talloc_new(array))) {
+ return NT_STATUS_NO_MEMORY;
+ }
+
+ status = ndr_push_struct_blob(
+ &blob, mem_ctx, array,
+ (ndr_push_flags_fn_t)ndr_push_messaging_array);
+
+ if (!NT_STATUS_IS_OK(status)) {
+ talloc_free(mem_ctx);
+ return status;
+ }
+
+ if (DEBUGLEVEL >= 10) {
+ DEBUG(10, ("messaging_tdb_store:\n"));
+ NDR_PRINT_DEBUG(messaging_array, array);
+ }
+
+ data.dptr = blob.data;
+ data.dsize = blob.length;
+
+ ret = tdb_store(msg_tdb, key, data, TDB_REPLACE);
+ TALLOC_FREE(mem_ctx);
+
+ return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
+}
+
/****************************************************************************
Notify a process that it has a message. If the process doesn't exist
then delete its record in the database.
@@ -216,17 +286,6 @@ static NTSTATUS message_notify(struct server_id procid)
* Something has gone wrong
*/
- if (errno == ESRCH) {
- DEBUG(2,("pid %d doesn't exist - deleting messages record\n",
- (int)pid));
- tdb_delete(tdb, message_key_pid(procid));
-
- /*
- * INVALID_HANDLE is the closest I can think of -- vl
- */
- return NT_STATUS_INVALID_HANDLE;
- }
-
DEBUG(2,("message to process %d failed - %s\n", (int)pid,
strerror(errno)));
@@ -235,6 +294,7 @@ static NTSTATUS message_notify(struct server_id procid)
* errormap.o into lots of utils.
*/
+ if (errno == ESRCH) return NT_STATUS_INVALID_HANDLE;
if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
if (errno == EPERM) return NT_STATUS_ACCESS_DENIED;
return NT_STATUS_UNSUCCESSFUL;
@@ -244,12 +304,15 @@ static NTSTATUS message_notify(struct server_id procid)
Send a message to a particular pid.
****************************************************************************/
-static NTSTATUS message_send_pid(struct server_id pid, int msg_type,
- const void *buf, size_t len)
+static NTSTATUS messaging_tdb_send(TDB_CONTEXT *msg_tdb,
+ struct server_id pid, int msg_type,
+ const void *buf, size_t len)
{
- TDB_DATA dbuf;
- struct message_rec rec;
- int ret;
+ struct messaging_array *msg_array;
+ struct messaging_rec *rec;
+ TALLOC_CTX *mem_ctx;
+ NTSTATUS status;
+ TDB_DATA key = message_key_pid(pid);
/* NULL pointer means implicit length zero. */
if (!buf) {
@@ -263,138 +326,129 @@ static NTSTATUS message_send_pid(struct server_id pid, int msg_type,
SMB_ASSERT(procid_to_pid(&pid) > 0);
- rec.msg_version = MESSAGE_VERSION;
- rec.msg_type = msg_type;
- rec.dest = pid;
- rec.src = procid_self();
- rec.len = buf ? len : 0;
+ if (!(mem_ctx = talloc_init("message_send_pid"))) {
+ return NT_STATUS_NO_MEMORY;
+ }
+
+ if (tdb_chainlock(msg_tdb, key) == -1) {
+ return NT_STATUS_LOCK_NOT_GRANTED;
+ }
+
+ status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &msg_array);
+
+ if (!NT_STATUS_IS_OK(status)) {
+ tdb_chainunlock(msg_tdb, key);
+ TALLOC_FREE(mem_ctx);
+ return status;
+ }
- dbuf.dptr = (uint8 *)SMB_MALLOC(len + sizeof(rec));
- if (!dbuf.dptr) {
+ if (!(rec = TALLOC_REALLOC_ARRAY(mem_ctx, msg_array->messages,
+ struct messaging_rec,
+ msg_array->num_messages+1))) {
+ tdb_chainunlock(msg_tdb, key);
+ TALLOC_FREE(mem_ctx);
return NT_STATUS_NO_MEMORY;
}
- memcpy(dbuf.dptr, &rec, sizeof(rec));
- if (len > 0 && buf)
- memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
+ rec[msg_array->num_messages].msg_version = MESSAGE_VERSION;
+ rec[msg_array->num_messages].msg_type = msg_type;
+ rec[msg_array->num_messages].dest = pid;
+ rec[msg_array->num_messages].src = procid_self();
+ rec[msg_array->num_messages].buf = data_blob_const(buf, len);
- dbuf.dsize = len + sizeof(rec);
+ msg_array->messages = rec;
+ msg_array->num_messages += 1;
- ret = tdb_append(tdb, message_key_pid(pid), dbuf);
+ status = messaging_tdb_store(msg_tdb, key, msg_array);
- SAFE_FREE(dbuf.dptr);
+ tdb_chainunlock(msg_tdb, key);
+ TALLOC_FREE(mem_ctx);
+
+ if (!NT_STATUS_IS_OK(status)) {
+ return status;
+ }
+
+ status = message_notify(pid);
- if (ret == -1) {
- return NT_STATUS_INTERNAL_ERROR;
+ if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
+ DEBUG(2, ("pid %s doesn't exist - deleting messages record\n",
+ procid_str_static(&pid)));
+ tdb_delete(msg_tdb, message_key_pid(pid));
}
- errno = 0; /* paranoia */
- return message_notify(pid);
+ return status;
}
/****************************************************************************
Count the messages pending for a particular pid. Expensive....
****************************************************************************/
-unsigned int messages_pending_for_pid(struct server_id pid)
+unsigned int messages_pending_for_pid(struct messaging_context *msg_ctx,
+ struct server_id pid)
{
- TDB_DATA dbuf;
- uint8 *buf;
- unsigned int message_count = 0;
+ struct messaging_array *msg_array;
+ unsigned int result;
- dbuf = tdb_fetch(tdb, message_key_pid(pid));
- if (dbuf.dptr == NULL || dbuf.dsize == 0) {
- SAFE_FREE(dbuf.dptr);
+ if (!NT_STATUS_IS_OK(messaging_tdb_fetch(msg_ctx->tdb,
+ message_key_pid(pid), NULL,
+ &msg_array))) {
+ DEBUG(10, ("messaging_tdb_fetch failed\n"));
return 0;
}
- for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) {
- struct message_rec rec;
- memcpy(&rec, buf, sizeof(rec));
- buf += (sizeof(rec) + rec.len);
- dbuf.dsize -= (sizeof(rec) + rec.len);
- message_count++;
- }
-
- SAFE_FREE(dbuf.dptr);
- return message_count;
-}
+ result = msg_array->num_messages;
+ TALLOC_FREE(msg_array);
+ return result;
+}
/****************************************************************************
Retrieve all messages for the current process.
****************************************************************************/
-static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
+static NTSTATUS retrieve_all_messages(TDB_CONTEXT *msg_tdb,
+ TALLOC_CTX *mem_ctx,
+ struct messaging_array **presult)
{
- TDB_DATA kbuf;
- TDB_DATA dbuf;
- TDB_DATA null_dbuf;
-
- ZERO_STRUCT(null_dbuf);
-
- *msgs_buf = NULL;
- *total_len = 0;
+ struct messaging_array *result;
+ TDB_DATA key = message_key_pid(procid_self());
+ NTSTATUS status;
- kbuf = message_key_pid(procid_self());
+ if (tdb_chainlock(msg_tdb, key) == -1) {
+ return NT_STATUS_LOCK_NOT_GRANTED;
+ }
- if (tdb_chainlock(tdb, kbuf) == -1)
- return False;
+ status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &result);
- dbuf = tdb_fetch(tdb, kbuf);
/*
- * Replace with an empty record to keep the allocated
- * space in the tdb.
+ * We delete the record here, tdb_set_max_dead keeps it around
*/
- tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
- tdb_chainunlock(tdb, kbuf);
+ tdb_delete(msg_tdb, key);
+ tdb_chainunlock(msg_tdb, key);
- if (dbuf.dptr == NULL || dbuf.dsize == 0) {
- SAFE_FREE(dbuf.dptr);
- return False;
+ if (NT_STATUS_IS_OK(status)) {
+ *presult = result;
}
- *msgs_buf = (char *)dbuf.dptr;
- *total_len = dbuf.dsize;
-
- return True;
+ return status;
}
-/****************************************************************************
- Parse out the next message for the current process.
-****************************************************************************/
-
-static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
- struct server_id *src, char **buf, size_t *len)
+/*
+ Dispatch one messsaging_rec
+*/
+static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
+ struct messaging_rec *rec)
{
- struct message_rec rec;
- char *ret_buf = *buf;
-
- *buf = NULL;
- *len = 0;
-
- if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
- return False;
-
- memcpy(&rec, ret_buf, sizeof(rec));
- ret_buf += sizeof(rec);
-
- if (rec.msg_version != MESSAGE_VERSION) {
- DEBUG(0,("message version %d received (expected %d)\n",
- rec.msg_version, MESSAGE_VERSION));
- return False;
- }
+ struct messaging_callback *cb, *next;
- if (rec.len > 0) {
- if (total_len - (ret_buf - msgs_buf) < rec.len)
- return False;
+ for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
+ next = cb->next;
+ if (cb->msg_type == rec->msg_type) {
+ cb->fn(msg_ctx, cb->private_data, rec->msg_type,
+ rec->src, &rec->buf);
+ return;
+ }
}
-
- *len = rec.len;
- *msg_type = rec.msg_type;
- *src = rec.src;
- *buf = ret_buf;
-
- return True;
+ return;
}
/****************************************************************************
@@ -404,14 +458,10 @@ static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
messages on an *odd* byte boundary.
****************************************************************************/
-void message_dispatch(void)
+void message_dispatch(struct messaging_context *msg_ctx)
{
- int msg_type;
- struct server_id src;
- char *buf;
- char *msgs_buf;
- size_t len, total_len;
- int n_handled;
+ struct messaging_array *msg_array = NULL;
+ uint32 i;
if (!received_signal)
return;
@@ -421,37 +471,16 @@ void message_dispatch(void)
received_signal = 0;
- if (!retrieve_all_messages(&msgs_buf, &total_len))
+ if (!NT_STATUS_IS_OK(retrieve_all_messages(msg_ctx->tdb, NULL,
+ &msg_array))) {
return;
+ }
- for (buf = msgs_buf;
- message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len);
- buf += len) {
- struct dispatch_fns *dfn;
-
- DEBUG(10,("message_dispatch: received msg_type=%d "
- "src_pid=%u\n", msg_type,
- (unsigned int) procid_to_pid(&src)));
-
- n_handled = 0;
- for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
- if (dfn->msg_type == msg_type) {
- DEBUG(10,("message_dispatch: processing "
- "message of type %d.\n", msg_type));
- dfn->fn(msg_type, src,
- len ? (void *)buf : NULL, len,
- dfn->private_data);
- n_handled++;
- break;
- }
- }
- if (!n_handled) {
- DEBUG(5,("message_dispatch: warning: no handler "
- "registed for msg_type %d in pid %u\n",
- msg_type, (unsigned int)sys_getpid()));
- }
+ for (i=0; i<msg_array->num_messages; i++) {
+ messaging_dispatch_rec(msg_ctx, &msg_array->messages[i]);
}
- SAFE_FREE(msgs_buf);
+
+ TALLOC_FREE(msg_array);
}
/****************************************************************************
@@ -461,60 +490,12 @@ void message_dispatch(void)
messages on an *odd* byte boundary.
****************************************************************************/
-static void message_register(int msg_type,
- void (*fn)(int msg_type, struct server_id pid,
- void *buf, size_t len,
- void *private_data),
- void *private_data)
-{
- struct dispatch_fns *dfn;
-
- for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
- if (dfn->msg_type == msg_type) {
- dfn->fn = fn;
- return;
- }
- }
-
- if (!(dfn = SMB_MALLOC_P(struct dispatch_fns))) {
- DEBUG(0,("message_register: Not enough memory. malloc "
- "failed!\n"));
- return;
- }
-
- ZERO_STRUCTPN(dfn);
-
- dfn->msg_type = msg_type;
- dfn->fn = fn;
- dfn->private_data = private_data;
-
- DLIST_ADD(dispatch_fns, dfn);
-}
-
-/****************************************************************************
- De-register the function for a particular message type.
-****************************************************************************/
-
-static void message_deregister(int msg_type)
-{
- struct dispatch_fns *dfn, *next;
-
- for (dfn = dispatch_fns; dfn; dfn = next) {
- next = dfn->next;
- if (dfn->msg_type == msg_type) {
- DLIST_REMOVE(dispatch_fns, dfn);
- SAFE_FREE(dfn);
- return;
- }
- }
-}
-
struct msg_all {
+ struct messaging_context *msg_ctx;
int msg_type;
uint32 msg_flag;
const void *buf;
size_t len;
- BOOL duplicates;
int n_sent;
};
@@ -522,41 +503,44 @@ struct msg_all {
Send one of the messages for the broadcast.
****************************************************************************/
-static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf,
- void *state)
+static int traverse_fn(TDB_CONTEXT *the_tdb,
+ const struct connections_key *ckey,
+ const struct connections_data *crec,
+ void *private_data)
{
- struct connections_data crec;
- struct msg_all *msg_all = (struct msg_all *)state;
+ struct msg_all *msg_all = (struct msg_all *)private_data;
NTSTATUS status;
- if (dbuf.dsize != sizeof(crec))
- return 0;
-
- memcpy(&crec, dbuf.dptr, sizeof(crec));
-
- if (crec.cnum != -1)
+ if (crec->cnum != -1)
return 0;
/* Don't send if the receiver hasn't registered an interest. */
- if(!(crec.bcast_msg_flags & msg_all->msg_flag))
+ if(!(crec->bcast_msg_flags & msg_all->msg_flag))
return 0;
/* If the msg send fails because the pid was not found (i.e. smbd died),
* the msg has already been deleted from the messages.tdb.*/
- status = message_send_pid(crec.pid, msg_all->msg_type,
- msg_all->buf, msg_all->len);
+ status = messaging_send_buf(msg_all->msg_ctx,
+ crec->pid, msg_all->msg_type,
+ (uint8 *)msg_all->buf, msg_all->len);
if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
+
+ TDB_DATA key;
/* If the pid was not found delete the entry from
* connections.tdb */
DEBUG(2,("pid %s doesn't exist - deleting connections "
- "%d [%s]\n", procid_str_static(&crec.pid), crec.cnum,
- crec.servicename));
- tdb_delete(the_tdb, kbuf);
+ "%d [%s]\n", procid_str_static(&crec->pid),
+ crec->cnum, crec->servicename));
+
+ key.dptr = (uint8 *)ckey;
+ key.dsize = sizeof(*ckey);
+
+ tdb_delete(the_tdb, key);
}
msg_all->n_sent++;
return 0;
@@ -577,7 +561,6 @@ static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf,
BOOL message_send_all(struct messaging_context *msg_ctx,
int msg_type,
const void *buf, size_t len,
- BOOL duplicates_allowed,
int *n_sent)
{
struct msg_all msg_all;
@@ -598,10 +581,10 @@ BOOL message_send_all(struct messaging_context *msg_ctx,
msg_all.buf = buf;
msg_all.len = len;
- msg_all.duplicates = duplicates_allowed;
msg_all.n_sent = 0;
+ msg_all.msg_ctx = msg_ctx;
- connections_traverse(traverse_fn, &msg_all);
+ connections_forall(traverse_fn, &msg_all);
if (n_sent)
*n_sent = msg_all.n_sent;
return True;
@@ -622,40 +605,6 @@ void message_unblock(void)
BlockSignals(False, SIGUSR1);
}
-/*
- * Samba4 API wrapper around the Samba3 implementation. Yes, I know, we could
- * import the whole Samba4 thing, but I want notify.c from Samba4 in first.
- */
-
-struct messaging_callback {
- struct messaging_callback *prev, *next;
- uint32 msg_type;
- void (*fn)(struct messaging_context *msg, void *private_data,
- uint32_t msg_type,
- struct server_id server_id, DATA_BLOB *data);
- void *private_data;
-};
-
-struct messaging_context {
- struct server_id id;
- struct event_context *event_ctx;
- struct messaging_callback *callbacks;
-};
-
-static int messaging_context_destructor(struct messaging_context *ctx)
-{
- struct messaging_callback *cb;
-
- for (cb = ctx->callbacks; cb; cb = cb->next) {
- /*
- * We unconditionally remove all instances of our callback
- * from the tdb basis.
- */
- message_deregister(cb->msg_type);
- }
- return 0;
-}
-
struct event_context *messaging_event_context(struct messaging_context *msg_ctx)
{
return msg_ctx->event_ctx;
@@ -673,7 +622,6 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
ctx->id = server_id;
ctx->event_ctx = ev;
- talloc_set_destructor(ctx, messaging_context_destructor);
if (!message_init(ctx)) {
DEBUG(0, ("message_init failed: %s\n", strerror(errno)));
@@ -683,35 +631,12 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
return ctx;
}
-static void messaging_callback(int msg_type, struct server_id pid,
- void *buf, size_t len, void *private_data)
-{
- struct messaging_context *ctx = talloc_get_type_abort(
- private_data, struct messaging_context);
- struct messaging_callback *cb, *next;
-
- for (cb = ctx->callbacks; cb; cb = next) {
- /*
- * Allow a callback to remove itself
- */
- next = cb->next;
-
- if (msg_type == cb->msg_type) {
- DATA_BLOB blob;
-
- blob.data = (uint8 *)buf;
- blob.length = len;
-
- cb->fn(ctx, cb->private_data, msg_type, pid, &blob);
- }
- }
-}
-
/*
* Register a dispatch function for a particular message type. Allow multiple
* registrants
*/
-NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data,
+NTSTATUS messaging_register(struct messaging_context *msg_ctx,
+ void *private_data,
uint32_t msg_type,
void (*fn)(struct messaging_context *msg,
void *private_data,
@@ -721,7 +646,19 @@ NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data,
{
struct messaging_callback *cb;
- if (!(cb = talloc(ctx, struct messaging_callback))) {
+ /*
+ * Only one callback per type
+ */
+
+ for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
+ if (cb->msg_type == msg_type) {
+ cb->fn = fn;
+ cb->private_data = private_data;
+ return NT_STATUS_OK;
+ }
+ }
+
+ if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
return NT_STATUS_NO_MEMORY;
}
@@ -729,8 +666,7 @@ NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data,
cb->fn = fn;
cb->private_data = private_data;
- DLIST_ADD(ctx->callbacks, cb);
- message_register(msg_type, messaging_callback, ctx);
+ DLIST_ADD(msg_ctx->callbacks, cb);
return NT_STATUS_OK;
}
@@ -759,7 +695,8 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
struct server_id server,
uint32_t msg_type, const DATA_BLOB *data)
{
- return message_send_pid(server, msg_type, data->data, data->length);
+ return messaging_tdb_send(msg_ctx->tdb, server, msg_type,
+ data->data, data->length);
}
NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,