summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--source4/lib/messaging/messaging.c213
1 files changed, 94 insertions, 119 deletions
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index 2130958b36..4d2cd9c910 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -31,9 +31,6 @@
/* change the message version with any incompatible changes in the protocol */
#define MESSAGING_VERSION 1
-/* the number of microseconds to backoff in retrying to send a message */
-#define MESSAGING_BACKOFF 250000
-
/* maximum message size */
#define MESSAGING_MAX_SIZE 512
@@ -42,6 +39,7 @@ struct messaging_context {
struct socket_context *sock;
const char *path;
struct dispatch_fn *dispatch;
+ struct messaging_rec *pending;
struct {
struct event_context *ev;
@@ -61,8 +59,8 @@ struct dispatch_fn {
/* an individual message */
struct messaging_rec {
+ struct messaging_rec *next, *prev;
struct messaging_context *msg;
- struct socket_context *sock;
const char *path;
struct {
@@ -76,6 +74,7 @@ struct messaging_rec {
DATA_BLOB data;
};
+
/*
A useful function for testing the message system.
*/
@@ -111,19 +110,67 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
d->fn(msg, d->private, d->msg_type, rec->header.from, &rec->data);
}
}
-
rec->header.length = 0;
}
/*
- handle a new incoming connection
+ try to send the message
*/
-static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private)
+static NTSTATUS try_send(struct messaging_rec *rec)
+{
+ struct messaging_context *msg = rec->msg;
+ DATA_BLOB blob;
+ size_t nsent;
+ void *priv;
+ NTSTATUS status;
+
+ blob = data_blob_talloc(rec, NULL, sizeof(rec->header) + rec->data.length);
+ NT_STATUS_HAVE_NO_MEMORY(blob.data);
+
+ memcpy(blob.data, &rec->header, sizeof(rec->header));
+ memcpy(blob.data + sizeof(rec->header), rec->data.data, rec->data.length);
+
+ /* we send with privileges so messages work from any context */
+ priv = root_privileges();
+ status = socket_sendto(msg->sock, &blob, &nsent, 0, rec->path, 0);
+ talloc_free(priv);
+
+ data_blob_free(&blob);
+
+ return status;
+}
+
+/*
+ handle a socket write event
+*/
+static void messaging_send_handler(struct messaging_context *msg)
+{
+ while (msg->pending) {
+ struct messaging_rec *rec = msg->pending;
+ NTSTATUS status;
+ status = try_send(rec);
+ if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+ break;
+ }
+ if (!NT_STATUS_IS_OK(status)) {
+ DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n",
+ rec->header.from, rec->header.to, rec->header.msg_type,
+ nt_errstr(status)));
+ }
+ DLIST_REMOVE(msg->pending, rec);
+ talloc_free(rec);
+ }
+ if (msg->pending == NULL) {
+ EVENT_FD_NOT_WRITEABLE(msg->event.fde);
+ }
+}
+
+/*
+ handle a new incoming packet
+*/
+static void messaging_recv_handler(struct messaging_context *msg)
{
- struct messaging_context *msg = talloc_get_type(private,
- struct messaging_context);
struct messaging_rec *rec;
NTSTATUS status;
uint8_t data[MESSAGING_MAX_SIZE];
@@ -146,7 +193,6 @@ static void messaging_recv_handler(struct event_context *ev, struct fd_event *fd
rec->msg = msg;
rec->path = msg->path;
- rec->sock = NULL;
memcpy(&rec->header, data, sizeof(rec->header));
if (msize != sizeof(rec->header) + rec->header.length) {
@@ -166,6 +212,24 @@ static void messaging_recv_handler(struct event_context *ev, struct fd_event *fd
talloc_free(rec);
}
+
+/*
+ handle a socket event
+*/
+static void messaging_handler(struct event_context *ev, struct fd_event *fde,
+ uint16_t flags, void *private)
+{
+ struct messaging_context *msg = talloc_get_type(private,
+ struct messaging_context);
+ if (flags & EVENT_FD_WRITE) {
+ messaging_send_handler(msg);
+ }
+ if (flags & EVENT_FD_READ) {
+ messaging_recv_handler(msg);
+ }
+}
+
+
/*
Register a dispatch function for a particular message type.
*/
@@ -200,89 +264,11 @@ void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void
}
-
-/*
- handle IO for sending a message
-*/
-static void messaging_send_handler(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private)
-{
- struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec);
- uint8_t data[MESSAGING_MAX_SIZE];
- DATA_BLOB blob;
- size_t nsent;
- NTSTATUS status;
-
- memcpy(data, &rec->header, sizeof(rec->header));
- memcpy(data + sizeof(rec->header), rec->data.data, rec->data.length);
-
- blob.data = data;
- blob.length = sizeof(rec->header) + rec->header.length;
-
- status = socket_send(rec->sock, &blob, &nsent, 0);
- if (NT_STATUS_IS_ERR(status)) {
- DEBUG(3,("Unable to send message of type %d length %d - %s\n",
- rec->header.msg_type,
- rec->header.length,
- nt_errstr(status)));
- talloc_free(rec);
- return;
- }
-
- if (NT_STATUS_IS_OK(status)) {
- talloc_free(rec);
- }
-}
-
-
-/*
- wrapper around socket_connect with raised privileges
-*/
-static NTSTATUS try_connect(struct messaging_rec *rec)
-{
- NTSTATUS status;
- void *priv = root_privileges();
- status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
- talloc_free(priv);
- return status;
-}
-
-
-/*
- when the servers listen queue is full we use this to backoff the message
-*/
-static void messaging_backoff_handler(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private)
-{
- struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec);
- struct messaging_context *msg = rec->msg;
- NTSTATUS status;
-
- status = try_connect(rec);
- if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
- /* backoff again */
- event_add_timed(msg->event.ev, rec,
- timeval_add(&t, 0, MESSAGING_BACKOFF),
- messaging_backoff_handler, rec);
- return;
- }
-
- if (!NT_STATUS_IS_OK(status)) {
- DEBUG(1,("messaging: Lost message from %u to %u of type %u after backoff - %s\n",
- rec->header.from, rec->header.to, rec->header.msg_type, nt_errstr(status)));
- talloc_free(rec);
- return;
- }
-
- event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
- EVENT_FD_WRITE, messaging_send_handler, rec);
-}
-
-
/*
Send a message to a particular server
*/
-NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t msg_type, DATA_BLOB *data)
+NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server,
+ uint32_t msg_type, DATA_BLOB *data)
{
struct messaging_rec *rec;
NTSTATUS status;
@@ -292,45 +278,32 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t
return NT_STATUS_NO_MEMORY;
}
- rec->msg = msg;
- rec->header.version = MESSAGING_VERSION;
+ rec->msg = msg;
+ rec->header.version = MESSAGING_VERSION;
rec->header.msg_type = msg_type;
- rec->header.from = msg->server_id;
- rec->header.to = server;
- rec->header.length = data?data->length:0;
+ rec->header.from = msg->server_id;
+ rec->header.to = server;
+ rec->header.length = data?data->length:0;
if (rec->header.length != 0) {
rec->data = data_blob_talloc(rec, data->data, data->length);
} else {
rec->data = data_blob(NULL, 0);
}
- status = socket_create("unix", SOCKET_TYPE_DGRAM, &rec->sock, 0);
- if (!NT_STATUS_IS_OK(status)) {
- talloc_free(rec);
- return status;
- }
- talloc_steal(rec, rec->sock);
-
rec->path = messaging_path(rec, server);
- status = try_connect(rec);
+ status = try_send(rec);
if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
- /* backoff on this message - the servers listen queue is full */
- event_add_timed(msg->event.ev, rec,
- timeval_current_ofs(0, MESSAGING_BACKOFF),
- messaging_backoff_handler, rec);
+ if (msg->pending == NULL) {
+ EVENT_FD_WRITEABLE(msg->event.fde);
+ }
+ DLIST_ADD(msg->pending, rec);
return NT_STATUS_OK;
}
- if (!NT_STATUS_IS_OK(status)) {
- talloc_free(rec);
- return status;
- }
-
- event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
- EVENT_FD_WRITE, messaging_send_handler, rec);
+ talloc_free(rec);
- return NT_STATUS_OK;
+ return status;
}
/*
@@ -381,6 +354,7 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
msg->path = messaging_path(msg, server_id);
msg->server_id = server_id;
msg->dispatch = NULL;
+ msg->pending = NULL;
status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
if (!NT_STATUS_IS_OK(status)) {
@@ -399,9 +373,12 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
return NULL;
}
+ /* it needs to be non blocking for sends */
+ set_blocking(socket_get_fd(msg->sock), False);
+
msg->event.ev = talloc_reference(msg, ev);
msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock),
- EVENT_FD_READ, messaging_recv_handler, msg);
+ EVENT_FD_READ, messaging_handler, msg);
talloc_set_destructor(msg, messaging_destructor);
@@ -409,5 +386,3 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
return msg;
}
-
-