summaryrefslogtreecommitdiff
path: root/source4/lib/messaging/messaging.c
diff options
context:
space:
mode:
authorAndrew Tridgell <tridge@samba.org>2005-05-01 18:49:07 +0000
committerGerald (Jerry) Carter <jerry@samba.org>2007-10-10 13:16:25 -0500
commit7282ddda0a38139fa457e6451f100f6d1792d927 (patch)
treec6a1e2edddde77306ad9b24c088f52e4403baffc /source4/lib/messaging/messaging.c
parent425350bb618b7168de1d5d808c9ac5a76d84fcf0 (diff)
downloadsamba-7282ddda0a38139fa457e6451f100f6d1792d927.tar.gz
samba-7282ddda0a38139fa457e6451f100f6d1792d927.tar.bz2
samba-7282ddda0a38139fa457e6451f100f6d1792d927.zip
r6561: re-did the internal message system based on DGRAM unix domain
sockets. This gains us about 40% in messaging speed. (This used to be commit f244a64ed537447e44229172427b5b6a5c64800c)
Diffstat (limited to 'source4/lib/messaging/messaging.c')
-rw-r--r--source4/lib/messaging/messaging.c202
1 files changed, 64 insertions, 138 deletions
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index 8127e7e8fc..c95028bea5 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -34,10 +34,13 @@
/* the number of microseconds to backoff in retrying to send a message */
#define MESSAGING_BACKOFF 250000
+/* maximum message size */
+#define MESSAGING_MAX_SIZE 512
+
struct messaging_context {
uint32_t server_id;
struct socket_context *sock;
- char *path;
+ const char *path;
struct dispatch_fn *dispatch;
struct {
@@ -60,7 +63,6 @@ struct dispatch_fn {
struct messaging_rec {
struct messaging_context *msg;
struct socket_context *sock;
- struct fd_event *fde;
const char *path;
struct {
@@ -72,8 +74,6 @@ struct messaging_rec {
} header;
DATA_BLOB data;
-
- uint32_t ndone;
};
/*
@@ -112,112 +112,58 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
}
}
- /* we don't free the record itself here as there may
- be more messages from this client */
- data_blob_free(&rec->data);
rec->header.length = 0;
- rec->ndone = 0;
}
/*
- handle IO for a single message
+ handle a new incoming connection
*/
static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
- struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec);
- struct messaging_context *msg = rec->msg;
+ struct messaging_context *msg = talloc_get_type(private,
+ struct messaging_context);
+ struct messaging_rec *rec;
NTSTATUS status;
+ uint8_t data[MESSAGING_MAX_SIZE];
+ size_t msize;
- if (rec->ndone < sizeof(rec->header)) {
- /* receive the header */
- size_t nread;
-
- status = socket_recv(rec->sock,
- rec->ndone + (char *)&rec->header,
- sizeof(rec->header) - rec->ndone, &nread, 0);
- if (NT_STATUS_IS_ERR(status)) {
- talloc_free(rec);
- return;
- }
-
- if (nread == 0) {
- return;
- }
-
- rec->ndone += nread;
-
- if (rec->ndone == sizeof(rec->header)) {
- if (rec->header.version != MESSAGING_VERSION) {
- DEBUG(0,("meessage with wrong version %u\n",
- rec->header.version));
- talloc_free(rec);
- }
- rec->data = data_blob_talloc(rec, NULL, rec->header.length);
- if (rec->data.length != rec->header.length) {
- DEBUG(0,("Unable to allocate message of size %u\n",
- rec->header.length));
- talloc_free(rec);
- }
- }
- }
-
- if (rec->ndone >= sizeof(rec->header) &&
- rec->ndone < sizeof(rec->header) + rec->header.length) {
- /* receive the body, if any */
- size_t nread;
-
- status = socket_recv(rec->sock,
- rec->data.data + (rec->ndone - sizeof(rec->header)),
- sizeof(rec->header) + rec->header.length - rec->ndone,
- &nread, 0);
- if (NT_STATUS_IS_ERR(status)) {
- talloc_free(rec);
- return;
- }
-
- if (nread == 0) {
- return;
- }
-
- rec->ndone += nread;
+ status = socket_recv(msg->sock, data, sizeof(data), &msize, 0);
+ if (!NT_STATUS_IS_OK(status)) {
+ return;
}
- if (rec->ndone == sizeof(rec->header) + rec->header.length) {
- /* we've got the whole message */
- messaging_dispatch(msg, rec);
+ if (msize < sizeof(rec->header)) {
+ DEBUG(0,("messaging: bad message of size %d\n", msize));
+ return;
}
-}
-
-/*
- handle a new incoming connection
-*/
-static void messaging_listen_handler(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private)
-{
- struct messaging_context *msg = talloc_get_type(private,
- struct messaging_context);
- struct messaging_rec *rec;
- NTSTATUS status;
rec = talloc(msg, struct messaging_rec);
if (rec == NULL) {
smb_panic("Unable to allocate messaging_rec");
}
- status = socket_accept(msg->sock, &rec->sock);
- if (!NT_STATUS_IS_OK(status)) {
- smb_panic("Unable to accept messaging_rec");
- }
- talloc_steal(rec, rec->sock);
-
rec->msg = msg;
- rec->ndone = 0;
- rec->header.length = 0;
rec->path = msg->path;
- rec->fde = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
- EVENT_FD_READ, messaging_recv_handler, rec);
+ rec->sock = NULL;
+
+ memcpy(&rec->header, data, sizeof(rec->header));
+ if (msize != sizeof(rec->header) + rec->header.length) {
+ DEBUG(0,("messaging: bad message header size %d should be %d\n",
+ rec->header.length, msize - sizeof(rec->header)));
+ talloc_free(rec);
+ return;
+ }
+
+ rec->data = data_blob_talloc(rec, data, rec->header.length);
+ if (rec->data.data == NULL) {
+ talloc_free(rec);
+ return;
+ }
+
+ messaging_dispatch(msg, rec);
+ talloc_free(rec);
}
/*
@@ -262,49 +208,28 @@ static void messaging_send_handler(struct event_context *ev, struct fd_event *fd
uint16_t flags, void *private)
{
struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec);
+ uint8_t data[MESSAGING_MAX_SIZE];
+ DATA_BLOB blob;
+ size_t nsent;
NTSTATUS status;
- if (rec->ndone < sizeof(rec->header)) {
- /* send the header */
- size_t nsent;
- DATA_BLOB blob;
-
- blob.data = rec->ndone + (uint8_t *)&rec->header;
- blob.length = sizeof(rec->header) - rec->ndone;
-
- status = socket_send(rec->sock, &blob, &nsent, 0);
- if (NT_STATUS_IS_ERR(status)) {
- talloc_free(rec);
- return;
- }
-
- if (nsent == 0) {
- return;
- }
-
- rec->ndone += nsent;
- }
-
- if (rec->ndone >= sizeof(rec->header) &&
- rec->ndone < sizeof(rec->header) + rec->header.length) {
- /* send the body, if any */
- DATA_BLOB blob;
- size_t nsent;
-
- blob.data = rec->data.data + (rec->ndone - sizeof(rec->header));
- blob.length = rec->header.length - (rec->ndone - sizeof(rec->header));
+ memcpy(data, &rec->header, sizeof(rec->header));
+ memcpy(data + sizeof(rec->header), rec->data.data, rec->data.length);
- status = socket_send(rec->sock, &blob, &nsent, 0);
- if (NT_STATUS_IS_ERR(status)) {
- talloc_free(rec);
- return;
- }
+ blob.data = data;
+ blob.length = sizeof(rec->header) + rec->header.length;
- rec->ndone += nsent;
+ status = socket_send(rec->sock, &blob, &nsent, 0);
+ if (NT_STATUS_IS_ERR(status)) {
+ DEBUG(3,("Unable to send message of type %d length %d - %s\n",
+ rec->header.msg_type,
+ rec->header.length,
+ nt_errstr(status)));
+ talloc_free(rec);
+ return;
}
- if (rec->ndone == sizeof(rec->header) + rec->header.length) {
- /* we've done the whole message */
+ if (NT_STATUS_IS_OK(status)) {
talloc_free(rec);
}
}
@@ -349,8 +274,8 @@ static void messaging_backoff_handler(struct event_context *ev, struct timed_eve
return;
}
- rec->fde = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
- EVENT_FD_WRITE, messaging_send_handler, rec);
+ event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
+ EVENT_FD_WRITE, messaging_send_handler, rec);
}
@@ -378,9 +303,8 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t
} else {
rec->data = data_blob(NULL, 0);
}
- rec->ndone = 0;
- status = socket_create("unix", SOCKET_TYPE_STREAM, &rec->sock, 0);
+ status = socket_create("unix", SOCKET_TYPE_DGRAM, &rec->sock, 0);
if (!NT_STATUS_IS_OK(status)) {
talloc_free(rec);
return status;
@@ -403,8 +327,8 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t
return status;
}
- rec->fde = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
- EVENT_FD_WRITE, messaging_send_handler, rec);
+ event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
+ EVENT_FD_WRITE, messaging_send_handler, rec);
return NT_STATUS_OK;
}
@@ -437,10 +361,12 @@ static int messaging_destructor(void *ptr)
/*
create the listening socket and setup the dispatcher
*/
-struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, struct event_context *ev)
+struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id,
+ struct event_context *ev)
{
struct messaging_context *msg;
NTSTATUS status;
+ char *path;
msg = talloc(mem_ctx, struct messaging_context);
if (msg == NULL) {
@@ -448,15 +374,15 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
}
/* create the messaging directory if needed */
- msg->path = smbd_tmp_path(msg, "messaging");
- mkdir(msg->path, 0700);
- talloc_free(msg->path);
+ path = smbd_tmp_path(msg, "messaging");
+ mkdir(path, 0700);
+ talloc_free(path);
+ msg->path = messaging_path(msg, server_id);
msg->server_id = server_id;
msg->dispatch = NULL;
- msg->path = messaging_path(msg, server_id);
- status = socket_create("unix", SOCKET_TYPE_STREAM, &msg->sock, 0);
+ status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
if (!NT_STATUS_IS_OK(status)) {
talloc_free(msg);
return NULL;
@@ -475,7 +401,7 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
msg->event.ev = talloc_reference(msg, ev);
msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock),
- EVENT_FD_READ, messaging_listen_handler, msg);
+ EVENT_FD_READ, messaging_recv_handler, msg);
talloc_set_destructor(msg, messaging_destructor);