summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--source4/lib/messaging/messaging.c49
-rw-r--r--source4/torture/local/messaging.c19
2 files changed, 58 insertions, 10 deletions
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index 2f9a43c847..a0aabbbc21 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -49,8 +49,6 @@ struct dispatch_fn {
/* an individual message */
struct messaging_rec {
- struct messaging_rec *next, *prev;
-
struct messaging_state *msg;
struct socket_context *sock;
struct fd_event *fde;
@@ -324,6 +322,43 @@ static void messaging_send_handler(struct event_context *ev, struct fd_event *fd
/*
+ when the servers listen queue is full we use this to backoff the message
+*/
+static void messaging_backoff_handler(struct event_context *ev, struct timed_event *te, time_t t)
+{
+ struct messaging_rec *rec = te->private;
+ struct messaging_state *msg = rec->msg;
+ NTSTATUS status;
+ struct fd_event fde;
+
+ status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
+ if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+ /* backoff again */
+ te->next_event = t+1;
+ return;
+ }
+
+ if (!NT_STATUS_IS_OK(status)) {
+ DEBUG(1,("messaging: Lost message from %u to %u of type %u after backoff - %s\n",
+ rec->header.from, rec->header.to, rec->header.msg_type, nt_errstr(status)));
+ talloc_free(rec);
+ return;
+ }
+
+ fde.private = rec;
+ fde.fd = socket_get_fd(rec->sock);
+ fde.flags = EVENT_FD_WRITE;
+ fde.handler = messaging_send_handler;
+
+ rec->fde = event_add_fd(msg->event.ev, &fde);
+
+ talloc_set_destructor(rec, rec_destructor);
+
+ messaging_send_handler(msg->event.ev, rec->fde, 0, EVENT_FD_WRITE);
+}
+
+
+/*
Send a message to a particular server
*/
NTSTATUS messaging_send(void *msg_ctx, servid_t server, uint32_t msg_type, DATA_BLOB *data)
@@ -361,6 +396,16 @@ NTSTATUS messaging_send(void *msg_ctx, servid_t server, uint32_t msg_type, DATA_
rec->path = messaging_path(rec, server);
status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
+ if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+ /* backoff on this message - the servers listen queue is full */
+ struct timed_event te;
+ te.next_event = time(NULL)+1;
+ te.handler = messaging_backoff_handler;
+ te.private = rec;
+ event_add_timed(msg->event.ev, &te);
+ return NT_STATUS_OK;
+ }
+
if (!NT_STATUS_IS_OK(status)) {
talloc_free(rec);
return status;
diff --git a/source4/torture/local/messaging.c b/source4/torture/local/messaging.c
index d4aaf80f4f..6177bc82a8 100644
--- a/source4/torture/local/messaging.c
+++ b/source4/torture/local/messaging.c
@@ -28,9 +28,10 @@ static void ping_message(void *msg_ctx, void *private,
uint32_t msg_type, servid_t src, DATA_BLOB *data)
{
NTSTATUS status;
- do {
- status = messaging_send(msg_ctx, src, MY_PONG, data);
- } while (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES));
+ status = messaging_send(msg_ctx, src, MY_PONG, data);
+ if (!NT_STATUS_IS_OK(status)) {
+ printf("pong failed - %s\n", nt_errstr(status));
+ }
}
static void pong_message(void *msg_ctx, void *private,
@@ -85,15 +86,17 @@ static BOOL test_ping_speed(TALLOC_CTX *mem_ctx)
status1 = messaging_send(msg_ctx, 1, MY_PING, &data);
status2 = messaging_send(msg_ctx, 1, MY_PING, NULL);
- if (NT_STATUS_IS_OK(status1)) {
- ping_count++;
+ if (!NT_STATUS_IS_OK(status1)) {
+ printf("msg1 failed - %s\n", nt_errstr(status1));
}
- if (NT_STATUS_IS_OK(status2)) {
- ping_count++;
+ if (!NT_STATUS_IS_OK(status2)) {
+ printf("msg2 failed - %s\n", nt_errstr(status2));
}
- while (ping_count > pong_count + 10) {
+ ping_count += 2;
+
+ while (ping_count > pong_count + 20) {
event_loop_once(ev);
}
}