diff options
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb_io.c')
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_io.c | 338 |
1 files changed, 0 insertions, 338 deletions
diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c deleted file mode 100644 index 9116bd69f7..0000000000 --- a/source4/cluster/ctdb/common/ctdb_io.c +++ /dev/null @@ -1,338 +0,0 @@ -/* - ctdb database library - Utility functions to read/write blobs of data from a file descriptor - and handle the case where we might need multiple read/writes to get all the - data. - - Copyright (C) Andrew Tridgell 2006 - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, see <http://www.gnu.org/licenses/>. -*/ - -#include "includes.h" -#include "../tdb/include/tdb.h" -#include "lib/events/events.h" -#include "../lib/util/dlinklist.h" -#include "system/network.h" -#include "system/filesys.h" -#include "../include/ctdb_private.h" -#include "../include/ctdb.h" - -/* structures for packet queueing - see common/ctdb_io.c */ -struct ctdb_partial { - uint8_t *data; - uint32_t length; -}; - -struct ctdb_queue_pkt { - struct ctdb_queue_pkt *next, *prev; - uint8_t *data; - uint32_t length; - uint32_t full_length; -}; - -struct ctdb_queue { - struct ctdb_context *ctdb; - struct ctdb_partial partial; /* partial input packet */ - struct ctdb_queue_pkt *out_queue; - struct fd_event *fde; - int fd; - size_t alignment; - void *private_data; - ctdb_queue_cb_fn_t callback; -}; - - - -/* - called when an incoming connection is readable -*/ -static void queue_io_read(struct ctdb_queue *queue) -{ - int num_ready = 0; - ssize_t nread; - uint8_t *data, *data_base; - - if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) { - return; - } - if (num_ready == 0) { - /* the descriptor has been closed */ - goto failed; - } - - - queue->partial.data = talloc_realloc(queue, queue->partial.data, - uint8_t, - num_ready + queue->partial.length); - - if (queue->partial.data == NULL) { - DEBUG(0,("read error alloc failed for %u\n", - num_ready + queue->partial.length)); - goto failed; - } - - nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready); - if (nread <= 0) { - DEBUG(0,("read error nread=%d\n", (int)nread)); - goto failed; - } - - - data = queue->partial.data; - nread += queue->partial.length; - - queue->partial.data = NULL; - queue->partial.length = 0; - - if (nread >= 4 && *(uint32_t *)data == nread) { - /* it is the responsibility of the incoming packet - function to free 'data' */ - queue->callback(data, nread, queue->private_data); - return; - } - - data_base = data; - - while (nread >= 4 && *(uint32_t *)data <= nread) { - /* we have at least one packet */ - uint8_t *d2; - uint32_t len; - len = *(uint32_t *)data; - if (len == 0) { - /* bad packet! treat as EOF */ - DEBUG(0,("Invalid packet of length 0\n")); - goto failed; - } - d2 = (uint8_t *)talloc_memdup(queue, data, len); - if (d2 == NULL) { - DEBUG(0,("read error memdup failed for %u\n", len)); - /* sigh */ - goto failed; - } - queue->callback(d2, len, queue->private_data); - data += len; - nread -= len; - } - - if (nread > 0) { - /* we have only part of a packet */ - if (data_base == data) { - queue->partial.data = data; - queue->partial.length = nread; - } else { - queue->partial.data = (uint8_t *)talloc_memdup(queue, data, nread); - if (queue->partial.data == NULL) { - DEBUG(0,("read error memdup partial failed for %u\n", - (unsigned)nread)); - goto failed; - } - queue->partial.length = nread; - talloc_free(data_base); - } - return; - } - - talloc_free(data_base); - return; - -failed: - queue->callback(NULL, 0, queue->private_data); -} - - -/* used when an event triggers a dead queue */ -static void queue_dead(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private_data) -{ - struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue); - queue->callback(NULL, 0, queue->private_data); -} - - -/* - called when an incoming connection is writeable -*/ -static void queue_io_write(struct ctdb_queue *queue) -{ - while (queue->out_queue) { - struct ctdb_queue_pkt *pkt = queue->out_queue; - ssize_t n; - if (queue->ctdb->flags & CTDB_FLAG_TORTURE) { - n = write(queue->fd, pkt->data, 1); - } else { - n = write(queue->fd, pkt->data, pkt->length); - } - - if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { - if (pkt->length != pkt->full_length) { - /* partial packet sent - we have to drop it */ - DLIST_REMOVE(queue->out_queue, pkt); - talloc_free(pkt); - } - talloc_free(queue->fde); - queue->fde = NULL; - queue->fd = -1; - event_add_timed(queue->ctdb->ev, queue, timeval_zero(), - queue_dead, queue); - return; - } - if (n <= 0) return; - - if (n != pkt->length) { - pkt->length -= n; - pkt->data += n; - return; - } - - DLIST_REMOVE(queue->out_queue, pkt); - talloc_free(pkt); - } - - EVENT_FD_NOT_WRITEABLE(queue->fde); -} - -/* - called when an incoming connection is readable or writeable -*/ -static void queue_io_handler(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private_data) -{ - struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue); - - if (flags & EVENT_FD_READ) { - queue_io_read(queue); - } else { - queue_io_write(queue); - } -} - - -/* - queue a packet for sending -*/ -int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) -{ - struct ctdb_queue_pkt *pkt; - uint32_t length2, full_length; - - if (queue->alignment) { - /* enforce the length and alignment rules from the tcp packet allocator */ - length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1); - *(uint32_t *)data = length2; - } else { - length2 = length; - } - - if (length2 != length) { - memset(data+length, 0, length2-length); - } - - full_length = length2; - - /* if the queue is empty then try an immediate write, avoiding - queue overhead. This relies on non-blocking sockets */ - if (queue->out_queue == NULL && queue->fd != -1 && - !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) { - ssize_t n = write(queue->fd, data, length2); - if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { - talloc_free(queue->fde); - queue->fde = NULL; - queue->fd = -1; - event_add_timed(queue->ctdb->ev, queue, timeval_zero(), - queue_dead, queue); - /* yes, we report success, as the dead node is - handled via a separate event */ - return 0; - } - if (n > 0) { - data += n; - length2 -= n; - } - if (length2 == 0) return 0; - } - - pkt = talloc(queue, struct ctdb_queue_pkt); - CTDB_NO_MEMORY(queue->ctdb, pkt); - - pkt->data = (uint8_t *)talloc_memdup(pkt, data, length2); - CTDB_NO_MEMORY(queue->ctdb, pkt->data); - - pkt->length = length2; - pkt->full_length = full_length; - - if (queue->out_queue == NULL && queue->fd != -1) { - EVENT_FD_WRITEABLE(queue->fde); - } - - DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *); - - return 0; -} - - -/* - setup the fd used by the queue - */ -int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd) -{ - queue->fd = fd; - talloc_free(queue->fde); - queue->fde = NULL; - - if (fd != -1) { - queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, - queue_io_handler, queue); - if (queue->fde == NULL) { - return -1; - } - - if (queue->out_queue) { - EVENT_FD_WRITEABLE(queue->fde); - } - } - - return 0; -} - - - -/* - setup a packet queue on a socket - */ -struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb, - TALLOC_CTX *mem_ctx, int fd, int alignment, - - ctdb_queue_cb_fn_t callback, - void *private_data) -{ - struct ctdb_queue *queue; - - queue = talloc_zero(mem_ctx, struct ctdb_queue); - CTDB_NO_MEMORY_NULL(ctdb, queue); - - queue->ctdb = ctdb; - queue->fd = fd; - queue->alignment = alignment; - queue->private_data = private_data; - queue->callback = callback; - if (fd != -1) { - if (ctdb_queue_set_fd(queue, fd) != 0) { - talloc_free(queue); - return NULL; - } - } - - return queue; -} |