diff options
author | Andrew Tridgell <tridge@samba.org> | 2007-01-19 03:54:48 +0000 |
---|---|---|
committer | Gerald (Jerry) Carter <jerry@samba.org> | 2007-10-10 14:43:46 -0500 |
commit | 5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263 (patch) | |
tree | 050e2f47faf234685cd7f20ab7e4f37e6521f7a2 /source4/cluster/ctdb/tcp | |
parent | 8c3d15f6caa3f1ffda86755fa9b7ff9602cbb022 (diff) | |
download | samba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.tar.gz samba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.tar.bz2 samba-5cb78383fafa15c2ff7a4ccd194cccd5cf5cd263.zip |
r20889: import ctdb cluster backend from bzr
it will be interesting to see how the build farm handles this
(This used to be commit 53be449630bd67d649a9e70cc7e25a9799c0616b)
Diffstat (limited to 'source4/cluster/ctdb/tcp')
-rw-r--r-- | source4/cluster/ctdb/tcp/ctdb_tcp.h | 76 | ||||
-rw-r--r-- | source4/cluster/ctdb/tcp/tcp_connect.c | 191 | ||||
-rw-r--r-- | source4/cluster/ctdb/tcp/tcp_init.c | 102 | ||||
-rw-r--r-- | source4/cluster/ctdb/tcp/tcp_io.c | 254 |
4 files changed, 623 insertions, 0 deletions
diff --git a/source4/cluster/ctdb/tcp/ctdb_tcp.h b/source4/cluster/ctdb/tcp/ctdb_tcp.h new file mode 100644 index 0000000000..0f8ce300b4 --- /dev/null +++ b/source4/cluster/ctdb/tcp/ctdb_tcp.h @@ -0,0 +1,76 @@ +/* + ctdb database library + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + + +/* ctdb_tcp main state */ +struct ctdb_tcp { + int listen_fd; +}; + +/* + incoming packet structure - only used when we get a partial packet + on read +*/ +struct ctdb_tcp_partial { + uint8_t *data; + uint32_t length; +}; + + +/* + state associated with an incoming connection +*/ +struct ctdb_incoming { + struct ctdb_context *ctdb; + int fd; + struct ctdb_tcp_partial partial; +}; + +/* + outgoing packet structure - only allocated when we can't write immediately + to the socket +*/ +struct ctdb_tcp_packet { + struct ctdb_tcp_packet *next, *prev; + uint8_t *data; + uint32_t length; +}; + +/* + state associated with one tcp node +*/ +struct ctdb_tcp_node { + int fd; + struct fd_event *fde; + struct ctdb_tcp_packet *queue; +}; + + +/* prototypes internal to tcp transport */ +void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private); +void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private); +int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length); +int ctdb_tcp_listen(struct ctdb_context *ctdb); +void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private); + +#define CTDB_TCP_ALIGNMENT 8 diff --git a/source4/cluster/ctdb/tcp/tcp_connect.c b/source4/cluster/ctdb/tcp/tcp_connect.c new file mode 100644 index 0000000000..2404144ac1 --- /dev/null +++ b/source4/cluster/ctdb/tcp/tcp_connect.c @@ -0,0 +1,191 @@ +/* + ctdb over TCP + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" +#include "ctdb_tcp.h" + +static void set_nonblocking(int fd) +{ + unsigned v; + v = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, v | O_NONBLOCK); +} + + +/* + called when socket becomes writeable on connect +*/ +static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private) +{ + struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + struct ctdb_context *ctdb = node->ctdb; + int error = 0; + socklen_t len = sizeof(error); + + if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 || + error != 0) { + talloc_free(fde); + close(tnode->fd); + tnode->fd = -1; + event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0), + ctdb_tcp_node_connect, node); + return; + } + + talloc_free(fde); + tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ, + ctdb_tcp_node_write, node); + + /* tell the ctdb layer we are connected */ + node->ctdb->upcalls->node_connected(node); + + if (tnode->queue) { + EVENT_FD_WRITEABLE(tnode->fde); + } +} + +/* + called when we should try and establish a tcp connection to a node +*/ +void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private) +{ + struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + struct ctdb_context *ctdb = node->ctdb; + struct sockaddr_in sock_out; + + tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + + set_nonblocking(tnode->fd); + + inet_pton(AF_INET, node->address.address, &sock_out.sin_addr); + sock_out.sin_port = htons(node->address.port); + sock_out.sin_family = PF_INET; + + if (connect(tnode->fd, (struct sockaddr *)&sock_out, sizeof(sock_out)) != 0 && + errno != EINPROGRESS) { + /* try again once a second */ + close(tnode->fd); + event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0), + ctdb_tcp_node_connect, node); + return; + } + + /* non-blocking connect - wait for write event */ + event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_WRITE|EVENT_FD_READ, + ctdb_node_connect_write, node); +} + +/* + destroy a ctdb_incoming structure +*/ +static int ctdb_incoming_destructor(struct ctdb_incoming *in) +{ + close(in->fd); + in->fd = -1; + return 0; +} + +/* + called when we get contacted by another node + currently makes no attempt to check if the connection is really from a ctdb + node in our cluster +*/ +static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private) +{ + struct ctdb_context *ctdb; + struct ctdb_tcp *ctcp; + struct sockaddr_in addr; + socklen_t len; + int fd; + struct ctdb_incoming *in; + + ctdb = talloc_get_type(private, struct ctdb_context); + ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp); + memset(&addr, 0, sizeof(addr)); + len = sizeof(addr); + fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len); + if (fd == -1) return; + + in = talloc_zero(ctdb, struct ctdb_incoming); + in->fd = fd; + in->ctdb = ctdb; + + set_nonblocking(in->fd); + + event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ, + ctdb_tcp_incoming_read, in); + + talloc_set_destructor(in, ctdb_incoming_destructor); +} + + +/* + listen on our own address +*/ +int ctdb_tcp_listen(struct ctdb_context *ctdb) +{ + struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp); + struct sockaddr_in sock; + int one = 1; + + sock.sin_port = htons(ctdb->address.port); + sock.sin_family = PF_INET; + inet_pton(AF_INET, ctdb->address.address, &sock.sin_addr); + + ctcp->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if (ctcp->listen_fd == -1) { + ctdb_set_error(ctdb, "socket failed\n"); + return -1; + } + + setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one)); + + if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) { + ctdb_set_error(ctdb, "bind failed\n"); + close(ctcp->listen_fd); + ctcp->listen_fd = -1; + return -1; + } + + if (listen(ctcp->listen_fd, 10) == -1) { + ctdb_set_error(ctdb, "listen failed\n"); + close(ctcp->listen_fd); + ctcp->listen_fd = -1; + return -1; + } + + event_add_fd(ctdb->ev, ctdb, ctcp->listen_fd, EVENT_FD_READ, + ctdb_listen_event, ctdb); + + return 0; +} + diff --git a/source4/cluster/ctdb/tcp/tcp_init.c b/source4/cluster/ctdb/tcp/tcp_init.c new file mode 100644 index 0000000000..b8ee8cb30e --- /dev/null +++ b/source4/cluster/ctdb/tcp/tcp_init.c @@ -0,0 +1,102 @@ +/* + ctdb over TCP + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" +#include "ctdb_tcp.h" + +/* + start the protocol going +*/ +int ctdb_tcp_start(struct ctdb_context *ctdb) +{ + int i; + + /* listen on our own address */ + if (ctdb_tcp_listen(ctdb) != 0) return -1; + + /* startup connections to the other servers - will happen on + next event loop */ + for (i=0;i<ctdb->num_nodes;i++) { + struct ctdb_node *node = *(ctdb->nodes + i); + if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) && + ctdb_same_address(&ctdb->address, &node->address)) continue; + event_add_timed(ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_connect, node); + } + + return 0; +} + + +/* + initialise tcp portion of a ctdb node +*/ +int ctdb_tcp_add_node(struct ctdb_node *node) +{ + struct ctdb_tcp_node *tnode; + tnode = talloc_zero(node, struct ctdb_tcp_node); + CTDB_NO_MEMORY(node->ctdb, tnode); + + tnode->fd = -1; + node->private = tnode; + return 0; +} + + +/* + transport packet allocator - allows transport to control memory for packets +*/ +void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size) +{ + /* tcp transport needs to round to 8 byte alignment to ensure + that we can use a length header and 64 bit elements in + structures */ + size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1); + return talloc_size(ctdb, size); +} + + +static const struct ctdb_methods ctdb_tcp_methods = { + .start = ctdb_tcp_start, + .add_node = ctdb_tcp_add_node, + .queue_pkt = ctdb_tcp_queue_pkt, + .allocate_pkt = ctdb_tcp_allocate_pkt +}; + +/* + initialise tcp portion of ctdb +*/ +int ctdb_tcp_init(struct ctdb_context *ctdb) +{ + struct ctdb_tcp *ctcp; + ctcp = talloc_zero(ctdb, struct ctdb_tcp); + CTDB_NO_MEMORY(ctdb, ctcp); + + ctcp->listen_fd = -1; + ctdb->private = ctcp; + ctdb->methods = &ctdb_tcp_methods; + return 0; +} + diff --git a/source4/cluster/ctdb/tcp/tcp_io.c b/source4/cluster/ctdb/tcp/tcp_io.c new file mode 100644 index 0000000000..82e24f7260 --- /dev/null +++ b/source4/cluster/ctdb/tcp/tcp_io.c @@ -0,0 +1,254 @@ +/* + ctdb over TCP + + Copyright (C) Andrew Tridgell 2006 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "lib/tdb/include/tdb.h" +#include "system/network.h" +#include "system/filesys.h" +#include "cluster/ctdb/include/ctdb_private.h" +#include "ctdb_tcp.h" + + +/* + called when we fail to send a message to a node +*/ +static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private) +{ + struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + + /* flush the queue */ + while (tnode->queue) { + struct ctdb_tcp_packet *pkt = tnode->queue; + DLIST_REMOVE(tnode->queue, pkt); + talloc_free(pkt); + } + + /* start a new connect cycle to try to re-establish the + link */ + talloc_free(tnode->fde); + close(tnode->fd); + tnode->fd = -1; + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_connect, node); +} + +/* + called when socket becomes readable +*/ +void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private) +{ + struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + if (flags & EVENT_FD_READ) { + /* getting a read event on this fd in the current tcp model is + always an error, as we have separate read and write + sockets. In future we may combine them, but for now it must + mean that the socket is dead, so we try to reconnect */ + talloc_free(tnode->fde); + close(tnode->fd); + tnode->fd = -1; + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_connect, node); + return; + } + + while (tnode->queue) { + struct ctdb_tcp_packet *pkt = tnode->queue; + ssize_t n; + + n = write(tnode->fd, pkt->data, pkt->length); + + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_dead, node); + EVENT_FD_NOT_WRITEABLE(tnode->fde); + return; + } + if (n <= 0) return; + + if (n != pkt->length) { + pkt->length -= n; + pkt->data += n; + return; + } + + DLIST_REMOVE(tnode->queue, pkt); + talloc_free(pkt); + } + + EVENT_FD_NOT_WRITEABLE(tnode->fde); +} + + +/* + called when an incoming connection is readable +*/ +void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private) +{ + struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming); + int num_ready = 0; + ssize_t nread; + uint8_t *data, *data_base; + + if (ioctl(in->fd, FIONREAD, &num_ready) != 0 || + num_ready == 0) { + /* we've lost the link from another node. We don't + notify the upper layers, as we only want to trigger + a full node reorganisation when a send fails - that + allows nodes to restart without penalty as long as + the network is idle */ + talloc_free(in); + return; + } + + in->partial.data = talloc_realloc_size(in, in->partial.data, + num_ready + in->partial.length); + if (in->partial.data == NULL) { + /* not much we can do except drop the socket */ + talloc_free(in); + return; + } + + nread = read(in->fd, in->partial.data+in->partial.length, num_ready); + if (nread <= 0) { + /* the connection must be dead */ + talloc_free(in); + return; + } + + data = in->partial.data; + nread += in->partial.length; + + in->partial.data = NULL; + in->partial.length = 0; + + if (nread >= 4 && *(uint32_t *)data == nread) { + /* most common case - we got a whole packet in one go + tell the ctdb layer above that we have a packet */ + in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread); + return; + } + + data_base = data; + + while (nread >= 4 && *(uint32_t *)data <= nread) { + /* we have at least one packet */ + uint8_t *d2; + uint32_t len; + len = *(uint32_t *)data; + d2 = talloc_memdup(in, data, len); + if (d2 == NULL) { + /* sigh */ + talloc_free(in); + return; + } + in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len); + data += len; + nread -= len; + return; + } + + if (nread < 4 || *(uint32_t *)data > nread) { + /* we have only part of a packet */ + if (data_base == data) { + in->partial.data = data; + in->partial.length = nread; + } else { + in->partial.data = talloc_memdup(in, data, nread); + if (in->partial.data == NULL) { + talloc_free(in); + return; + } + in->partial.length = nread; + talloc_free(data_base); + } + return; + } + + talloc_free(data_base); +} + +/* + queue a packet for sending +*/ +int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length) +{ + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + struct ctdb_tcp_packet *pkt; + uint32_t length2; + + /* enforce the length and alignment rules from the tcp packet allocator */ + length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1); + *(uint32_t *)data = length2; + + if (length2 != length) { + memset(data+length, 0, length2-length); + } + { + int i, fd = open("/dev/null", O_WRONLY); + for (i=0;i<length2;i++) { + write(fd, &data[i], 1); + } + close(fd); + } + + /* if the queue is empty then try an immediate write, avoiding + queue overhead. This relies on non-blocking sockets */ + if (tnode->queue == NULL && tnode->fd != -1) { + ssize_t n = write(tnode->fd, data, length2); + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_dead, node); + /* yes, we report success, as the dead node is + handled via a separate event */ + return 0; + } + if (n > 0) { + data += n; + length2 -= n; + } + if (length2 == 0) return 0; + } + + pkt = talloc(tnode, struct ctdb_tcp_packet); + CTDB_NO_MEMORY(node->ctdb, pkt); + + pkt->data = talloc_memdup(pkt, data, length2); + CTDB_NO_MEMORY(node->ctdb, pkt->data); + + pkt->length = length2; + + if (tnode->queue == NULL && tnode->fd != -1) { + EVENT_FD_WRITEABLE(tnode->fde); + } + + DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *); + + return 0; +} |