From c9f04d8648cfdd573d45d47467bc964ef01f754d Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Mon, 16 Apr 2007 00:18:54 +0000 Subject: r22231: merge from bzr ctdb tree (This used to be commit 807b959082d3b9a929c9f6597714e636638a940e) --- source4/cluster/ctdb/common/ctdb_io.c | 303 ++++++++++++++++++++++++++++++++++ 1 file changed, 303 insertions(+) create mode 100644 source4/cluster/ctdb/common/ctdb_io.c (limited to 'source4/cluster/ctdb/common/ctdb_io.c') diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c new file mode 100644 index 0000000000..238f1701cf --- /dev/null +++ b/source4/cluster/ctdb/common/ctdb_io.c @@ -0,0 +1,303 @@ +/* + ctdb database library + Utility functions to read/write blobs of data from a file descriptor + and handle the case where we might need multiple read/writes to get all the + data. + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. 
+ + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb_private.h" +#include "../include/ctdb.h" + +/* structures for packet queueing - see common/ctdb_io.c */ +struct ctdb_partial { + uint8_t *data; + uint32_t length; +}; + +struct ctdb_queue_pkt { + struct ctdb_queue_pkt *next, *prev; + uint8_t *data; + uint32_t length; +}; + +struct ctdb_queue { + struct ctdb_context *ctdb; + struct ctdb_partial partial; /* partial input packet */ + struct ctdb_queue_pkt *out_queue; + struct fd_event *fde; + int fd; + size_t alignment; + void *private_data; + ctdb_queue_cb_fn_t callback; +}; + + + +/* + called when an incoming connection is readable +*/ +static void queue_io_read(struct ctdb_queue *queue) +{ + int num_ready = 0; + ssize_t nread; + uint8_t *data, *data_base; + + if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 || + num_ready == 0) { + /* the descriptor has been closed */ + goto failed; + } + + + queue->partial.data = talloc_realloc_size(queue, queue->partial.data, + num_ready + queue->partial.length); + + if (queue->partial.data == NULL) { + goto failed; + } + + nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready); + if (nread <= 0) { + goto failed; + } + + + data = queue->partial.data; + nread += queue->partial.length; + + queue->partial.data = NULL; + queue->partial.length = 0; + + if (nread >= 4 && *(uint32_t *)data == nread) { + /* it is the responsibility of the incoming packet + function to free 'data' */ + queue->callback(data, nread, queue->private_data); + return; + } + + data_base = data; + + while (nread >= 4 && *(uint32_t *)data <= nread) { + /* we have at least 
one packet */ + uint8_t *d2; + uint32_t len; + len = *(uint32_t *)data; + d2 = talloc_memdup(queue, data, len); + if (d2 == NULL) { + /* sigh */ + goto failed; + } + queue->callback(d2, len, queue->private_data); + data += len; + nread -= len; + } + + if (nread > 0) { + /* we have only part of a packet */ + if (data_base == data) { + queue->partial.data = data; + queue->partial.length = nread; + } else { + queue->partial.data = talloc_memdup(queue, data, nread); + if (queue->partial.data == NULL) { + goto failed; + } + queue->partial.length = nread; + talloc_free(data_base); + } + return; + } + + talloc_free(data_base); + return; + +failed: + queue->callback(NULL, 0, queue->private_data); +} + + +/* used when an event triggers a dead queue */ +static void queue_dead(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue); + queue->callback(NULL, 0, queue->private_data); +} + + +/* + called when an incoming connection is writeable +*/ +static void queue_io_write(struct ctdb_queue *queue) +{ + while (queue->out_queue) { + struct ctdb_queue_pkt *pkt = queue->out_queue; + ssize_t n; + + n = write(queue->fd, pkt->data, pkt->length); + + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(queue->ctdb->ev, queue, timeval_zero(), + queue_dead, queue); + EVENT_FD_NOT_WRITEABLE(queue->fde); + return; + } + if (n <= 0) return; + + if (n != pkt->length) { + pkt->length -= n; + pkt->data += n; + return; + } + + DLIST_REMOVE(queue->out_queue, pkt); + talloc_free(pkt); + } + + EVENT_FD_NOT_WRITEABLE(queue->fde); +} + +/* + called when an incoming connection is readable or writeable +*/ +static void queue_io_handler(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) +{ + struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue); + + if (flags & EVENT_FD_READ) { + queue_io_read(queue); + } 
else { + queue_io_write(queue); + } +} + + +/* + queue a packet for sending +*/ +int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) +{ + struct ctdb_queue_pkt *pkt; + uint32_t length2; + + /* enforce the length and alignment rules from the tcp packet allocator */ + length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1); + *(uint32_t *)data = length2; + + if (length2 != length) { + memset(data+length, 0, length2-length); + } + + /* if the queue is empty then try an immediate write, avoiding + queue overhead. This relies on non-blocking sockets */ + if (queue->out_queue == NULL && queue->fd != -1) { + ssize_t n = write(queue->fd, data, length2); + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(queue->ctdb->ev, queue, timeval_zero(), + queue_dead, queue); + /* yes, we report success, as the dead node is + handled via a separate event */ + return 0; + } + if (n > 0) { + data += n; + length2 -= n; + } + if (length2 == 0) return 0; + } + + pkt = talloc(queue, struct ctdb_queue_pkt); + CTDB_NO_MEMORY(queue->ctdb, pkt); + + pkt->data = talloc_memdup(pkt, data, length2); + CTDB_NO_MEMORY(queue->ctdb, pkt->data); + + pkt->length = length2; + + if (queue->out_queue == NULL && queue->fd != -1) { + EVENT_FD_WRITEABLE(queue->fde); + } + + DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *); + + return 0; +} + + +/* + setup the fd used by the queue + */ +int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd) +{ + queue->fd = fd; + talloc_free(queue->fde); + queue->fde = NULL; + + if (fd != -1) { + queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, + queue_io_handler, queue); + if (queue->fde == NULL) { + return -1; + } + + if (queue->out_queue) { + EVENT_FD_WRITEABLE(queue->fde); + } + } + + return 0; +} + + + +/* + setup a packet queue on a socket + */ +struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, int fd, int alignment, + + ctdb_queue_cb_fn_t 
callback, + void *private_data) +{ + struct ctdb_queue *queue; + + queue = talloc_zero(mem_ctx, struct ctdb_queue); + CTDB_NO_MEMORY_NULL(ctdb, queue); + + queue->ctdb = ctdb; + queue->fd = fd; + queue->alignment = alignment; + queue->private_data = private_data; + queue->callback = callback; + if (fd != -1) { + if (ctdb_queue_set_fd(queue, fd) != 0) { + talloc_free(queue); + return NULL; + } + } + + return queue; +} -- cgit From b8d69a7ea2505b706ff7c74d7c97bc89d82dfa07 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Tue, 10 Jul 2007 02:46:15 +0000 Subject: r23795: more v2->v3 conversion (This used to be commit 84b468b2f8f2dffda89593f816e8bc6a8b6d42ac) --- source4/cluster/ctdb/common/ctdb_io.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'source4/cluster/ctdb/common/ctdb_io.c') diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c index 238f1701cf..6753d423c6 100644 --- a/source4/cluster/ctdb/common/ctdb_io.c +++ b/source4/cluster/ctdb/common/ctdb_io.c @@ -9,7 +9,7 @@ This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either - version 2 of the License, or (at your option) any later version. + version 3 of the License, or (at your option) any later version. 
This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -- cgit From 6c973f4e8ccbcb6c9275f8a54e26abb19df7e15a Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Tue, 10 Jul 2007 03:42:26 +0000 Subject: r23798: updated old Temple Place FSF addresses to new URL (This used to be commit 40c0919aaa9c1b14bbaebb95ecce53eb0380fdbb) --- source4/cluster/ctdb/common/ctdb_io.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'source4/cluster/ctdb/common/ctdb_io.c') diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c index 6753d423c6..517fbbd842 100644 --- a/source4/cluster/ctdb/common/ctdb_io.c +++ b/source4/cluster/ctdb/common/ctdb_io.c @@ -17,8 +17,7 @@ Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + License along with this library; if not, see <http://www.gnu.org/licenses/>. */ #include "includes.h" -- cgit From 6504900f1f52927adab3489b8d04b6644ceaee7d Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Tue, 10 Jul 2007 08:06:51 +0000 Subject: r23806: update Samba4 with the latest ctdb code. This doesn't get the ctdb code fully working in Samba4, it just gets it building and not breaking non-clustered use of Samba. It will take a bit longer to update some of the calling ctdb_cluster.c code to make it work correctly in Samba4. Note also that Samba4 now only links to the client portion of ctdb. For the moment I am leaving the ctdbd as a separate daemon, which you install separately from http://ctdb.samba.org/. 
(This used to be commit b196077cbb55cbecad87065133c2d67198e31066) --- source4/cluster/ctdb/common/ctdb_io.c | 79 +++++++++++++++++++++++++---------- 1 file changed, 57 insertions(+), 22 deletions(-) (limited to 'source4/cluster/ctdb/common/ctdb_io.c') diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c index 517fbbd842..3cc522b58a 100644 --- a/source4/cluster/ctdb/common/ctdb_io.c +++ b/source4/cluster/ctdb/common/ctdb_io.c @@ -6,18 +6,18 @@ Copyright (C) Andrew Tridgell 2006 - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 3 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, see <http://www.gnu.org/licenses/>. + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. 
*/ #include "includes.h" @@ -39,6 +39,7 @@ struct ctdb_queue_pkt { struct ctdb_queue_pkt *next, *prev; uint8_t *data; uint32_t length; + uint32_t full_length; }; struct ctdb_queue { @@ -63,8 +64,10 @@ static void queue_io_read(struct ctdb_queue *queue) ssize_t nread; uint8_t *data, *data_base; - if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 || - num_ready == 0) { + if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) { + return; + } + if (num_ready == 0) { /* the descriptor has been closed */ goto failed; } @@ -74,11 +77,14 @@ static void queue_io_read(struct ctdb_queue *queue) num_ready + queue->partial.length); if (queue->partial.data == NULL) { + DEBUG(0,("read error alloc failed for %u\n", + num_ready + queue->partial.length)); goto failed; } nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready); if (nread <= 0) { + DEBUG(0,("read error nread=%d\n", (int)nread)); goto failed; } @@ -103,8 +109,14 @@ static void queue_io_read(struct ctdb_queue *queue) uint8_t *d2; uint32_t len; len = *(uint32_t *)data; + if (len == 0) { + /* bad packet! 
treat as EOF */ + DEBUG(0,("Invalid packet of length 0\n")); + goto failed; + } d2 = talloc_memdup(queue, data, len); if (d2 == NULL) { + DEBUG(0,("read error memdup failed for %u\n", len)); /* sigh */ goto failed; } @@ -121,6 +133,8 @@ static void queue_io_read(struct ctdb_queue *queue) } else { queue->partial.data = talloc_memdup(queue, data, nread); if (queue->partial.data == NULL) { + DEBUG(0,("read error memdup partial failed for %u\n", + (unsigned)nread)); goto failed; } queue->partial.length = nread; @@ -154,13 +168,23 @@ static void queue_io_write(struct ctdb_queue *queue) while (queue->out_queue) { struct ctdb_queue_pkt *pkt = queue->out_queue; ssize_t n; - - n = write(queue->fd, pkt->data, pkt->length); + if (queue->ctdb->flags & CTDB_FLAG_TORTURE) { + n = write(queue->fd, pkt->data, 1); + } else { + n = write(queue->fd, pkt->data, pkt->length); + } if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + if (pkt->length != pkt->full_length) { + /* partial packet sent - we have to drop it */ + DLIST_REMOVE(queue->out_queue, pkt); + talloc_free(pkt); + } + talloc_free(queue->fde); + queue->fde = NULL; + queue->fd = -1; event_add_timed(queue->ctdb->ev, queue, timeval_zero(), queue_dead, queue); - EVENT_FD_NOT_WRITEABLE(queue->fde); return; } if (n <= 0) return; @@ -200,21 +224,31 @@ static void queue_io_handler(struct event_context *ev, struct fd_event *fde, int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) { struct ctdb_queue_pkt *pkt; - uint32_t length2; + uint32_t length2, full_length; - /* enforce the length and alignment rules from the tcp packet allocator */ - length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1); - *(uint32_t *)data = length2; + if (queue->alignment) { + /* enforce the length and alignment rules from the tcp packet allocator */ + length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1); + *(uint32_t *)data = length2; + } else { + length2 = length; + } if (length2 != length) { 
memset(data+length, 0, length2-length); } + + full_length = length2; /* if the queue is empty then try an immediate write, avoiding queue overhead. This relies on non-blocking sockets */ - if (queue->out_queue == NULL && queue->fd != -1) { + if (queue->out_queue == NULL && queue->fd != -1 && + !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) { ssize_t n = write(queue->fd, data, length2); if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + talloc_free(queue->fde); + queue->fde = NULL; + queue->fd = -1; event_add_timed(queue->ctdb->ev, queue, timeval_zero(), queue_dead, queue); /* yes, we report success, as the dead node is @@ -235,6 +269,7 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) CTDB_NO_MEMORY(queue->ctdb, pkt->data); pkt->length = length2; + pkt->full_length = full_length; if (queue->out_queue == NULL && queue->fd != -1) { EVENT_FD_WRITEABLE(queue->fde); @@ -256,7 +291,7 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd) queue->fde = NULL; if (fd != -1) { - queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, + queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, queue_io_handler, queue); if (queue->fde == NULL) { return -1; -- cgit From cd962355abad90a2161765a7be7d26e63572cab7 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Fri, 7 Sep 2007 15:08:14 +0000 Subject: r25000: Fix some more C++ compatibility warnings. 
(This used to be commit 08bb1ef643ab906f1645cf6f32763dc73b1884e4) --- source4/cluster/ctdb/common/ctdb_io.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'source4/cluster/ctdb/common/ctdb_io.c') diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c index 3cc522b58a..5c418e4956 100644 --- a/source4/cluster/ctdb/common/ctdb_io.c +++ b/source4/cluster/ctdb/common/ctdb_io.c @@ -73,8 +73,9 @@ static void queue_io_read(struct ctdb_queue *queue) } - queue->partial.data = talloc_realloc_size(queue, queue->partial.data, - num_ready + queue->partial.length); + queue->partial.data = talloc_realloc(queue, queue->partial.data, + uint8_t, + num_ready + queue->partial.length); if (queue->partial.data == NULL) { DEBUG(0,("read error alloc failed for %u\n", -- cgit From dccf3f99e45137b6cd18c1de1c79808ad67130d1 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Sat, 8 Sep 2007 13:27:14 +0000 Subject: r25027: Fix more warnings. (This used to be commit 5085c53fcfade614e83d21fc2c1a5bc43bb2a729) --- source4/cluster/ctdb/common/ctdb_io.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'source4/cluster/ctdb/common/ctdb_io.c') diff --git a/source4/cluster/ctdb/common/ctdb_io.c b/source4/cluster/ctdb/common/ctdb_io.c index 5c418e4956..ca9c635878 100644 --- a/source4/cluster/ctdb/common/ctdb_io.c +++ b/source4/cluster/ctdb/common/ctdb_io.c @@ -115,7 +115,7 @@ static void queue_io_read(struct ctdb_queue *queue) DEBUG(0,("Invalid packet of length 0\n")); goto failed; } - d2 = talloc_memdup(queue, data, len); + d2 = (uint8_t *)talloc_memdup(queue, data, len); if (d2 == NULL) { DEBUG(0,("read error memdup failed for %u\n", len)); /* sigh */ @@ -132,7 +132,7 @@ static void queue_io_read(struct ctdb_queue *queue) queue->partial.data = data; queue->partial.length = nread; } else { - queue->partial.data = talloc_memdup(queue, data, nread); + queue->partial.data = (uint8_t *)talloc_memdup(queue, data, nread); if 
(queue->partial.data == NULL) { DEBUG(0,("read error memdup partial failed for %u\n", (unsigned)nread)); @@ -266,7 +266,7 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) pkt = talloc(queue, struct ctdb_queue_pkt); CTDB_NO_MEMORY(queue->ctdb, pkt); - pkt->data = talloc_memdup(pkt, data, length2); + pkt->data = (uint8_t *)talloc_memdup(pkt, data, length2); CTDB_NO_MEMORY(queue->ctdb, pkt->data); pkt->length = length2; -- cgit