From 6591a226144d371a6b68fc5e7201a90a77dc9153 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Sun, 17 Oct 2004 10:04:49 +0000 Subject: r3016: - converted the events code to talloc - added the new messaging system, based on unix domain sockets. It gets over 10k messages/second on my laptop without any socket cacheing, which is better than I expected. - added a LOCAL-MESSAGING torture test (This used to be commit 3af06478da7ab34a272226d8d9ac87e0a4940cfb) --- source4/lib/events.c | 46 +--- source4/lib/messaging/config.m4 | 1 + source4/lib/messaging/config.mk | 8 + source4/lib/messaging/messaging.c | 445 ++++++++++++++++++++++++++++++++++++++ source4/lib/socket/socket_unix.c | 10 + 5 files changed, 473 insertions(+), 37 deletions(-) create mode 100644 source4/lib/messaging/config.m4 create mode 100644 source4/lib/messaging/config.mk create mode 100644 source4/lib/messaging/messaging.c (limited to 'source4/lib') diff --git a/source4/lib/events.c b/source4/lib/events.c index 4ae7cad8af..0e98b42503 100644 --- a/source4/lib/events.c +++ b/source4/lib/events.c @@ -71,11 +71,11 @@ call, and all subsequent calls pass this event_context as the first element. Event handlers also receive this as their first argument. */ -struct event_context *event_context_init(void) +struct event_context *event_context_init(TALLOC_CTX *mem_ctx) { struct event_context *ev; - ev = malloc(sizeof(*ev)); + ev = talloc_p(mem_ctx, struct event_context); if (!ev) return NULL; /* start off with no events */ @@ -91,40 +91,12 @@ struct event_context *event_context_init(void) */ void event_context_destroy(struct event_context *ev) { - struct fd_event *fde; - struct timed_event *te; - struct loop_event *le; - ev->ref_count--; if (ev->ref_count != 0) { return; } - for (fde=ev->fd_events; fde;) { - struct fd_event *next = fde->next; - event_remove_fd(ev, fde); - if (fde->ref_count == 0) { - free(fde); - } - fde=next; - } - for (te=ev->timed_events; te;) { - struct timed_event *next = te->next; - event_remove_timed(ev, te); - if (te->ref_count == 0) { - free(te); - } - te=next; - } - for (le=ev->loop_events; le;) { - struct loop_event *next = le->next; - event_remove_loop(ev, le); - if (le->ref_count == 0) { - free(le); - } - le=next; - } - free(ev); + talloc_free(ev); } @@ -175,7 +147,7 @@ struct event_context * event_context_merge(struct event_context *ev, struct even */ struct fd_event *event_add_fd(struct event_context *ev, struct fd_event *e) { - e = memdup(e, sizeof(*e)); + e = talloc_memdup(ev, e, sizeof(*e)); if (!e) return NULL; DLIST_ADD(ev->fd_events, e); e->ref_count = 1; @@ -245,7 +217,7 @@ void event_remove_fd_all_handler(struct event_context *ev, void *handler) */ struct timed_event *event_add_timed(struct event_context *ev, struct timed_event *e) { - e = memdup(e, sizeof(*e)); + e = talloc_memdup(ev, e, sizeof(*e)); if (!e) return NULL; e->ref_count = 1; DLIST_ADD(ev->timed_events, e); @@ -274,7 +246,7 @@ BOOL event_remove_timed(struct event_context *ev, struct timed_event *e1) */ struct loop_event *event_add_loop(struct event_context *ev, struct loop_event *e) { - e = memdup(e, sizeof(*e)); + e = talloc_memdup(ev, e, sizeof(*e)); if (!e) return NULL; e->ref_count = 1; DLIST_ADD(ev->loop_events, e); @@ -330,7 +302,7 @@ int event_loop_once(struct event_context *ev) struct loop_event *next = le->next; if (le->ref_count == 0) { DLIST_REMOVE(ev->loop_events, le); - free(le); + talloc_unlink(ev, le); } else { le->ref_count++; le->handler(ev, le, t); @@ -351,7 +323,7 @@ int event_loop_once(struct event_context *ev) if (ev->maxfd == fe->fd) { ev->maxfd = EVENT_INVALID_MAXFD; } - free(fe); + talloc_unlink(ev, fe); } else { if (fe->flags & EVENT_FD_READ) { FD_SET(fe->fd, &r_fds); @@ -432,7 +404,7 @@ int event_loop_once(struct event_context *ev) struct timed_event *next = te->next; if (te->ref_count == 0) { DLIST_REMOVE(ev->timed_events, te); - free(te); + talloc_unlink(ev, te); } else if (te->next_event <= t) { te->ref_count++; te->handler(ev, te, t); diff --git a/source4/lib/messaging/config.m4 b/source4/lib/messaging/config.m4 new file mode 100644 index 0000000000..1797069ff2 --- /dev/null +++ b/source4/lib/messaging/config.m4 @@ -0,0 +1 @@ +SMB_SUBSYSTEM_MK(MESSAGING,lib/messaging/config.mk) diff --git a/source4/lib/messaging/config.mk b/source4/lib/messaging/config.mk new file mode 100644 index 0000000000..6c30894779 --- /dev/null +++ b/source4/lib/messaging/config.mk @@ -0,0 +1,8 @@ + +################################################ +# Start SUBSYSTEM MESSAGING +[SUBSYSTEM::MESSAGING] +INIT_OBJ_FILES = \ + lib/messaging/messaging.o +# End SUBSYSTEM MESSAGING +################################################ diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c new file mode 100644 index 0000000000..f9caf5071c --- /dev/null +++ b/source4/lib/messaging/messaging.c @@ -0,0 +1,445 @@ +/* + Unix SMB/CIFS implementation. + + Samba internal messaging functions + + Copyright (C) Andrew Tridgell 2004 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + +#include "includes.h" + +/* change the message version with any incompatible changes in the protocol */ +#define MESSAGING_VERSION 1 + +struct messaging_state { + servid_t server_id; + struct socket_context *sock; + char *path; + struct dispatch_fn *dispatch; + + struct { + struct event_context *ev; + struct fd_event *fde; + } event; +}; + +/* we have a linked list of dispatch handlers that this messaging + server can deal with */ +struct dispatch_fn { + struct dispatch_fn *next, *prev; + uint32_t msg_type; + void *private; + void (*fn)(void *msg_ctx, void *private, + uint32_t msg_type, servid_t server_id, DATA_BLOB *data); +}; + +/* an individual message */ +struct messaging_rec { + struct messaging_rec *next, *prev; + + struct messaging_state *msg; + struct socket_context *sock; + struct fd_event *fde; + const char *path; + + struct { + uint32_t version; + uint32_t msg_type; + servid_t from; + servid_t to; + uint32_t length; + } header; + + DATA_BLOB data; + + uint32_t ndone; +}; + +/* + A useful function for testing the message system. +*/ +static void ping_message(void *msg_ctx, void *private, + uint32_t msg_type, servid_t src, DATA_BLOB *data) +{ + DEBUG(1,("INFO: Received PING message from server %u [%.*s]\n", + (uint_t)src, data->length, data->data?(const char *)data->data:"")); + messaging_send(msg_ctx, src, MSG_PONG, data); +} + +/* + return the path to a messaging socket +*/ +static char *messaging_path(TALLOC_CTX *mem_ctx, servid_t server_id) +{ + char *name = talloc_asprintf(mem_ctx, "messaging/msg.%u", (unsigned)server_id); + char *ret; + ret = lock_path(mem_ctx, name); + talloc_free(name); + return ret; +} + +/* + dispatch a fully received message +*/ +static void messaging_dispatch(struct messaging_state *msg, struct messaging_rec *rec) +{ + struct dispatch_fn *d; + for (d=msg->dispatch;d;d=d->next) { + if (d->msg_type == rec->header.msg_type) { + d->fn(msg, d->private, d->msg_type, rec->header.from, &rec->data); + } + } + + /* we don't free the record itself here as there may + be more messages from this client */ + data_blob_free(&rec->data); + rec->header.length = 0; + rec->ndone = 0; +} + + +/* + handle IO for a single message +*/ +static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde, + time_t t, uint16_t flags) +{ + struct messaging_rec *rec = fde->private; + struct messaging_state *msg = rec->msg; + NTSTATUS status; + + if (rec->ndone < sizeof(rec->header)) { + /* receive the header */ + DATA_BLOB blob; + status = socket_recv(rec->sock, rec, + &blob, sizeof(rec->header) - rec->ndone, 0); + if (NT_STATUS_IS_ERR(status)) { + talloc_free(rec); + return; + } + + if (blob.length == 0) { + return; + } + + memcpy(rec->ndone + (char *)&rec->header, blob.data, blob.length); + rec->ndone += blob.length; + data_blob_free(&blob); + + if (rec->ndone == sizeof(rec->header)) { + if (rec->header.version != MESSAGING_VERSION) { + DEBUG(0,("meessage with wrong version %u\n", + rec->header.version)); + talloc_free(rec); + } + rec->data = data_blob_talloc(rec, NULL, rec->header.length); + if (rec->data.length != rec->header.length) { + DEBUG(0,("Unable to allocate message of size %u\n", + rec->header.length)); + talloc_free(rec); + } + } + } + + if (rec->ndone >= sizeof(rec->header) && + rec->ndone < sizeof(rec->header) + rec->header.length) { + /* receive the body, if any */ + DATA_BLOB blob; + status = socket_recv(rec->sock, rec, + &blob, sizeof(rec->header) + rec->header.length - rec->ndone, 0); + if (NT_STATUS_IS_ERR(status)) { + talloc_free(rec); + return; + } + + if (blob.length == 0) { + return; + } + + memcpy(rec->data.data + (rec->ndone - sizeof(rec->header)), + blob.data, blob.length); + + rec->ndone += blob.length; + } + + if (rec->ndone == sizeof(rec->header) + rec->header.length) { + /* we've got the whole message */ + messaging_dispatch(msg, rec); + } +} + +/* + destroy a messaging record +*/ +static int rec_destructor(void *ptr) +{ + struct messaging_rec *rec = ptr; + struct messaging_state *msg = rec->msg; + event_remove_fd(msg->event.ev, rec->fde); + return 0; +} + +/* + handle a new incoming connection +*/ +static void messaging_listen_handler(struct event_context *ev, struct fd_event *fde, + time_t t, uint16_t flags) +{ + struct messaging_state *msg = fde->private; + struct messaging_rec *rec; + NTSTATUS status; + struct fd_event fde2; + + rec = talloc_p(msg, struct messaging_rec); + if (rec == NULL) { + smb_panic("Unable to allocate messaging_rec"); + } + + status = socket_accept(msg->sock, &rec->sock, 0); + if (!NT_STATUS_IS_OK(status)) { + smb_panic("Unable to accept messaging_rec"); + } + talloc_steal(rec, rec->sock); + + rec->msg = msg; + rec->ndone = 0; + rec->header.length = 0; + rec->path = msg->path; + + fde2.private = rec; + fde2.fd = socket_get_fd(rec->sock); + fde2.flags = EVENT_FD_READ; + fde2.handler = messaging_recv_handler; + + rec->fde = event_add_fd(msg->event.ev, &fde2); + + talloc_set_destructor(rec, rec_destructor); +} + +/* + Register a dispatch function for a particular message type. +*/ +void messaging_register(void *ctx, void *private, + uint32_t msg_type, + void (*fn)(void *, void *, uint32_t, servid_t, DATA_BLOB *)) +{ + struct messaging_state *msg = ctx; + struct dispatch_fn *d; + + d = talloc_p(msg, struct dispatch_fn); + d->msg_type = msg_type; + d->private = private; + d->fn = fn; + DLIST_ADD(msg->dispatch, d); +} + +/* + De-register the function for a particular message type. +*/ +void messaging_deregister(void *ctx, uint32_t msg_type) +{ + struct messaging_state *msg = ctx; + struct dispatch_fn *d, *next; + + for (d = msg->dispatch; d; d = next) { + next = d->next; + if (d->msg_type == msg_type) { + DLIST_REMOVE(msg->dispatch, d); + talloc_free(d); + } + } +} + + + +/* + handle IO for sending a message +*/ +static void messaging_send_handler(struct event_context *ev, struct fd_event *fde, + time_t t, uint16_t flags) +{ + struct messaging_rec *rec = fde->private; + NTSTATUS status; + + if (rec->ndone < sizeof(rec->header)) { + /* send the header */ + size_t nsent; + DATA_BLOB blob; + + blob.data = rec->ndone + (char *)&rec->header; + blob.length = sizeof(rec->header) - rec->ndone; + + status = socket_send(rec->sock, rec, &blob, &nsent, 0); + if (NT_STATUS_IS_ERR(status)) { + talloc_free(rec); + return; + } + + if (nsent == 0) { + return; + } + + rec->ndone += nsent; + } + + if (rec->ndone >= sizeof(rec->header) && + rec->ndone < sizeof(rec->header) + rec->header.length) { + /* send the body, if any */ + DATA_BLOB blob; + size_t nsent; + + blob.data = rec->data.data + (rec->ndone - sizeof(rec->header)); + blob.length = rec->header.length - (rec->ndone - sizeof(rec->header)); + + status = socket_send(rec->sock, rec, &blob, &nsent, 0); + if (NT_STATUS_IS_ERR(status)) { + talloc_free(rec); + return; + } + + rec->ndone += nsent; + } + + if (rec->ndone == sizeof(rec->header) + rec->header.length) { + /* we've done the whole message */ + talloc_free(rec); + } +} + + +/* + Send a message to a particular server +*/ +NTSTATUS messaging_send(void *msg_ctx, servid_t server, uint32_t msg_type, DATA_BLOB *data) +{ + struct messaging_state *msg = msg_ctx; + struct messaging_rec *rec; + NTSTATUS status; + struct fd_event fde; + + rec = talloc_p(msg, struct messaging_rec); + if (rec == NULL) { + return NT_STATUS_NO_MEMORY; + } + + rec->msg = msg; + rec->header.version = MESSAGING_VERSION; + rec->header.msg_type = msg_type; + rec->header.from = msg->server_id; + rec->header.to = server; + rec->header.length = data?data->length:0; + if (rec->header.length != 0) { + rec->data = data_blob_talloc(rec, data->data, data->length); + } else { + rec->data = data_blob(NULL, 0); + } + rec->ndone = 0; + + status = socket_create("unix", SOCKET_TYPE_STREAM, &rec->sock, 0); + if (!NT_STATUS_IS_OK(status)) { + talloc_free(rec); + return status; + } + talloc_steal(rec, rec->sock); + + rec->path = messaging_path(rec, server); + + status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0); + if (!NT_STATUS_IS_OK(status)) { + talloc_free(rec); + return status; + } + + fde.private = rec; + fde.fd = socket_get_fd(rec->sock); + fde.flags = EVENT_FD_WRITE; + fde.handler = messaging_send_handler; + + rec->fde = event_add_fd(msg->event.ev, &fde); + + talloc_set_destructor(rec, rec_destructor); + + return NT_STATUS_OK; +} + + +/* + destroy the messaging context +*/ +static int messaging_destructor(void *msg_ctx) +{ + struct messaging_state *msg = msg_ctx; + event_remove_fd(msg->event.ev, msg->event.fde); + return 0; +} + +/* + create the listening socket and setup the dispatcher +*/ +void *messaging_init(TALLOC_CTX *mem_ctx, servid_t server_id, struct event_context *ev) +{ + struct messaging_state *msg; + NTSTATUS status; + struct fd_event fde; + + msg = talloc_p(mem_ctx, struct messaging_state); + if (msg == NULL) { + return NULL; + } + + /* create the messaging directory if needed */ + msg->path = lock_path(msg, "messaging"); + mkdir(msg->path, 0700); + talloc_free(msg->path); + + msg->server_id = server_id; + msg->dispatch = NULL; + msg->path = messaging_path(msg, server_id); + + status = socket_create("unix", SOCKET_TYPE_STREAM, &msg->sock, 0); + if (!NT_STATUS_IS_OK(status)) { + talloc_free(msg); + return NULL; + } + + /* by stealing here we ensure that the socket is cleaned up (and even + deleted) on exit */ + talloc_steal(msg, msg->sock); + + status = socket_listen(msg->sock, msg->path, 0, 50, 0); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0,("Unable to setup messaging listener for '%s'\n", msg->path)); + talloc_free(msg); + return NULL; + } + + fde.private = msg; + fde.fd = socket_get_fd(msg->sock); + fde.flags = EVENT_FD_READ; + fde.handler = messaging_listen_handler; + + msg->event.ev = ev; + msg->event.fde = event_add_fd(ev, &fde); + + talloc_set_destructor(msg, messaging_destructor); + + messaging_register(msg, NULL, MSG_PING, ping_message); + + return msg; +} + + diff --git a/source4/lib/socket/socket_unix.c b/source4/lib/socket/socket_unix.c index d87eaf49c4..7e169c47a7 100644 --- a/source4/lib/socket/socket_unix.c +++ b/source4/lib/socket/socket_unix.c @@ -29,6 +29,7 @@ static NTSTATUS unixdom_init(struct socket_context *sock) if (sock->fd == -1) { return NT_STATUS_INSUFFICIENT_RESOURCES; } + sock->private_data = NULL; return NT_STATUS_OK; } @@ -36,6 +37,11 @@ static NTSTATUS unixdom_init(struct socket_context *sock) static void unixdom_close(struct socket_context *sock) { close(sock->fd); + /* if we were listening, then don't leave the socket lying + around in the filesystem */ + if (sock->private_data) { + unlink((const char *)sock->private_data); + } } static NTSTATUS unixdom_connect(struct socket_context *sock, @@ -82,6 +88,9 @@ static NTSTATUS unixdom_listen(struct socket_context *sock, return NT_STATUS_INVALID_PARAMETER; } + /* delete if it already exists */ + unlink(my_address); + ZERO_STRUCT(my_addr); my_addr.sun_family = AF_UNIX; strncpy(my_addr.sun_path, my_address, sizeof(my_addr.sun_path)); @@ -104,6 +113,7 @@ static NTSTATUS unixdom_listen(struct socket_context *sock, } sock->state = SOCKET_STATE_SERVER_LISTEN; + sock->private_data = (void *)talloc_strdup(sock, my_address); return NT_STATUS_OK; } -- cgit