diff options
Diffstat (limited to 'source4/cluster/ctdb/tcp')
-rw-r--r-- | source4/cluster/ctdb/tcp/ctdb_tcp.h | 33 | ||||
-rw-r--r-- | source4/cluster/ctdb/tcp/tcp_connect.c | 57 | ||||
-rw-r--r-- | source4/cluster/ctdb/tcp/tcp_init.c | 20 | ||||
-rw-r--r-- | source4/cluster/ctdb/tcp/tcp_io.c | 205 |
4 files changed, 79 insertions, 236 deletions
diff --git a/source4/cluster/ctdb/tcp/ctdb_tcp.h b/source4/cluster/ctdb/tcp/ctdb_tcp.h index 0f8ce300b4..a34cd9736d 100644 --- a/source4/cluster/ctdb/tcp/ctdb_tcp.h +++ b/source4/cluster/ctdb/tcp/ctdb_tcp.h @@ -25,32 +25,12 @@ struct ctdb_tcp { }; /* - incoming packet structure - only used when we get a partial packet - on read -*/ -struct ctdb_tcp_partial { - uint8_t *data; - uint32_t length; -}; - - -/* state associated with an incoming connection */ struct ctdb_incoming { struct ctdb_context *ctdb; int fd; - struct ctdb_tcp_partial partial; -}; - -/* - outgoing packet structure - only allocated when we can't write immediately - to the socket -*/ -struct ctdb_tcp_packet { - struct ctdb_tcp_packet *next, *prev; - uint8_t *data; - uint32_t length; + struct ctdb_queue *queue; }; /* @@ -58,19 +38,16 @@ struct ctdb_tcp_packet { */ struct ctdb_tcp_node { int fd; - struct fd_event *fde; - struct ctdb_tcp_packet *queue; + struct ctdb_queue *queue; }; /* prototypes internal to tcp transport */ -void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private); -void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private); int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length); int ctdb_tcp_listen(struct ctdb_context *ctdb); void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private); + struct timeval t, void *private_data); +void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args); +void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data); #define CTDB_TCP_ALIGNMENT 8 diff --git a/source4/cluster/ctdb/tcp/tcp_connect.c b/source4/cluster/ctdb/tcp/tcp_connect.c index 85fffc2f70..a1f2d331cf 100644 --- a/source4/cluster/ctdb/tcp/tcp_connect.c +++ b/source4/cluster/ctdb/tcp/tcp_connect.c @@ -35,13 +35,32 @@ static void set_nonblocking(int fd) /* + called when a complete packet has come in - should not happen on this socket + */ +void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data) +{ + struct ctdb_node *node = talloc_get_type(private_data, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type( + node->private_data, struct ctdb_tcp_node); + + /* start a new connect cycle to try to re-establish the + link */ + close(tnode->fd); + ctdb_queue_set_fd(tnode->queue, -1); + tnode->fd = -1; + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_connect, node); +} + +/* called when socket becomes writeable on connect */ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) + uint16_t flags, void *private_data) { - struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_node *node = talloc_get_type(private_data, + struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, struct ctdb_tcp_node); struct ctdb_context *ctdb = node->ctdb; int error = 0; @@ -59,17 +78,13 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f } talloc_free(fde); - tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ, - ctdb_tcp_node_write, node); + + setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one)); + + ctdb_queue_set_fd(tnode->queue, tnode->fd); /* tell the ctdb layer we are connected */ node->ctdb->upcalls->node_connected(node); - - setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one)); - - if (tnode->queue) { - EVENT_FD_WRITEABLE(tnode->fde); - } } @@ -92,10 +107,11 @@ static int ctdb_tcp_get_address(struct ctdb_context *ctdb, called when we should try and establish a tcp connection to a node */ void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private) + struct timeval t, void *private_data) { - struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_node *node = talloc_get_type(private_data, + struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, struct ctdb_tcp_node); struct ctdb_context *ctdb = node->ctdb; struct sockaddr_in sock_in; @@ -155,7 +171,7 @@ static int ctdb_incoming_destructor(struct ctdb_incoming *in) node in our cluster */ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) + uint16_t flags, void *private_data) { struct ctdb_context *ctdb; struct ctdb_tcp *ctcp; @@ -164,8 +180,8 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, int fd; struct ctdb_incoming *in; - ctdb = talloc_get_type(private, struct ctdb_context); - ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp); + ctdb = talloc_get_type(private_data, struct ctdb_context); + ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp); memset(&addr, 0, sizeof(addr)); len = sizeof(addr); fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len); @@ -177,8 +193,8 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, set_nonblocking(in->fd); - event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ, - ctdb_tcp_incoming_read, in); + in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT, + ctdb_tcp_read_cb, in); talloc_set_destructor(in, ctdb_incoming_destructor); } @@ -189,7 +205,8 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, */ int ctdb_tcp_listen(struct ctdb_context *ctdb) { - struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp); + struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data, + struct ctdb_tcp); struct sockaddr_in sock; int one = 1; diff --git a/source4/cluster/ctdb/tcp/tcp_init.c b/source4/cluster/ctdb/tcp/tcp_init.c index 0058e7ad85..20b9bc9e33 100644 --- a/source4/cluster/ctdb/tcp/tcp_init.c +++ b/source4/cluster/ctdb/tcp/tcp_init.c @@ -29,7 +29,7 @@ /* start the protocol going */ -int ctdb_tcp_start(struct ctdb_context *ctdb) +static int ctdb_tcp_start(struct ctdb_context *ctdb) { int i; @@ -46,6 +46,12 @@ int ctdb_tcp_start(struct ctdb_context *ctdb) ctdb_tcp_node_connect, node); } + if (ctdb->flags&CTDB_FLAG_CONNECT_WAIT) { + /* wait until all nodes are connected (should not be needed + outide of test code) */ + ctdb_connect_wait(ctdb); + } + return 0; } @@ -53,14 +59,18 @@ int ctdb_tcp_start(struct ctdb_context *ctdb) /* initialise tcp portion of a ctdb node */ -int ctdb_tcp_add_node(struct ctdb_node *node) +static int ctdb_tcp_add_node(struct ctdb_node *node) { struct ctdb_tcp_node *tnode; tnode = talloc_zero(node, struct ctdb_tcp_node); CTDB_NO_MEMORY(node->ctdb, tnode); tnode->fd = -1; - node->private = tnode; + node->private_data = tnode; + + tnode->queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT, + ctdb_tcp_tnode_cb, node); + return 0; } @@ -68,7 +78,7 @@ int ctdb_tcp_add_node(struct ctdb_node *node) /* transport packet allocator - allows transport to control memory for packets */ -void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size) +static void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size) { /* tcp transport needs to round to 8 byte alignment to ensure that we can use a length header and 64 bit elements in @@ -95,7 +105,7 @@ int ctdb_tcp_init(struct ctdb_context *ctdb) CTDB_NO_MEMORY(ctdb, ctcp); ctcp->listen_fd = -1; - ctdb->private = ctcp; + ctdb->private_data = ctcp; ctdb->methods = &ctdb_tcp_methods; return 0; } diff --git a/source4/cluster/ctdb/tcp/tcp_io.c b/source4/cluster/ctdb/tcp/tcp_io.c index e59f6167ff..150d726afb 100644 --- a/source4/cluster/ctdb/tcp/tcp_io.c +++ b/source4/cluster/ctdb/tcp/tcp_io.c @@ -29,161 +29,43 @@ /* - called when we fail to send a message to a node -*/ -static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private) -{ - struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, - struct ctdb_tcp_node); - - /* start a new connect cycle to try to re-establish the - link */ - talloc_free(tnode->fde); - close(tnode->fd); - tnode->fd = -1; - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_connect, node); -} - -/* - called when socket becomes readable -*/ -void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) -{ - struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, - struct ctdb_tcp_node); - if (flags & EVENT_FD_READ) { - /* getting a read event on this fd in the current tcp model is - always an error, as we have separate read and write - sockets. In future we may combine them, but for now it must - mean that the socket is dead, so we try to reconnect */ - node->ctdb->upcalls->node_dead(node); - talloc_free(tnode->fde); - close(tnode->fd); - tnode->fd = -1; - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_connect, node); - return; - } - - while (tnode->queue) { - struct ctdb_tcp_packet *pkt = tnode->queue; - ssize_t n; - - n = write(tnode->fd, pkt->data, pkt->length); - - if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_dead, node); - EVENT_FD_NOT_WRITEABLE(tnode->fde); - return; - } - if (n <= 0) return; - - if (n != pkt->length) { - pkt->length -= n; - pkt->data += n; - return; - } - - DLIST_REMOVE(tnode->queue, pkt); - talloc_free(pkt); - } - - EVENT_FD_NOT_WRITEABLE(tnode->fde); -} - - -/* - called when an incoming connection is readable -*/ -void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) + called when a complete packet has come in + */ +void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args) { - struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming); - int num_ready = 0; - ssize_t nread; - uint8_t *data, *data_base; + struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming); + struct ctdb_req_header *hdr; - if (ioctl(in->fd, FIONREAD, &num_ready) != 0 || - num_ready == 0) { - /* we've lost the link from another node. We don't - notify the upper layers, as we only want to trigger - a full node reorganisation when a send fails - that - allows nodes to restart without penalty as long as - the network is idle */ + if (data == NULL) { + /* incoming socket has died */ talloc_free(in); return; } - in->partial.data = talloc_realloc_size(in, in->partial.data, - num_ready + in->partial.length); - if (in->partial.data == NULL) { - /* not much we can do except drop the socket */ - talloc_free(in); + if (cnt < sizeof(*hdr)) { + ctdb_set_error(in->ctdb, "Bad packet length %d\n", cnt); return; } - - nread = read(in->fd, in->partial.data+in->partial.length, num_ready); - if (nread <= 0) { - /* the connection must be dead */ - talloc_free(in); + hdr = (struct ctdb_req_header *)data; + if (cnt != hdr->length) { + ctdb_set_error(in->ctdb, "Bad header length %d expected %d\n", + hdr->length, cnt); return; } - data = in->partial.data; - nread += in->partial.length; - - in->partial.data = NULL; - in->partial.length = 0; - - if (nread >= 4 && *(uint32_t *)data == nread) { - /* most common case - we got a whole packet in one go - tell the ctdb layer above that we have a packet */ - in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread); + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(in->ctdb, "Non CTDB packet rejected\n"); return; } - data_base = data; - - while (nread >= 4 && *(uint32_t *)data <= nread) { - /* we have at least one packet */ - uint8_t *d2; - uint32_t len; - len = *(uint32_t *)data; - d2 = talloc_memdup(in, data, len); - if (d2 == NULL) { - /* sigh */ - talloc_free(in); - return; - } - in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len); - data += len; - nread -= len; - } - - if (nread > 0) { - /* we have only part of a packet */ - if (data_base == data) { - in->partial.data = data; - in->partial.length = nread; - } else { - in->partial.data = talloc_memdup(in, data, nread); - if (in->partial.data == NULL) { - talloc_free(in); - return; - } - in->partial.length = nread; - talloc_free(data_base); - } + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(in->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); return; } - talloc_free(data_base); + /* most common case - we got a whole packet in one go + tell the ctdb layer above that we have a packet */ + in->ctdb->upcalls->recv_pkt(in->ctdb, data, cnt); } /* @@ -191,50 +73,7 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, */ int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length) { - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, struct ctdb_tcp_node); - struct ctdb_tcp_packet *pkt; - uint32_t length2; - - /* enforce the length and alignment rules from the tcp packet allocator */ - length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1); - *(uint32_t *)data = length2; - - if (length2 != length) { - memset(data+length, 0, length2-length); - } - - /* if the queue is empty then try an immediate write, avoiding - queue overhead. This relies on non-blocking sockets */ - if (tnode->queue == NULL && tnode->fd != -1) { - ssize_t n = write(tnode->fd, data, length2); - if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_dead, node); - /* yes, we report success, as the dead node is - handled via a separate event */ - return 0; - } - if (n > 0) { - data += n; - length2 -= n; - } - if (length2 == 0) return 0; - } - - pkt = talloc(tnode, struct ctdb_tcp_packet); - CTDB_NO_MEMORY(node->ctdb, pkt); - - pkt->data = talloc_memdup(pkt, data, length2); - CTDB_NO_MEMORY(node->ctdb, pkt->data); - - pkt->length = length2; - - if (tnode->queue == NULL && tnode->fd != -1) { - EVENT_FD_WRITEABLE(tnode->fde); - } - - DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *); - - return 0; + return ctdb_queue_send(tnode->queue, data, length); } |