diff options
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb_io.c')
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_io.c | 79 |
1 files changed, 57 insertions, 22 deletions
diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c index 517fbbd842..3cc522b58a 100644 --- a/source4/cluster/ctdb/common/ctdb_io.c +++ b/source4/cluster/ctdb/common/ctdb_io.c @@ -6,18 +6,18 @@ Copyright (C) Andrew Tridgell 2006 - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 3 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, see <http://www.gnu.org/licenses/>. + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. */ #include "includes.h" @@ -39,6 +39,7 @@ struct ctdb_queue_pkt { struct ctdb_queue_pkt *next, *prev; uint8_t *data; uint32_t length; + uint32_t full_length; }; struct ctdb_queue { @@ -63,8 +64,10 @@ static void queue_io_read(struct ctdb_queue *queue) ssize_t nread; uint8_t *data, *data_base; - if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 || - num_ready == 0) { + if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) { + return; + } + if (num_ready == 0) { /* the descriptor has been closed */ goto failed; } @@ -74,11 +77,14 @@ static void queue_io_read(struct ctdb_queue *queue) num_ready + queue->partial.length); if (queue->partial.data == NULL) { + DEBUG(0,("read error alloc failed for %u\n", + num_ready + queue->partial.length)); goto failed; } nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready); if (nread <= 0) { + DEBUG(0,("read error nread=%d\n", (int)nread)); goto failed; } @@ -103,8 +109,14 @@ static void queue_io_read(struct ctdb_queue *queue) uint8_t *d2; uint32_t len; len = *(uint32_t *)data; + if (len == 0) { + /* bad packet! treat as EOF */ + DEBUG(0,("Invalid packet of length 0\n")); + goto failed; + } d2 = talloc_memdup(queue, data, len); if (d2 == NULL) { + DEBUG(0,("read error memdup failed for %u\n", len)); /* sigh */ goto failed; } @@ -121,6 +133,8 @@ static void queue_io_read(struct ctdb_queue *queue) } else { queue->partial.data = talloc_memdup(queue, data, nread); if (queue->partial.data == NULL) { + DEBUG(0,("read error memdup partial failed for %u\n", + (unsigned)nread)); goto failed; } queue->partial.length = nread; @@ -154,13 +168,23 @@ static void queue_io_write(struct ctdb_queue *queue) while (queue->out_queue) { struct ctdb_queue_pkt *pkt = queue->out_queue; ssize_t n; - - n = write(queue->fd, pkt->data, pkt->length); + if (queue->ctdb->flags & CTDB_FLAG_TORTURE) { + n = write(queue->fd, pkt->data, 1); + } else { + n = write(queue->fd, pkt->data, pkt->length); + } if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + if (pkt->length != pkt->full_length) { + /* partial packet sent - we have to drop it */ + DLIST_REMOVE(queue->out_queue, pkt); + talloc_free(pkt); + } + talloc_free(queue->fde); + queue->fde = NULL; + queue->fd = -1; event_add_timed(queue->ctdb->ev, queue, timeval_zero(), queue_dead, queue); - EVENT_FD_NOT_WRITEABLE(queue->fde); return; } if (n <= 0) return; @@ -200,21 +224,31 @@ static void queue_io_handler(struct event_context *ev, struct fd_event *fde, int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) { struct ctdb_queue_pkt *pkt; - uint32_t length2; + uint32_t length2, full_length; - /* enforce the length and alignment rules from the tcp packet allocator */ - length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1); - *(uint32_t *)data = length2; + if (queue->alignment) { + /* enforce the length and alignment rules from the tcp packet allocator */ + length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1); + *(uint32_t *)data = length2; + } else { + length2 = length; + } if (length2 != length) { memset(data+length, 0, length2-length); } + + full_length = length2; /* if the queue is empty then try an immediate write, avoiding queue overhead. This relies on non-blocking sockets */ - if (queue->out_queue == NULL && queue->fd != -1) { + if (queue->out_queue == NULL && queue->fd != -1 && + !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) { ssize_t n = write(queue->fd, data, length2); if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + talloc_free(queue->fde); + queue->fde = NULL; + queue->fd = -1; event_add_timed(queue->ctdb->ev, queue, timeval_zero(), queue_dead, queue); /* yes, we report success, as the dead node is @@ -235,6 +269,7 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) CTDB_NO_MEMORY(queue->ctdb, pkt->data); pkt->length = length2; + pkt->full_length = full_length; if (queue->out_queue == NULL && queue->fd != -1) { EVENT_FD_WRITEABLE(queue->fde); @@ -256,7 +291,7 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd) queue->fde = NULL; if (fd != -1) { - queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, + queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, queue_io_handler, queue); if (queue->fde == NULL) { return -1; |