From 7282ddda0a38139fa457e6451f100f6d1792d927 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Sun, 1 May 2005 18:49:07 +0000 Subject: r6561: re-did the internal message system based on DGRAM unix domain sockets. This gains us about 40% in messaging speed. (This used to be commit f244a64ed537447e44229172427b5b6a5c64800c) --- source4/lib/messaging/messaging.c | 202 ++++++++++++-------------------------- 1 file changed, 64 insertions(+), 138 deletions(-) diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c index 8127e7e8fc..c95028bea5 100644 --- a/source4/lib/messaging/messaging.c +++ b/source4/lib/messaging/messaging.c @@ -34,10 +34,13 @@ /* the number of microseconds to backoff in retrying to send a message */ #define MESSAGING_BACKOFF 250000 +/* maximum message size */ +#define MESSAGING_MAX_SIZE 512 + struct messaging_context { uint32_t server_id; struct socket_context *sock; - char *path; + const char *path; struct dispatch_fn *dispatch; struct { @@ -60,7 +63,6 @@ struct dispatch_fn { struct messaging_rec { struct messaging_context *msg; struct socket_context *sock; - struct fd_event *fde; const char *path; struct { @@ -72,8 +74,6 @@ struct messaging_rec { } header; DATA_BLOB data; - - uint32_t ndone; }; /* @@ -112,112 +112,58 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r } } - /* we don't free the record itself here as there may - be more messages from this client */ - data_blob_free(&rec->data); rec->header.length = 0; - rec->ndone = 0; } /* - handle IO for a single message + handle a new incoming connection */ static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private) { - struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec); - struct messaging_context *msg = rec->msg; + struct messaging_context *msg = talloc_get_type(private, + struct messaging_context); + struct messaging_rec *rec; NTSTATUS status; + uint8_t data[MESSAGING_MAX_SIZE]; + size_t msize; - if (rec->ndone < sizeof(rec->header)) { - /* receive the header */ - size_t nread; - - status = socket_recv(rec->sock, - rec->ndone + (char *)&rec->header, - sizeof(rec->header) - rec->ndone, &nread, 0); - if (NT_STATUS_IS_ERR(status)) { - talloc_free(rec); - return; - } - - if (nread == 0) { - return; - } - - rec->ndone += nread; - - if (rec->ndone == sizeof(rec->header)) { - if (rec->header.version != MESSAGING_VERSION) { - DEBUG(0,("meessage with wrong version %u\n", - rec->header.version)); - talloc_free(rec); - } - rec->data = data_blob_talloc(rec, NULL, rec->header.length); - if (rec->data.length != rec->header.length) { - DEBUG(0,("Unable to allocate message of size %u\n", - rec->header.length)); - talloc_free(rec); - } - } - } - - if (rec->ndone >= sizeof(rec->header) && - rec->ndone < sizeof(rec->header) + rec->header.length) { - /* receive the body, if any */ - size_t nread; - - status = socket_recv(rec->sock, - rec->data.data + (rec->ndone - sizeof(rec->header)), - sizeof(rec->header) + rec->header.length - rec->ndone, - &nread, 0); - if (NT_STATUS_IS_ERR(status)) { - talloc_free(rec); - return; - } - - if (nread == 0) { - return; - } - - rec->ndone += nread; + status = socket_recv(msg->sock, data, sizeof(data), &msize, 0); + if (!NT_STATUS_IS_OK(status)) { + return; } - if (rec->ndone == sizeof(rec->header) + rec->header.length) { - /* we've got the whole message */ - messaging_dispatch(msg, rec); + if (msize < sizeof(rec->header)) { + DEBUG(0,("messaging: bad message of size %d\n", msize)); + return; } -} - -/* - handle a new incoming connection -*/ -static void messaging_listen_handler(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) -{ - struct messaging_context *msg = talloc_get_type(private, - struct messaging_context); - struct messaging_rec *rec; - NTSTATUS status; rec = talloc(msg, struct messaging_rec); if (rec == NULL) { smb_panic("Unable to allocate messaging_rec"); } - status = socket_accept(msg->sock, &rec->sock); - if (!NT_STATUS_IS_OK(status)) { - smb_panic("Unable to accept messaging_rec"); - } - talloc_steal(rec, rec->sock); - rec->msg = msg; - rec->ndone = 0; - rec->header.length = 0; rec->path = msg->path; - rec->fde = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock), - EVENT_FD_READ, messaging_recv_handler, rec); + rec->sock = NULL; + + memcpy(&rec->header, data, sizeof(rec->header)); + if (msize != sizeof(rec->header) + rec->header.length) { + DEBUG(0,("messaging: bad message header size %d should be %d\n", + rec->header.length, msize - sizeof(rec->header))); + talloc_free(rec); + return; + } + + rec->data = data_blob_talloc(rec, data, rec->header.length); + if (rec->data.data == NULL) { + talloc_free(rec); + return; + } + + messaging_dispatch(msg, rec); + talloc_free(rec); } /* @@ -262,49 +208,28 @@ static void messaging_send_handler(struct event_context *ev, struct fd_event *fd uint16_t flags, void *private) { struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec); + uint8_t data[MESSAGING_MAX_SIZE]; + DATA_BLOB blob; + size_t nsent; NTSTATUS status; - if (rec->ndone < sizeof(rec->header)) { - /* send the header */ - size_t nsent; - DATA_BLOB blob; - - blob.data = rec->ndone + (uint8_t *)&rec->header; - blob.length = sizeof(rec->header) - rec->ndone; - - status = socket_send(rec->sock, &blob, &nsent, 0); - if (NT_STATUS_IS_ERR(status)) { - talloc_free(rec); - return; - } - - if (nsent == 0) { - return; - } - - rec->ndone += nsent; - } - - if (rec->ndone >= sizeof(rec->header) && - rec->ndone < sizeof(rec->header) + rec->header.length) { - /* send the body, if any */ - DATA_BLOB blob; - size_t nsent; - - blob.data = rec->data.data + (rec->ndone - sizeof(rec->header)); - blob.length = rec->header.length - (rec->ndone - sizeof(rec->header)); + memcpy(data, &rec->header, sizeof(rec->header)); + memcpy(data + sizeof(rec->header), rec->data.data, rec->data.length); - status = socket_send(rec->sock, &blob, &nsent, 0); - if (NT_STATUS_IS_ERR(status)) { - talloc_free(rec); - return; - } + blob.data = data; + blob.length = sizeof(rec->header) + rec->header.length; - rec->ndone += nsent; + status = socket_send(rec->sock, &blob, &nsent, 0); + if (NT_STATUS_IS_ERR(status)) { + DEBUG(3,("Unable to send message of type %d length %d - %s\n", + rec->header.msg_type, + rec->header.length, + nt_errstr(status))); + talloc_free(rec); + return; } - if (rec->ndone == sizeof(rec->header) + rec->header.length) { - /* we've done the whole message */ + if (NT_STATUS_IS_OK(status)) { talloc_free(rec); } } @@ -349,8 +274,8 @@ static void messaging_backoff_handler(struct event_context *ev, struct timed_eve return; } - rec->fde = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock), - EVENT_FD_WRITE, messaging_send_handler, rec); + event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock), + EVENT_FD_WRITE, messaging_send_handler, rec); } @@ -378,9 +303,8 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t } else { rec->data = data_blob(NULL, 0); } - rec->ndone = 0; - status = socket_create("unix", SOCKET_TYPE_STREAM, &rec->sock, 0); + status = socket_create("unix", SOCKET_TYPE_DGRAM, &rec->sock, 0); if (!NT_STATUS_IS_OK(status)) { talloc_free(rec); return status; @@ -403,8 +327,8 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t return status; } - rec->fde = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock), - EVENT_FD_WRITE, messaging_send_handler, rec); + event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock), + EVENT_FD_WRITE, messaging_send_handler, rec); return NT_STATUS_OK; } @@ -437,10 +361,12 @@ static int messaging_destructor(void *ptr) /* create the listening socket and setup the dispatcher */ -struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, struct event_context *ev) +struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, + struct event_context *ev) { struct messaging_context *msg; NTSTATUS status; + char *path; msg = talloc(mem_ctx, struct messaging_context); if (msg == NULL) { @@ -448,15 +374,15 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id } /* create the messaging directory if needed */ - msg->path = smbd_tmp_path(msg, "messaging"); - mkdir(msg->path, 0700); - talloc_free(msg->path); + path = smbd_tmp_path(msg, "messaging"); + mkdir(path, 0700); + talloc_free(path); + msg->path = messaging_path(msg, server_id); msg->server_id = server_id; msg->dispatch = NULL; - msg->path = messaging_path(msg, server_id); - status = socket_create("unix", SOCKET_TYPE_STREAM, &msg->sock, 0); + status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0); if (!NT_STATUS_IS_OK(status)) { talloc_free(msg); return NULL; @@ -475,7 +401,7 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id msg->event.ev = talloc_reference(msg, ev); msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock), - EVENT_FD_READ, messaging_listen_handler, msg); + EVENT_FD_READ, messaging_recv_handler, msg); talloc_set_destructor(msg, messaging_destructor); -- cgit