summaryrefslogtreecommitdiff
path: root/source4/lib
diff options
context:
space:
mode:
authorAndrew Tridgell <tridge@samba.org>2004-10-17 10:04:49 +0000
committerGerald (Jerry) Carter <jerry@samba.org>2007-10-10 12:59:57 -0500
commit6591a226144d371a6b68fc5e7201a90a77dc9153 (patch)
tree072ee4cbf33f0469f23e424517d17b79c750852d /source4/lib
parent844de2b65c931120a2408365c00fa80cb65959fc (diff)
downloadsamba-6591a226144d371a6b68fc5e7201a90a77dc9153.tar.gz
samba-6591a226144d371a6b68fc5e7201a90a77dc9153.tar.bz2
samba-6591a226144d371a6b68fc5e7201a90a77dc9153.zip
r3016: - converted the events code to talloc
- added the new messaging system, based on unix domain sockets. It gets over 10k messages/second on my laptop without any socket cacheing, which is better than I expected. - added a LOCAL-MESSAGING torture test (This used to be commit 3af06478da7ab34a272226d8d9ac87e0a4940cfb)
Diffstat (limited to 'source4/lib')
-rw-r--r--source4/lib/events.c46
-rw-r--r--source4/lib/messaging/config.m41
-rw-r--r--source4/lib/messaging/config.mk8
-rw-r--r--source4/lib/messaging/messaging.c445
-rw-r--r--source4/lib/socket/socket_unix.c10
5 files changed, 473 insertions, 37 deletions
diff --git a/source4/lib/events.c b/source4/lib/events.c
index 4ae7cad8af..0e98b42503 100644
--- a/source4/lib/events.c
+++ b/source4/lib/events.c
@@ -71,11 +71,11 @@
call, and all subsequent calls pass this event_context as the first
element. Event handlers also receive this as their first argument.
*/
-struct event_context *event_context_init(void)
+struct event_context *event_context_init(TALLOC_CTX *mem_ctx)
{
struct event_context *ev;
- ev = malloc(sizeof(*ev));
+ ev = talloc_p(mem_ctx, struct event_context);
if (!ev) return NULL;
/* start off with no events */
@@ -91,40 +91,12 @@ struct event_context *event_context_init(void)
*/
void event_context_destroy(struct event_context *ev)
{
- struct fd_event *fde;
- struct timed_event *te;
- struct loop_event *le;
-
ev->ref_count--;
if (ev->ref_count != 0) {
return;
}
- for (fde=ev->fd_events; fde;) {
- struct fd_event *next = fde->next;
- event_remove_fd(ev, fde);
- if (fde->ref_count == 0) {
- free(fde);
- }
- fde=next;
- }
- for (te=ev->timed_events; te;) {
- struct timed_event *next = te->next;
- event_remove_timed(ev, te);
- if (te->ref_count == 0) {
- free(te);
- }
- te=next;
- }
- for (le=ev->loop_events; le;) {
- struct loop_event *next = le->next;
- event_remove_loop(ev, le);
- if (le->ref_count == 0) {
- free(le);
- }
- le=next;
- }
- free(ev);
+ talloc_free(ev);
}
@@ -175,7 +147,7 @@ struct event_context * event_context_merge(struct event_context *ev, struct even
*/
struct fd_event *event_add_fd(struct event_context *ev, struct fd_event *e)
{
- e = memdup(e, sizeof(*e));
+ e = talloc_memdup(ev, e, sizeof(*e));
if (!e) return NULL;
DLIST_ADD(ev->fd_events, e);
e->ref_count = 1;
@@ -245,7 +217,7 @@ void event_remove_fd_all_handler(struct event_context *ev, void *handler)
*/
struct timed_event *event_add_timed(struct event_context *ev, struct timed_event *e)
{
- e = memdup(e, sizeof(*e));
+ e = talloc_memdup(ev, e, sizeof(*e));
if (!e) return NULL;
e->ref_count = 1;
DLIST_ADD(ev->timed_events, e);
@@ -274,7 +246,7 @@ BOOL event_remove_timed(struct event_context *ev, struct timed_event *e1)
*/
struct loop_event *event_add_loop(struct event_context *ev, struct loop_event *e)
{
- e = memdup(e, sizeof(*e));
+ e = talloc_memdup(ev, e, sizeof(*e));
if (!e) return NULL;
e->ref_count = 1;
DLIST_ADD(ev->loop_events, e);
@@ -330,7 +302,7 @@ int event_loop_once(struct event_context *ev)
struct loop_event *next = le->next;
if (le->ref_count == 0) {
DLIST_REMOVE(ev->loop_events, le);
- free(le);
+ talloc_unlink(ev, le);
} else {
le->ref_count++;
le->handler(ev, le, t);
@@ -351,7 +323,7 @@ int event_loop_once(struct event_context *ev)
if (ev->maxfd == fe->fd) {
ev->maxfd = EVENT_INVALID_MAXFD;
}
- free(fe);
+ talloc_unlink(ev, fe);
} else {
if (fe->flags & EVENT_FD_READ) {
FD_SET(fe->fd, &r_fds);
@@ -432,7 +404,7 @@ int event_loop_once(struct event_context *ev)
struct timed_event *next = te->next;
if (te->ref_count == 0) {
DLIST_REMOVE(ev->timed_events, te);
- free(te);
+ talloc_unlink(ev, te);
} else if (te->next_event <= t) {
te->ref_count++;
te->handler(ev, te, t);
diff --git a/source4/lib/messaging/config.m4 b/source4/lib/messaging/config.m4
new file mode 100644
index 0000000000..1797069ff2
--- /dev/null
+++ b/source4/lib/messaging/config.m4
@@ -0,0 +1 @@
+SMB_SUBSYSTEM_MK(MESSAGING,lib/messaging/config.mk)
diff --git a/source4/lib/messaging/config.mk b/source4/lib/messaging/config.mk
new file mode 100644
index 0000000000..6c30894779
--- /dev/null
+++ b/source4/lib/messaging/config.mk
@@ -0,0 +1,8 @@
+
+################################################
+# Start SUBSYSTEM MESSAGING
+[SUBSYSTEM::MESSAGING]
+INIT_OBJ_FILES = \
+ lib/messaging/messaging.o
+# End SUBSYSTEM MESSAGING
+################################################
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
new file mode 100644
index 0000000000..f9caf5071c
--- /dev/null
+++ b/source4/lib/messaging/messaging.c
@@ -0,0 +1,445 @@
+/*
+ Unix SMB/CIFS implementation.
+
+ Samba internal messaging functions
+
+ Copyright (C) Andrew Tridgell 2004
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+
+/* change the message version with any incompatible changes in the protocol */
+#define MESSAGING_VERSION 1
+
+struct messaging_state {
+ servid_t server_id;
+ struct socket_context *sock;
+ char *path;
+ struct dispatch_fn *dispatch;
+
+ struct {
+ struct event_context *ev;
+ struct fd_event *fde;
+ } event;
+};
+
+/* we have a linked list of dispatch handlers that this messaging
+ server can deal with */
+struct dispatch_fn {
+ struct dispatch_fn *next, *prev;
+ uint32_t msg_type;
+ void *private;
+ void (*fn)(void *msg_ctx, void *private,
+ uint32_t msg_type, servid_t server_id, DATA_BLOB *data);
+};
+
+/* an individual message */
+struct messaging_rec {
+ struct messaging_rec *next, *prev;
+
+ struct messaging_state *msg;
+ struct socket_context *sock;
+ struct fd_event *fde;
+ const char *path;
+
+ struct {
+ uint32_t version;
+ uint32_t msg_type;
+ servid_t from;
+ servid_t to;
+ uint32_t length;
+ } header;
+
+ DATA_BLOB data;
+
+ uint32_t ndone;
+};
+
+/*
+ A useful function for testing the message system.
+*/
+static void ping_message(void *msg_ctx, void *private,
+ uint32_t msg_type, servid_t src, DATA_BLOB *data)
+{
+ DEBUG(1,("INFO: Received PING message from server %u [%.*s]\n",
+ (uint_t)src, data->length, data->data?(const char *)data->data:""));
+ messaging_send(msg_ctx, src, MSG_PONG, data);
+}
+
+/*
+ return the path to a messaging socket
+*/
+static char *messaging_path(TALLOC_CTX *mem_ctx, servid_t server_id)
+{
+ char *name = talloc_asprintf(mem_ctx, "messaging/msg.%u", (unsigned)server_id);
+ char *ret;
+ ret = lock_path(mem_ctx, name);
+ talloc_free(name);
+ return ret;
+}
+
+/*
+ dispatch a fully received message
+*/
+static void messaging_dispatch(struct messaging_state *msg, struct messaging_rec *rec)
+{
+ struct dispatch_fn *d;
+ for (d=msg->dispatch;d;d=d->next) {
+ if (d->msg_type == rec->header.msg_type) {
+ d->fn(msg, d->private, d->msg_type, rec->header.from, &rec->data);
+ }
+ }
+
+ /* we don't free the record itself here as there may
+ be more messages from this client */
+ data_blob_free(&rec->data);
+ rec->header.length = 0;
+ rec->ndone = 0;
+}
+
+
+/*
+ handle IO for a single message
+*/
+static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde,
+ time_t t, uint16_t flags)
+{
+ struct messaging_rec *rec = fde->private;
+ struct messaging_state *msg = rec->msg;
+ NTSTATUS status;
+
+ if (rec->ndone < sizeof(rec->header)) {
+ /* receive the header */
+ DATA_BLOB blob;
+ status = socket_recv(rec->sock, rec,
+ &blob, sizeof(rec->header) - rec->ndone, 0);
+ if (NT_STATUS_IS_ERR(status)) {
+ talloc_free(rec);
+ return;
+ }
+
+ if (blob.length == 0) {
+ return;
+ }
+
+ memcpy(rec->ndone + (char *)&rec->header, blob.data, blob.length);
+ rec->ndone += blob.length;
+ data_blob_free(&blob);
+
+ if (rec->ndone == sizeof(rec->header)) {
+ if (rec->header.version != MESSAGING_VERSION) {
+ DEBUG(0,("meessage with wrong version %u\n",
+ rec->header.version));
+ talloc_free(rec);
+ }
+ rec->data = data_blob_talloc(rec, NULL, rec->header.length);
+ if (rec->data.length != rec->header.length) {
+ DEBUG(0,("Unable to allocate message of size %u\n",
+ rec->header.length));
+ talloc_free(rec);
+ }
+ }
+ }
+
+ if (rec->ndone >= sizeof(rec->header) &&
+ rec->ndone < sizeof(rec->header) + rec->header.length) {
+ /* receive the body, if any */
+ DATA_BLOB blob;
+ status = socket_recv(rec->sock, rec,
+ &blob, sizeof(rec->header) + rec->header.length - rec->ndone, 0);
+ if (NT_STATUS_IS_ERR(status)) {
+ talloc_free(rec);
+ return;
+ }
+
+ if (blob.length == 0) {
+ return;
+ }
+
+ memcpy(rec->data.data + (rec->ndone - sizeof(rec->header)),
+ blob.data, blob.length);
+
+ rec->ndone += blob.length;
+ }
+
+ if (rec->ndone == sizeof(rec->header) + rec->header.length) {
+ /* we've got the whole message */
+ messaging_dispatch(msg, rec);
+ }
+}
+
+/*
+ destroy a messaging record
+*/
+static int rec_destructor(void *ptr)
+{
+ struct messaging_rec *rec = ptr;
+ struct messaging_state *msg = rec->msg;
+ event_remove_fd(msg->event.ev, rec->fde);
+ return 0;
+}
+
+/*
+ handle a new incoming connection
+*/
+static void messaging_listen_handler(struct event_context *ev, struct fd_event *fde,
+ time_t t, uint16_t flags)
+{
+ struct messaging_state *msg = fde->private;
+ struct messaging_rec *rec;
+ NTSTATUS status;
+ struct fd_event fde2;
+
+ rec = talloc_p(msg, struct messaging_rec);
+ if (rec == NULL) {
+ smb_panic("Unable to allocate messaging_rec");
+ }
+
+ status = socket_accept(msg->sock, &rec->sock, 0);
+ if (!NT_STATUS_IS_OK(status)) {
+ smb_panic("Unable to accept messaging_rec");
+ }
+ talloc_steal(rec, rec->sock);
+
+ rec->msg = msg;
+ rec->ndone = 0;
+ rec->header.length = 0;
+ rec->path = msg->path;
+
+ fde2.private = rec;
+ fde2.fd = socket_get_fd(rec->sock);
+ fde2.flags = EVENT_FD_READ;
+ fde2.handler = messaging_recv_handler;
+
+ rec->fde = event_add_fd(msg->event.ev, &fde2);
+
+ talloc_set_destructor(rec, rec_destructor);
+}
+
+/*
+ Register a dispatch function for a particular message type.
+*/
+void messaging_register(void *ctx, void *private,
+ uint32_t msg_type,
+ void (*fn)(void *, void *, uint32_t, servid_t, DATA_BLOB *))
+{
+ struct messaging_state *msg = ctx;
+ struct dispatch_fn *d;
+
+ d = talloc_p(msg, struct dispatch_fn);
+ d->msg_type = msg_type;
+ d->private = private;
+ d->fn = fn;
+ DLIST_ADD(msg->dispatch, d);
+}
+
+/*
+ De-register the function for a particular message type.
+*/
+void messaging_deregister(void *ctx, uint32_t msg_type)
+{
+ struct messaging_state *msg = ctx;
+ struct dispatch_fn *d, *next;
+
+ for (d = msg->dispatch; d; d = next) {
+ next = d->next;
+ if (d->msg_type == msg_type) {
+ DLIST_REMOVE(msg->dispatch, d);
+ talloc_free(d);
+ }
+ }
+}
+
+
+
+/*
+ handle IO for sending a message
+*/
+static void messaging_send_handler(struct event_context *ev, struct fd_event *fde,
+ time_t t, uint16_t flags)
+{
+ struct messaging_rec *rec = fde->private;
+ NTSTATUS status;
+
+ if (rec->ndone < sizeof(rec->header)) {
+ /* send the header */
+ size_t nsent;
+ DATA_BLOB blob;
+
+ blob.data = rec->ndone + (char *)&rec->header;
+ blob.length = sizeof(rec->header) - rec->ndone;
+
+ status = socket_send(rec->sock, rec, &blob, &nsent, 0);
+ if (NT_STATUS_IS_ERR(status)) {
+ talloc_free(rec);
+ return;
+ }
+
+ if (nsent == 0) {
+ return;
+ }
+
+ rec->ndone += nsent;
+ }
+
+ if (rec->ndone >= sizeof(rec->header) &&
+ rec->ndone < sizeof(rec->header) + rec->header.length) {
+ /* send the body, if any */
+ DATA_BLOB blob;
+ size_t nsent;
+
+ blob.data = rec->data.data + (rec->ndone - sizeof(rec->header));
+ blob.length = rec->header.length - (rec->ndone - sizeof(rec->header));
+
+ status = socket_send(rec->sock, rec, &blob, &nsent, 0);
+ if (NT_STATUS_IS_ERR(status)) {
+ talloc_free(rec);
+ return;
+ }
+
+ rec->ndone += nsent;
+ }
+
+ if (rec->ndone == sizeof(rec->header) + rec->header.length) {
+ /* we've done the whole message */
+ talloc_free(rec);
+ }
+}
+
+
+/*
+ Send a message to a particular server
+*/
+NTSTATUS messaging_send(void *msg_ctx, servid_t server, uint32_t msg_type, DATA_BLOB *data)
+{
+ struct messaging_state *msg = msg_ctx;
+ struct messaging_rec *rec;
+ NTSTATUS status;
+ struct fd_event fde;
+
+ rec = talloc_p(msg, struct messaging_rec);
+ if (rec == NULL) {
+ return NT_STATUS_NO_MEMORY;
+ }
+
+ rec->msg = msg;
+ rec->header.version = MESSAGING_VERSION;
+ rec->header.msg_type = msg_type;
+ rec->header.from = msg->server_id;
+ rec->header.to = server;
+ rec->header.length = data?data->length:0;
+ if (rec->header.length != 0) {
+ rec->data = data_blob_talloc(rec, data->data, data->length);
+ } else {
+ rec->data = data_blob(NULL, 0);
+ }
+ rec->ndone = 0;
+
+ status = socket_create("unix", SOCKET_TYPE_STREAM, &rec->sock, 0);
+ if (!NT_STATUS_IS_OK(status)) {
+ talloc_free(rec);
+ return status;
+ }
+ talloc_steal(rec, rec->sock);
+
+ rec->path = messaging_path(rec, server);
+
+ status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
+ if (!NT_STATUS_IS_OK(status)) {
+ talloc_free(rec);
+ return status;
+ }
+
+ fde.private = rec;
+ fde.fd = socket_get_fd(rec->sock);
+ fde.flags = EVENT_FD_WRITE;
+ fde.handler = messaging_send_handler;
+
+ rec->fde = event_add_fd(msg->event.ev, &fde);
+
+ talloc_set_destructor(rec, rec_destructor);
+
+ return NT_STATUS_OK;
+}
+
+
+/*
+ destroy the messaging context
+*/
+static int messaging_destructor(void *msg_ctx)
+{
+ struct messaging_state *msg = msg_ctx;
+ event_remove_fd(msg->event.ev, msg->event.fde);
+ return 0;
+}
+
+/*
+ create the listening socket and setup the dispatcher
+*/
+void *messaging_init(TALLOC_CTX *mem_ctx, servid_t server_id, struct event_context *ev)
+{
+ struct messaging_state *msg;
+ NTSTATUS status;
+ struct fd_event fde;
+
+ msg = talloc_p(mem_ctx, struct messaging_state);
+ if (msg == NULL) {
+ return NULL;
+ }
+
+ /* create the messaging directory if needed */
+ msg->path = lock_path(msg, "messaging");
+ mkdir(msg->path, 0700);
+ talloc_free(msg->path);
+
+ msg->server_id = server_id;
+ msg->dispatch = NULL;
+ msg->path = messaging_path(msg, server_id);
+
+ status = socket_create("unix", SOCKET_TYPE_STREAM, &msg->sock, 0);
+ if (!NT_STATUS_IS_OK(status)) {
+ talloc_free(msg);
+ return NULL;
+ }
+
+ /* by stealing here we ensure that the socket is cleaned up (and even
+ deleted) on exit */
+ talloc_steal(msg, msg->sock);
+
+ status = socket_listen(msg->sock, msg->path, 0, 50, 0);
+ if (!NT_STATUS_IS_OK(status)) {
+ DEBUG(0,("Unable to setup messaging listener for '%s'\n", msg->path));
+ talloc_free(msg);
+ return NULL;
+ }
+
+ fde.private = msg;
+ fde.fd = socket_get_fd(msg->sock);
+ fde.flags = EVENT_FD_READ;
+ fde.handler = messaging_listen_handler;
+
+ msg->event.ev = ev;
+ msg->event.fde = event_add_fd(ev, &fde);
+
+ talloc_set_destructor(msg, messaging_destructor);
+
+ messaging_register(msg, NULL, MSG_PING, ping_message);
+
+ return msg;
+}
+
+
diff --git a/source4/lib/socket/socket_unix.c b/source4/lib/socket/socket_unix.c
index d87eaf49c4..7e169c47a7 100644
--- a/source4/lib/socket/socket_unix.c
+++ b/source4/lib/socket/socket_unix.c
@@ -29,6 +29,7 @@ static NTSTATUS unixdom_init(struct socket_context *sock)
if (sock->fd == -1) {
return NT_STATUS_INSUFFICIENT_RESOURCES;
}
+ sock->private_data = NULL;
return NT_STATUS_OK;
}
@@ -36,6 +37,11 @@ static NTSTATUS unixdom_init(struct socket_context *sock)
static void unixdom_close(struct socket_context *sock)
{
close(sock->fd);
+ /* if we were listening, then don't leave the socket lying
+ around in the filesystem */
+ if (sock->private_data) {
+ unlink((const char *)sock->private_data);
+ }
}
static NTSTATUS unixdom_connect(struct socket_context *sock,
@@ -82,6 +88,9 @@ static NTSTATUS unixdom_listen(struct socket_context *sock,
return NT_STATUS_INVALID_PARAMETER;
}
+ /* delete if it already exists */
+ unlink(my_address);
+
ZERO_STRUCT(my_addr);
my_addr.sun_family = AF_UNIX;
strncpy(my_addr.sun_path, my_address, sizeof(my_addr.sun_path));
@@ -104,6 +113,7 @@ static NTSTATUS unixdom_listen(struct socket_context *sock,
}
sock->state = SOCKET_STATE_SERVER_LISTEN;
+ sock->private_data = (void *)talloc_strdup(sock, my_address);
return NT_STATUS_OK;
}