summaryrefslogtreecommitdiff
path: root/source4/cluster/ctdb/common/ctdb_io.c
diff options
context:
space:
mode:
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb_io.c')
-rw-r--r--source4/cluster/ctdb/common/ctdb_io.c79
1 files changed, 57 insertions, 22 deletions
diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c
index 517fbbd842..3cc522b58a 100644
--- a/source4/cluster/ctdb/common/ctdb_io.c
+++ b/source4/cluster/ctdb/common/ctdb_io.c
@@ -6,18 +6,18 @@
Copyright (C) Andrew Tridgell 2006
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 3 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
@@ -39,6 +39,7 @@ struct ctdb_queue_pkt {
struct ctdb_queue_pkt *next, *prev;
uint8_t *data;
uint32_t length;
+ uint32_t full_length;
};
struct ctdb_queue {
@@ -63,8 +64,10 @@ static void queue_io_read(struct ctdb_queue *queue)
ssize_t nread;
uint8_t *data, *data_base;
- if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 ||
- num_ready == 0) {
+ if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
+ return;
+ }
+ if (num_ready == 0) {
/* the descriptor has been closed */
goto failed;
}
@@ -74,11 +77,14 @@ static void queue_io_read(struct ctdb_queue *queue)
num_ready + queue->partial.length);
if (queue->partial.data == NULL) {
+ DEBUG(0,("read error alloc failed for %u\n",
+ num_ready + queue->partial.length));
goto failed;
}
nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
if (nread <= 0) {
+ DEBUG(0,("read error nread=%d\n", (int)nread));
goto failed;
}
@@ -103,8 +109,14 @@ static void queue_io_read(struct ctdb_queue *queue)
uint8_t *d2;
uint32_t len;
len = *(uint32_t *)data;
+ if (len == 0) {
+ /* bad packet! treat as EOF */
+ DEBUG(0,("Invalid packet of length 0\n"));
+ goto failed;
+ }
d2 = talloc_memdup(queue, data, len);
if (d2 == NULL) {
+ DEBUG(0,("read error memdup failed for %u\n", len));
/* sigh */
goto failed;
}
@@ -121,6 +133,8 @@ static void queue_io_read(struct ctdb_queue *queue)
} else {
queue->partial.data = talloc_memdup(queue, data, nread);
if (queue->partial.data == NULL) {
+ DEBUG(0,("read error memdup partial failed for %u\n",
+ (unsigned)nread));
goto failed;
}
queue->partial.length = nread;
@@ -154,13 +168,23 @@ static void queue_io_write(struct ctdb_queue *queue)
while (queue->out_queue) {
struct ctdb_queue_pkt *pkt = queue->out_queue;
ssize_t n;
-
- n = write(queue->fd, pkt->data, pkt->length);
+ if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
+ n = write(queue->fd, pkt->data, 1);
+ } else {
+ n = write(queue->fd, pkt->data, pkt->length);
+ }
if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ if (pkt->length != pkt->full_length) {
+ /* partial packet sent - we have to drop it */
+ DLIST_REMOVE(queue->out_queue, pkt);
+ talloc_free(pkt);
+ }
+ talloc_free(queue->fde);
+ queue->fde = NULL;
+ queue->fd = -1;
event_add_timed(queue->ctdb->ev, queue, timeval_zero(),
queue_dead, queue);
- EVENT_FD_NOT_WRITEABLE(queue->fde);
return;
}
if (n <= 0) return;
@@ -200,21 +224,31 @@ static void queue_io_handler(struct event_context *ev, struct fd_event *fde,
int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
{
struct ctdb_queue_pkt *pkt;
- uint32_t length2;
+ uint32_t length2, full_length;
- /* enforce the length and alignment rules from the tcp packet allocator */
- length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
- *(uint32_t *)data = length2;
+ if (queue->alignment) {
+ /* enforce the length and alignment rules from the tcp packet allocator */
+ length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
+ *(uint32_t *)data = length2;
+ } else {
+ length2 = length;
+ }
if (length2 != length) {
memset(data+length, 0, length2-length);
}
+
+ full_length = length2;
/* if the queue is empty then try an immediate write, avoiding
queue overhead. This relies on non-blocking sockets */
- if (queue->out_queue == NULL && queue->fd != -1) {
+ if (queue->out_queue == NULL && queue->fd != -1 &&
+ !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
ssize_t n = write(queue->fd, data, length2);
if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ talloc_free(queue->fde);
+ queue->fde = NULL;
+ queue->fd = -1;
event_add_timed(queue->ctdb->ev, queue, timeval_zero(),
queue_dead, queue);
/* yes, we report success, as the dead node is
@@ -235,6 +269,7 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
CTDB_NO_MEMORY(queue->ctdb, pkt->data);
pkt->length = length2;
+ pkt->full_length = full_length;
if (queue->out_queue == NULL && queue->fd != -1) {
EVENT_FD_WRITEABLE(queue->fde);
@@ -256,7 +291,7 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
queue->fde = NULL;
if (fd != -1) {
- queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
+ queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
queue_io_handler, queue);
if (queue->fde == NULL) {
return -1;