From fe6bb1e9edb51890f190f0a305493fba1286c5cb Mon Sep 17 00:00:00 2001 From: Jeremy Allison Date: Fri, 10 Jan 2003 20:17:06 +0000 Subject: First part of efficiency fixes for message sending to pid's (cutting down the amount of time we hold tdb locks). Gulp down all messages at once rather than reading/re-writing one at a time. NOTE: All dispatch routines *must* be able to cope with incoming message on *odd* byte boundaries (all current handlers do). Jeremy. (This used to be commit 04243e39cf4e11dd20e6035f553722a9720f00ae) --- source3/lib/messages.c | 143 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 95 insertions(+), 48 deletions(-) diff --git a/source3/lib/messages.c b/source3/lib/messages.c index 8200b2f8c3..555a55569e 100644 --- a/source3/lib/messages.c +++ b/source3/lib/messages.c @@ -3,6 +3,7 @@ Samba internal messaging functions Copyright (C) Andrew Tridgell 2000 Copyright (C) 2001 by Martin Pool + Copyright (C) 2002 by Jeremy Allison This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -35,6 +36,9 @@ use that to reply by message_send_pid(). See ping_message() for a simple example. + *NOTE*: Dispatch functions must be able to cope with incoming + messages on an *odd* byte boundary. + This system doesn't have any inherent size limitations but is not very efficient for large messages or when messages are sent in very quick succession. @@ -86,6 +90,16 @@ static void ping_message(int msg_type, pid_t src, void *buf, size_t len) message_send_pid(src, MSG_PONG, buf, len, True); } +/**************************************************************************** + Return current debug level. +****************************************************************************/ + +void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len) +{ + DEBUG(1,("INFO: Received REQ_DEBUGLEVEL message from PID %u\n",(unsigned int)src)); + message_send_pid(src, MSG_DEBUGLEVEL, DEBUGLEVEL_CLASS, sizeof(DEBUGLEVEL_CLASS), True); +} + /**************************************************************************** Initialise the messaging functions. ****************************************************************************/ @@ -106,6 +120,7 @@ BOOL message_init(void) CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1); message_register(MSG_PING, ping_message); + message_register(MSG_REQ_DEBUGLEVEL, debuglevel_message); return True; } @@ -133,9 +148,13 @@ static TDB_DATA message_key_pid(pid_t pid) static BOOL message_notify(pid_t pid) { - /* Doing kill with a non-positive pid causes messages to be - * sent to places we don't want. */ + /* + * Doing kill with a non-positive pid causes messages to be + * sent to places we don't want. + */ + SMB_ASSERT(pid > 0); + if (kill(pid, SIGUSR1) == -1) { if (errno == ESRCH) { DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid)); @@ -160,16 +179,19 @@ BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len, struct message_rec rec; void *p; + /* + * Doing kill with a non-positive pid causes messages to be + * sent to places we don't want. + */ + + SMB_ASSERT(pid > 0); + rec.msg_version = MESSAGE_VERSION; rec.msg_type = msg_type; rec.dest = pid; rec.src = sys_getpid(); rec.len = len; - /* Doing kill with a non-positive pid causes messages to be - * sent to places we don't want. */ - SMB_ASSERT(pid > 0); - kbuf = message_key_pid(pid); /* lock the record for the destination */ @@ -241,111 +263,136 @@ BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len, failed: tdb_chainunlock(tdb, kbuf); + SAFE_FREE(dbuf.dptr); errno = 0; /* paranoia */ return False; } /**************************************************************************** - Retrieve the next message for the current process. + Retrieve all messages for the current process. ****************************************************************************/ -static BOOL message_recv(int *msg_type, pid_t *src, void **buf, size_t *len) +static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len) { TDB_DATA kbuf; TDB_DATA dbuf; - struct message_rec rec; + TDB_DATA null_dbuf; + + ZERO_STRUCT(null_dbuf); + + *msgs_buf = NULL; + *total_len = 0; kbuf = message_key_pid(sys_getpid()); tdb_chainlock(tdb, kbuf); - dbuf = tdb_fetch(tdb, kbuf); - if (dbuf.dptr == NULL || dbuf.dsize == 0) - goto failed; + /* + * Replace with an empty record to keep the allocated + * space in the tdb. + */ + tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE); + tdb_chainunlock(tdb, kbuf); + + if (dbuf.dptr == NULL || dbuf.dsize == 0) { + SAFE_FREE(dbuf.dptr); + return False; + } + + *msgs_buf = dbuf.dptr; + *total_len = dbuf.dsize; + + return True; +} + +/**************************************************************************** + Parse out the next message for the current process. +****************************************************************************/ + +static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type, pid_t *src, char **buf, size_t *len) +{ + struct message_rec rec; + char *ret_buf = *buf; - memcpy(&rec, dbuf.dptr, sizeof(rec)); + *buf = NULL; + *len = 0; + + if (total_len - (ret_buf - msgs_buf) < sizeof(rec)) + return False; + + memcpy(&rec, ret_buf, sizeof(rec)); + ret_buf += sizeof(rec); if (rec.msg_version != MESSAGE_VERSION) { DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION)); - goto failed; + return False; } if (rec.len > 0) { - (*buf) = (void *)malloc(rec.len); - if (!(*buf)) - goto failed; - - memcpy(*buf, dbuf.dptr+sizeof(rec), rec.len); - } else { - *buf = NULL; + if (total_len - (ret_buf - msgs_buf) < rec.len) + return False; } *len = rec.len; *msg_type = rec.msg_type; *src = rec.src; + *buf = ret_buf; - if (dbuf.dsize - (sizeof(rec)+rec.len) > 0) - memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len)); - dbuf.dsize -= sizeof(rec)+rec.len; - - if (dbuf.dsize == 0) - tdb_delete(tdb, kbuf); - else - tdb_store(tdb, kbuf, dbuf, TDB_REPLACE); - - SAFE_FREE(dbuf.dptr); - tdb_chainunlock(tdb, kbuf); return True; - - failed: - tdb_chainunlock(tdb, kbuf); - SAFE_FREE(dbuf.dptr); - return False; } /**************************************************************************** Receive and dispatch any messages pending for this process. Notice that all dispatch handlers for a particular msg_type get called, so you can register multiple handlers for a message. + *NOTE*: Dispatch functions must be able to cope with incoming + messages on an *odd* byte boundary. ****************************************************************************/ void message_dispatch(void) { int msg_type; pid_t src; - void *buf; - size_t len; + char *buf; + char *msgs_buf; + size_t len, total_len; struct dispatch_fns *dfn; int n_handled; - if (!received_signal) return; + if (!received_signal) + return; DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal)); received_signal = 0; - while (message_recv(&msg_type, &src, &buf, &len)) { - DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%d\n", - msg_type, (int) src)); + if (!retrieve_all_messages(&msgs_buf, &total_len)) + return; + + for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) { + DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%u\n", + msg_type, (unsigned int) src)); n_handled = 0; for (dfn = dispatch_fns; dfn; dfn = dfn->next) { if (dfn->msg_type == msg_type) { DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type)); - dfn->fn(msg_type, src, buf, len); + dfn->fn(msg_type, src, len ? (void *)buf : NULL, len); n_handled++; } } if (!n_handled) { - DEBUG(5,("message_dispatch: warning: no handlers registered for " - "msg_type %d in pid %d\n", - msg_type, sys_getpid())); + DEBUG(5,("message_dispatch: warning: no handlers registed for " + "msg_type %d in pid %u\n", + msg_type, (unsigned int)sys_getpid())); } - SAFE_FREE(buf); } + SAFE_FREE(msgs_buf); } /**************************************************************************** Register a dispatch function for a particular message type. + *NOTE*: Dispatch functions must be able to cope with incoming + messages on an *odd* byte boundary. ****************************************************************************/ void message_register(int msg_type, -- cgit