summaryrefslogtreecommitdiff
path: root/source4/lib/messaging/messaging.c
diff options
context:
space:
mode:
authorAndrew Tridgell <tridge@samba.org>2005-06-05 06:53:07 +0000
committerGerald (Jerry) Carter <jerry@samba.org>2007-10-10 13:17:37 -0500
commitbf1ffa283caef6a3c98b5cc7f5bc8205c2818b06 (patch)
tree11b434e632aaee2472c906b7ddb9d14fdfd48e38 /source4/lib/messaging/messaging.c
parent03840652354598db203a3596077ecc55726880c8 (diff)
downloadsamba-bf1ffa283caef6a3c98b5cc7f5bc8205c2818b06.tar.gz
samba-bf1ffa283caef6a3c98b5cc7f5bc8205c2818b06.tar.bz2
samba-bf1ffa283caef6a3c98b5cc7f5bc8205c2818b06.zip
r7294: implemented the irpc messaging system. This is the core of the
management system I proposed on samba-technical a couple of days ago. Essentially it is a very lightweight way for any code in Samba to make IDL based rpc calls to anywhere else in the code, without the client or server having to go to the trouble of setting up a full rpc service. It can be used with any of our existing IDL, but I expect it will mostly be used for a new set of Samba specific management calls. The LOCAL-IRPC torture test demonstrates how it can be used by calling the echo_AddOne() call over this transport. (This used to be commit 3d589a09954eb8b318f567e1150b0c27412fb942)
Diffstat (limited to 'source4/lib/messaging/messaging.c')
-rw-r--r--source4/lib/messaging/messaging.c283
1 files changed, 281 insertions, 2 deletions
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index 936e3b9515..b605fa0494 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -27,6 +27,8 @@
#include "messages.h"
#include "dlinklist.h"
#include "lib/socket/socket.h"
+#include "librpc/gen_ndr/ndr_irpc.h"
+#include "lib/messaging/irpc.h"
/* change the message version with any incompatible changes in the protocol */
#define MESSAGING_VERSION 1
@@ -37,6 +39,8 @@ struct messaging_context {
const char *path;
struct dispatch_fn *dispatch;
struct messaging_rec *pending;
+ struct irpc_list *irpc;
+ struct idr_context *idr;
struct {
struct event_context *ev;
@@ -72,6 +76,10 @@ struct messaging_rec {
};
+static void irpc_handler(struct messaging_context *, void *,
+ uint32_t, uint32_t, DATA_BLOB *);
+
+
/*
A useful function for testing the message system.
*/
@@ -363,8 +371,10 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
msg->path = messaging_path(msg, server_id);
msg->server_id = server_id;
- msg->dispatch = NULL;
- msg->pending = NULL;
+ msg->dispatch = NULL;
+ msg->pending = NULL;
+ msg->idr = idr_init(msg);
+ msg->irpc = NULL;
status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
if (!NT_STATUS_IS_OK(status)) {
@@ -393,6 +403,275 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
talloc_set_destructor(msg, messaging_destructor);
messaging_register(msg, NULL, MSG_PING, ping_message);
+ messaging_register(msg, NULL, MSG_IRPC, irpc_handler);
return msg;
}
+
+
+/*
+ a list of registered irpc server functions
+*/
+struct irpc_list {
+ struct irpc_list *next, *prev;
+ struct GUID uuid;
+ const struct dcerpc_interface_table *table;
+ int callnum;
+ irpc_function_t fn;
+};
+
+
+/*
+ register a irpc server function
+*/
+NTSTATUS irpc_register(struct messaging_context *msg_ctx,
+ const struct dcerpc_interface_table *table,
+ int call, irpc_function_t fn)
+{
+ struct irpc_list *irpc;
+
+ irpc = talloc(msg_ctx, struct irpc_list);
+ NT_STATUS_HAVE_NO_MEMORY(irpc);
+
+ irpc->table = table;
+ irpc->callnum = call;
+ irpc->fn = fn;
+ GUID_from_string(irpc->table->uuid, &irpc->uuid);
+
+ DLIST_ADD(msg_ctx->irpc, irpc);
+
+ return NT_STATUS_OK;
+}
+
+
+/*
+ handle an incoming irpc reply message
+*/
+static void irpc_handler_reply(struct messaging_context *msg_ctx,
+ struct ndr_pull *ndr, struct irpc_header *header)
+{
+ struct irpc_request *irpc;
+
+ irpc = idr_find(msg_ctx->idr, header->callid);
+ if (irpc == NULL) return;
+
+ /* parse the reply data */
+ irpc->status = irpc->table->calls[irpc->callnum].ndr_pull(ndr, NDR_OUT, irpc->r);
+ if (NT_STATUS_IS_OK(irpc->status)) {
+ irpc->status = header->status;
+ }
+ irpc->done = True;
+ if (irpc->async.fn) {
+ irpc->async.fn(irpc);
+ }
+}
+
+
+/*
+ handle an incoming irpc request message
+*/
+static void irpc_handler_request(struct messaging_context *msg_ctx,
+ struct ndr_pull *ndr, struct irpc_header *header,
+ uint32_t src)
+{
+ struct irpc_list *i;
+ void *r;
+ NTSTATUS status;
+ struct irpc_message m;
+ struct ndr_push *push;
+ DATA_BLOB packet;
+
+ for (i=msg_ctx->irpc; i; i=i->next) {
+ if (GUID_equal(&i->uuid, &header->uuid) &&
+ i->table->if_version == header->if_version &&
+ i->callnum == header->callnum) {
+ break;
+ }
+ }
+
+ if (i == NULL) {
+ /* no registered handler for this message */
+ return;
+ }
+
+ /* allocate space for the structure */
+ r = talloc_zero_size(ndr, i->table->calls[header->callnum].struct_size);
+ if (r == NULL) goto failed;
+
+ /* parse the request data */
+ status = i->table->calls[i->callnum].ndr_pull(ndr, NDR_IN, r);
+ if (!NT_STATUS_IS_OK(status)) goto failed;
+
+ /* make the call */
+ m.from = src;
+ header->status = i->fn(&m, r);
+
+ /* setup the reply */
+ push = ndr_push_init_ctx(ndr);
+ if (push == NULL) goto failed;
+
+ header->flags |= IRPC_FLAG_REPLY;
+
+ /* construct the packet */
+ status = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, header);
+ if (!NT_STATUS_IS_OK(status)) goto failed;
+
+ status = i->table->calls[i->callnum].ndr_push(push, NDR_OUT, r);
+ if (!NT_STATUS_IS_OK(status)) goto failed;
+
+ /* send the reply message */
+ packet = ndr_push_blob(push);
+ status = messaging_send(msg_ctx, src, MSG_IRPC, &packet);
+ if (!NT_STATUS_IS_OK(status)) goto failed;
+
+failed:
+ /* nothing to clean up */
+ return;
+}
+
+/*
+ handle an incoming irpc message
+*/
+static void irpc_handler(struct messaging_context *msg_ctx, void *private,
+ uint32_t msg_type, uint32_t src, DATA_BLOB *packet)
+{
+ struct irpc_header header;
+ struct ndr_pull *ndr;
+ NTSTATUS status;
+
+ ndr = ndr_pull_init_blob(packet, msg_ctx);
+ if (ndr == NULL) goto failed;
+
+ status = ndr_pull_irpc_header(ndr, NDR_BUFFERS|NDR_SCALARS, &header);
+ if (!NT_STATUS_IS_OK(status)) goto failed;
+
+ if (header.flags & IRPC_FLAG_REPLY) {
+ irpc_handler_reply(msg_ctx, ndr, &header);
+ } else {
+ irpc_handler_request(msg_ctx, ndr, &header, src);
+ }
+
+failed:
+ talloc_free(ndr);
+}
+
+
+/*
+ destroy a irpc request
+*/
+static int irpc_destructor(void *ptr)
+{
+ struct irpc_request *irpc = talloc_get_type(ptr, struct irpc_request);
+ idr_remove(irpc->msg_ctx->idr, irpc->callid);
+ return 0;
+}
+
+/*
+ timeout a irpc request
+*/
+static void irpc_timeout(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private)
+{
+ struct irpc_request *irpc = talloc_get_type(private, struct irpc_request);
+ irpc->status = NT_STATUS_IO_TIMEOUT;
+ irpc->done = True;
+ if (irpc->async.fn) {
+ irpc->async.fn(irpc);
+ }
+}
+
+
+/*
+ make a irpc call - async send
+*/
+struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
+ uint32_t server_id,
+ const struct dcerpc_interface_table *table,
+ int callnum, void *r)
+{
+ struct irpc_header header;
+ struct ndr_push *ndr;
+ NTSTATUS status;
+ DATA_BLOB packet;
+ struct irpc_request *irpc;
+
+ irpc = talloc(msg_ctx, struct irpc_request);
+ if (irpc == NULL) goto failed;
+
+ irpc->msg_ctx = msg_ctx;
+ irpc->table = table;
+ irpc->callnum = callnum;
+ irpc->callid = idr_get_new(msg_ctx->idr, irpc, UINT16_MAX);
+ if (irpc->callid == -1) goto failed;
+ irpc->r = r;
+ irpc->done = False;
+ irpc->async.fn = NULL;
+
+ talloc_set_destructor(irpc, irpc_destructor);
+
+ /* setup the header */
+ status = GUID_from_string(table->uuid, &header.uuid);
+ if (!NT_STATUS_IS_OK(status)) goto failed;
+
+ header.if_version = table->if_version;
+ header.callid = irpc->callid;
+ header.callnum = callnum;
+ header.flags = 0;
+ header.status = NT_STATUS_OK;
+
+ /* construct the irpc packet */
+ ndr = ndr_push_init_ctx(irpc);
+ if (ndr == NULL) goto failed;
+
+ status = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
+ if (!NT_STATUS_IS_OK(status)) goto failed;
+
+ status = table->calls[callnum].ndr_push(ndr, NDR_IN, r);
+ if (!NT_STATUS_IS_OK(status)) goto failed;
+
+ /* and send it */
+ packet = ndr_push_blob(ndr);
+ status = messaging_send(msg_ctx, server_id, MSG_IRPC, &packet);
+ if (!NT_STATUS_IS_OK(status)) goto failed;
+
+ event_add_timed(msg_ctx->event.ev, irpc,
+ timeval_current_ofs(IRPC_CALL_TIMEOUT, 0),
+ irpc_timeout, irpc);
+
+ talloc_free(ndr);
+ return irpc;
+
+failed:
+ talloc_free(irpc);
+ return NULL;
+}
+
+/*
+ wait for a irpc reply
+*/
+NTSTATUS irpc_call_recv(struct irpc_request *irpc)
+{
+ NTSTATUS status;
+ NT_STATUS_HAVE_NO_MEMORY(irpc);
+ while (!irpc->done) {
+ if (event_loop_once(irpc->msg_ctx->event.ev) != 0) {
+ return NT_STATUS_CONNECTION_DISCONNECTED;
+ }
+ }
+ status = irpc->status;
+ talloc_free(irpc);
+ return status;
+}
+
+/*
+ perform a synchronous irpc request
+*/
+NTSTATUS irpc_call(struct messaging_context *msg_ctx,
+ uint32_t server_id,
+ const struct dcerpc_interface_table *table,
+ int callnum, void *r)
+{
+ struct irpc_request *irpc = irpc_call_send(msg_ctx, server_id,
+ table, callnum, r);
+ return irpc_call_recv(irpc);
+}