summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--source4/lib/messaging/messaging.c42
1 files changed, 41 insertions, 1 deletions
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index 58b5e5243e..ab94a30ace 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -48,11 +48,12 @@ struct messaging_context {
uint32_t num_types;
struct idr_context *dispatch_tree;
struct messaging_rec *pending;
+ struct messaging_rec *retry_queue;
struct irpc_list *irpc;
struct idr_context *idr;
const char **names;
struct timeval start_time;
-
+ struct timed_event *retry_te;
struct {
struct event_context *ev;
struct fd_event *fde;
@@ -83,6 +84,7 @@ struct messaging_rec {
} *header;
DATA_BLOB packet;
+ uint32_t retries;
};
@@ -168,6 +170,7 @@ static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB pac
rec->path = msg->path;
rec->header = (struct messaging_header *)packet.data;
rec->packet = packet;
+ rec->retries = 0;
if (packet.length != sizeof(*rec->header) + rec->header->length) {
DEBUG(0,("messaging: bad message header size %d should be %d\n",
@@ -211,6 +214,26 @@ static NTSTATUS try_send(struct messaging_rec *rec)
}
/*
+ timed-event callback: requeue every backed-off message from msg->retry_queue onto msg->pending
+*/
+static void msg_retry_timer(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private)
+{
+ struct messaging_context *msg = talloc_get_type(private,
+ struct messaging_context);
+ msg->retry_te = NULL; /* cleared so the send handler's NULL check can schedule a new timer */
+
+ /* unlink each record from the head of the retry queue and append it to the pending queue */
+ while (msg->retry_queue) {
+ struct messaging_rec *rec = msg->retry_queue;
+ DLIST_REMOVE(msg->retry_queue, rec);
+ DLIST_ADD_END(msg->pending, rec, struct messaging_rec *);
+ }
+
+ EVENT_FD_WRITEABLE(msg->event.fde); /* presumably re-enables write events so sending is attempted again - confirm */
+}
+
+/*
handle a socket write event
*/
static void messaging_send_handler(struct messaging_context *msg)
@@ -220,8 +243,23 @@ static void messaging_send_handler(struct messaging_context *msg)
NTSTATUS status;
status = try_send(rec);
if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+ rec->retries++;
+ if (rec->retries > 3) {
+ /* we're getting continuous write errors -
+ backoff this record */
+ DLIST_REMOVE(msg->pending, rec);
+ DLIST_ADD_END(msg->retry_queue, rec,
+ struct messaging_rec *);
+ if (msg->retry_te == NULL) {
+ msg->retry_te =
+ event_add_timed(msg->event.ev, msg,
+ timeval_current_ofs(1, 0),
+ msg_retry_timer, msg);
+ }
+ }
break;
}
+ rec->retries = 0;
if (!NT_STATUS_IS_OK(status)) {
DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n",
rec->header->from.id, rec->header->to.id, rec->header->msg_type,
@@ -281,6 +319,7 @@ static void messaging_recv_handler(struct messaging_context *msg)
rec->path = msg->path;
rec->header = (struct messaging_header *)packet.data;
rec->packet = packet;
+ rec->retries = 0;
if (msize != sizeof(*rec->header) + rec->header->length) {
DEBUG(0,("messaging: bad message header size %d should be %d\n",
@@ -415,6 +454,7 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
return NT_STATUS_NO_MEMORY;
}
+ rec->retries = 0;
rec->msg = msg;
rec->header = (struct messaging_header *)rec->packet.data;
rec->header->version = MESSAGING_VERSION;