diff options
Diffstat (limited to 'source4/cluster/ctdb/ib')
-rw-r--r-- | source4/cluster/ctdb/ib/README.txt | 19 | ||||
-rw-r--r-- | source4/cluster/ctdb/ib/config.m4 | 31 | ||||
-rw-r--r-- | source4/cluster/ctdb/ib/ibw_ctdb.c | 174 | ||||
-rw-r--r-- | source4/cluster/ctdb/ib/ibw_ctdb.h | 50 | ||||
-rw-r--r-- | source4/cluster/ctdb/ib/ibw_ctdb_init.c | 237 | ||||
-rw-r--r-- | source4/cluster/ctdb/ib/ibwrapper.c | 1361 | ||||
-rw-r--r-- | source4/cluster/ctdb/ib/ibwrapper.h | 218 | ||||
-rw-r--r-- | source4/cluster/ctdb/ib/ibwrapper_internal.h | 126 | ||||
-rw-r--r-- | source4/cluster/ctdb/ib/ibwrapper_test.c | 659 |
9 files changed, 0 insertions, 2875 deletions
diff --git a/source4/cluster/ctdb/ib/README.txt b/source4/cluster/ctdb/ib/README.txt deleted file mode 100644 index 40419829ca..0000000000 --- a/source4/cluster/ctdb/ib/README.txt +++ /dev/null @@ -1,19 +0,0 @@ -Compilation -=========== - -For the configure script, please set the OFED include & library path by e.g.: - -export CFLAGS="-I/usr/local/ofed/include -L/usr/local/ofed/lib" - -After then: - -./configure --enable-infiniband - -Example for testing -=================== -bin/ctdb_test --transport ib --nlist ../2nodes_rm.txt --listen 10.0.0.1 -bin/ctdb_test --transport ib --nlist ../2nodes_rm.txt --listen 10.0.0.2 - -where 2nodes_rm.txt: -10.0.0.1 -10.0.0.2 diff --git a/source4/cluster/ctdb/ib/config.m4 b/source4/cluster/ctdb/ib/config.m4 deleted file mode 100644 index 9d95ea7a5a..0000000000 --- a/source4/cluster/ctdb/ib/config.m4 +++ /dev/null @@ -1,31 +0,0 @@ -AC_ARG_ENABLE(--enable-infiniband, -[ --enable-infiniband Turn on infiniband support (default=no)]) - -HAVE_INFINIBAND=no - -if eval "test x$enable_infiniband = xyes"; then - AC_DEFINE(USE_INFINIBAND,1,[Use infiniband]) - HAVE_INFINIBAND=yes - - INFINIBAND_WRAPPER_OBJ="ib/ibwrapper.o ib/ibw_ctdb.o ib/ibw_ctdb_init.o" - INFINIBAND_LIBS="-lrdmacm -libverbs" - INFINIBAND_BINS="bin/ibwrapper_test" - - AC_CHECK_HEADERS(infiniband/verbs.h, [], [ - echo "ERROR: you need infiniband/verbs.h when ib enabled!" - exit -1]) - AC_CHECK_HEADERS(rdma/rdma_cma.h, [], [ - echo "ERROR: you need rdma/rdma_cma.h when ib enabled!" - exit -1]) - AC_CHECK_LIB(ibverbs, ibv_create_qp, [], [ - echo "ERROR: you need libibverbs when ib enabled!" - exit -1]) - AC_CHECK_LIB(rdmacm, rdma_connect, [], [ - echo "ERROR: you need librdmacm when ib enabled!" - exit -1]) -fi - -AC_SUBST(HAVE_INFINIBAND) -AC_SUBST(INFINIBAND_WRAPPER_OBJ) -AC_SUBST(INFINIBAND_LIBS) -AC_SUBST(INFINIBAND_BINS) diff --git a/source4/cluster/ctdb/ib/ibw_ctdb.c b/source4/cluster/ctdb/ib/ibw_ctdb.c deleted file mode 100644 index 5822ffb186..0000000000 --- a/source4/cluster/ctdb/ib/ibw_ctdb.c +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Unix SMB/CIFS implementation. - * Join infiniband wrapper and ctdb. - * - * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006 - * - * Major code contributions by Peter Somogyi <psomogyi@gamax.hu> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, see <http://www.gnu.org/licenses/>. - */ - -#include "includes.h" -#include "lib/events/events.h" -#include <system/network.h> -#include <assert.h> -#include "ctdb_private.h" -#include "ibwrapper.h" -#include "ibw_ctdb.h" - -int ctdb_ibw_get_address(struct ctdb_context *ctdb, - const char *address, struct in_addr *addr) -{ - if (inet_pton(AF_INET, address, addr) <= 0) { - struct hostent *he = gethostbyname(address); - if (he == NULL || he->h_length > sizeof(*addr)) { - ctdb_set_error(ctdb, "invalid nework address '%s'\n", - address); - return -1; - } - memcpy(addr, he->h_addr, he->h_length); - } - return 0; -} - -int ctdb_ibw_node_connect(struct ctdb_node *node) -{ - struct ctdb_ibw_node *cn = talloc_get_type(node->private_data, struct ctdb_ibw_node); - int rc; - - assert(cn!=NULL); - assert(cn->conn!=NULL); - struct sockaddr_in sock_out; - - memset(&sock_out, 0, sizeof(struct sockaddr_in)); - sock_out.sin_port = htons(node->address.port); - sock_out.sin_family = PF_INET; - if (ctdb_ibw_get_address(node->ctdb, node->address.address, &sock_out.sin_addr)) { - DEBUG(0, ("ctdb_ibw_node_connect failed\n")); - return -1; - } - - rc = ibw_connect(cn->conn, &sock_out, node); - if (rc) { - DEBUG(0, ("ctdb_ibw_node_connect/ibw_connect failed - retrying...\n")); - /* try again once a second */ - event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0), - ctdb_ibw_node_connect_event, node); - } - - /* continues at ibw_ctdb.c/IBWC_CONNECTED in good case */ - return 0; -} - -void ctdb_ibw_node_connect_event(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private_data) -{ - struct ctdb_node *node = talloc_get_type(private_data, struct ctdb_node); - - ctdb_ibw_node_connect(node); -} - -int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn) -{ - if (ctx!=NULL) { - /* ctx->state changed */ - switch(ctx->state) { - case IBWS_INIT: /* ctx start - after ibw_init */ - break; - case IBWS_READY: /* after ibw_bind & ibw_listen */ - break; - case IBWS_CONNECT_REQUEST: /* after [IBWS_READY + incoming request] */ - /* => [(ibw_accept)IBWS_READY | (ibw_disconnect)STOPPED | ERROR] */ - if (ibw_accept(ctx, conn, NULL)) { - DEBUG(0, ("connstate_handler/ibw_accept failed\n")); - return -1; - } /* else continue in IBWC_CONNECTED */ - break; - case IBWS_STOPPED: /* normal stop <= ibw_disconnect+(IBWS_READY | IBWS_CONNECT_REQUEST) */ - /* TODO: have a CTDB upcall for which CTDB should wait in a (final) loop */ - break; - case IBWS_ERROR: /* abnormal state; ibw_stop must be called after this */ - break; - default: - assert(0); - break; - } - } - - if (conn!=NULL) { - /* conn->state changed */ - switch(conn->state) { - case IBWC_INIT: /* conn start - internal state */ - break; - case IBWC_CONNECTED: { /* after ibw_accept or ibw_connect */ - struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node); - if (node!=NULL) { /* after ibw_connect */ - struct ctdb_ibw_node *cn = talloc_get_type(node->private_data, struct ctdb_ibw_node); - - node->ctdb->upcalls->node_connected(node); - ctdb_flush_cn_queue(cn); - } else { /* after ibw_accept */ - /* NOP in CTDB case */ - } - } break; - case IBWC_DISCONNECTED: { /* after ibw_disconnect */ - struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node); - if (node!=NULL) - node->ctdb->upcalls->node_dead(node); - talloc_free(conn); - /* normal + intended disconnect => not reconnecting in this layer */ - } break; - case IBWC_ERROR: { - struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node); - if (node!=NULL) { - struct ctdb_ibw_node *cn = talloc_get_type(node->private_data, struct ctdb_ibw_node); - struct ibw_ctx *ictx = cn->conn->ctx; - - DEBUG(10, ("IBWC_ERROR, reconnecting...\n")); - talloc_free(cn->conn); /* internal queue content is destroyed */ - cn->conn = (void *)ibw_conn_new(ictx, node); - event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0), - ctdb_ibw_node_connect_event, node); - } - } break; - default: - assert(0); - break; - } - } - - return 0; -} - -int ctdb_ibw_receive_handler(struct ibw_conn *conn, void *buf, int n) -{ - struct ctdb_context *ctdb = talloc_get_type(conn->ctx->ctx_userdata, struct ctdb_context); - void *buf2; /* future TODO: a solution for removal of this */ - - assert(ctdb!=NULL); - assert(buf!=NULL); - assert(conn!=NULL); - assert(conn->state==IBWC_CONNECTED); - - /* so far "buf" is an ib-registered memory area - * and being reused for next receive - * noticed that HL requires talloc-ed memory to be stolen */ - buf2 = talloc_zero_size(conn, n); - memcpy(buf2, buf, n); - - ctdb->upcalls->recv_pkt(ctdb, (uint8_t *)buf2, (uint32_t)n); - - return 0; -} diff --git a/source4/cluster/ctdb/ib/ibw_ctdb.h b/source4/cluster/ctdb/ib/ibw_ctdb.h deleted file mode 100644 index 98ea102eac..0000000000 --- a/source4/cluster/ctdb/ib/ibw_ctdb.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Unix SMB/CIFS implementation. - * Join infiniband wrapper and ctdb. - * - * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006 - * - * Major code contributions by Peter Somogyi <psomogyi@gamax.hu> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, see <http://www.gnu.org/licenses/>. - */ - -struct ctdb_ibw_msg { - uint8_t *data; - uint32_t length; - struct ctdb_ibw_msg *prev; - struct ctdb_ibw_msg *next; -}; - -struct ctdb_ibw_node { - struct ibw_conn *conn; - - struct ctdb_ibw_msg *queue; - struct ctdb_ibw_msg *queue_last; - int qcnt; -}; - -int ctdb_ibw_get_address(struct ctdb_context *ctdb, - const char *address, struct in_addr *addr); - -int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn); -int ctdb_ibw_receive_handler(struct ibw_conn *conn, void *buf, int n); - -int ctdb_ibw_node_connect(struct ctdb_node *node); -void ctdb_ibw_node_connect_event(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private_data); - -int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn); - -int ctdb_ibw_init(struct ctdb_context *ctdb); diff --git a/source4/cluster/ctdb/ib/ibw_ctdb_init.c b/source4/cluster/ctdb/ib/ibw_ctdb_init.c deleted file mode 100644 index 8dbb9c241c..0000000000 --- a/source4/cluster/ctdb/ib/ibw_ctdb_init.c +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Unix SMB/CIFS implementation. - * Join infiniband wrapper and ctdb. - * - * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006 - * - * Major code contributions by Peter Somogyi <psomogyi@gamax.hu> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, see <http://www.gnu.org/licenses/>. - */ - -#include "includes.h" -#include "lib/events/events.h" -#include <system/network.h> -#include <assert.h> -#include "ctdb_private.h" -#include "ibwrapper.h" -#include "ibw_ctdb.h" - -static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog) -{ - struct ibw_ctx *ictx = talloc_get_type(ctdb->private_data, struct ibw_ctx); - struct sockaddr_in my_addr; - - assert(ictx!=NULL); - memset(&my_addr, 0, sizeof(struct sockaddr_in)); - my_addr.sin_port = htons(ctdb->address.port); - my_addr.sin_family = PF_INET; - if (ctdb_ibw_get_address(ctdb, ctdb->address.address, &my_addr.sin_addr)) - return -1; - - if (ibw_bind(ictx, &my_addr)) { - DEBUG(0, ("ctdb_ibw_listen: ibw_bind failed\n")); - return -1; - } - - if (ibw_listen(ictx, backlog)) { - DEBUG(0, ("ctdb_ibw_listen: ibw_listen failed\n")); - return -1; - } - - return 0; -} - -/* - * initialise ibw portion of a ctdb node - */ -static int ctdb_ibw_add_node(struct ctdb_node *node) -{ - struct ibw_ctx *ictx = talloc_get_type(node->ctdb->private_data, struct ibw_ctx); - struct ctdb_ibw_node *cn = talloc_zero(node, struct ctdb_ibw_node); - - assert(cn!=NULL); - cn->conn = ibw_conn_new(ictx, node); - node->private_data = (void *)cn; - - return (cn->conn!=NULL ? 0 : -1); -} - -/* - * initialise infiniband - */ -static int ctdb_ibw_initialise(struct ctdb_context *ctdb) -{ - int i, ret; - - ret = ctdb_ibw_init(ctdb); - if (ret != 0) { - return ret; - } - - for (i=0; i<ctdb->num_nodes; i++) { - if (ctdb_ibw_add_node(ctdb->nodes[i]) != 0) { - DEBUG(0, ("methods->add_node failed at %d\n", i)); - return -1; - } - } - - /* listen on our own address */ - if (ctdb_ibw_listen(ctdb, 10)) /* TODO: backlog as param */ - return -1; - - return 0; -} - - -/* - * Start infiniband - */ -static int ctdb_ibw_start(struct ctdb_context *ctdb) -{ - int i, ret; - - /* everything async here */ - for (i=0;i<ctdb->num_nodes;i++) { - struct ctdb_node *node = ctdb->nodes[i]; - if (!ctdb_same_address(&ctdb->address, &node->address)) { - ctdb_ibw_node_connect(node); - } - } - - return 0; -} - -static int ctdb_ibw_send_pkt(struct ibw_conn *conn, uint8_t *data, uint32_t length) -{ - void *buf, *key; - - if (ibw_alloc_send_buf(conn, &buf, &key, length)) { - DEBUG(0, ("queue_pkt/ibw_alloc_send_buf failed\n")); - return -1; - } - - memcpy(buf, data, length); - return ibw_send(conn, buf, key, length); -} - -int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn) -{ - struct ctdb_ibw_msg *p; - int rc = 0; - - while(cn->queue) { - p = cn->queue; - rc = ctdb_ibw_send_pkt(cn->conn, p->data, p->length); - if (rc) - return -1; /* will be retried later when conn is up */ - - DLIST_REMOVE(cn->queue, p); - cn->qcnt--; - talloc_free(p); /* it will talloc_free p->data as well */ - } - assert(cn->qcnt==0); - /* cn->queue_last = NULL is not needed - see DLIST_ADD_AFTER */ - - return rc; -} - -static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length) -{ - struct ctdb_ibw_node *cn = talloc_get_type(node->private_data, struct ctdb_ibw_node); - int rc; - - assert(length>=sizeof(uint32_t)); - assert(cn!=NULL); - - if (cn->conn==NULL) { - DEBUG(0, ("ctdb_ibw_queue_pkt: conn is NULL\n")); - return -1; - } - - if (cn->conn->state==IBWC_CONNECTED) { - rc = ctdb_ibw_send_pkt(cn->conn, data, length); - } else { - struct ctdb_ibw_msg *p = talloc_zero(cn, struct ctdb_ibw_msg); - p->data = talloc_memdup(p, data, length); - p->length = length; - - DLIST_ADD_AFTER(cn->queue, p, cn->queue_last); - cn->queue_last = p; - cn->qcnt++; - - rc = 0; - } - - return rc; -} - -/* - * transport packet allocator - allows transport to control memory for packets - */ -static void *ctdb_ibw_allocate_pkt(TALLOC_CTX *mem_ctx, size_t size) -{ - /* TODO: use ibw_alloc_send_buf instead... */ - return talloc_size(mem_ctx, size); -} - -#ifdef __NOTDEF__ - -static int ctdb_ibw_stop(struct ctdb_context *cctx) -{ - struct ibw_ctx *ictx = talloc_get_type(cctx->private_data, struct ibw_ctx); - - assert(ictx!=NULL); - return ibw_stop(ictx); -} - -#endif /* __NOTDEF__ */ - -static const struct ctdb_methods ctdb_ibw_methods = { - .initialise= ctdb_ibw_initialise, - .start = ctdb_ibw_start, - .queue_pkt = ctdb_ibw_queue_pkt, - .add_node = ctdb_ibw_add_node, - .allocate_pkt = ctdb_ibw_allocate_pkt, - -// .stop = ctdb_ibw_stop -}; - -/* - * initialise ibw portion of ctdb - */ -int ctdb_ibw_init(struct ctdb_context *ctdb) -{ - struct ibw_ctx *ictx; - - DEBUG(10, ("ctdb_ibw_init invoked...\n")); - ictx = ibw_init( - NULL, //struct ibw_initattr *attr, /* TODO */ - 0, //int nattr, /* TODO */ - ctdb, - ctdb_ibw_connstate_handler, - ctdb_ibw_receive_handler, - ctdb->ev); - - if (ictx==NULL) { - DEBUG(0, ("ctdb_ibw_init: ibw_init failed\n")); - return -1; - } - - ctdb->methods = &ctdb_ibw_methods; - ctdb->private_data = ictx; - - DEBUG(10, ("ctdb_ibw_init succeeded.\n")); - return 0; -} diff --git a/source4/cluster/ctdb/ib/ibwrapper.c b/source4/cluster/ctdb/ib/ibwrapper.c deleted file mode 100644 index 31acbc4a2d..0000000000 --- a/source4/cluster/ctdb/ib/ibwrapper.c +++ /dev/null @@ -1,1361 +0,0 @@ -/* - * Unix SMB/CIFS implementation. - * Wrap Infiniband calls. - * - * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006 - * - * Major code contributions by Peter Somogyi <psomogyi@gamax.hu> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, see <http://www.gnu.org/licenses/>. - */ - -#include <stdlib.h> -#include <string.h> -#include <stdio.h> -#include <errno.h> -#include <sys/types.h> -#include <netinet/in.h> -#include <sys/socket.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <malloc.h> -#include <assert.h> -#include <unistd.h> - -#include "includes.h" -#include "lib/events/events.h" -#include "ibwrapper.h" - -#include <infiniband/kern-abi.h> -#include <rdma/rdma_cma_abi.h> -#include <rdma/rdma_cma.h> - -#include "ibwrapper_internal.h" -#include "lib/util/dlinklist.h" - -#define IBW_LASTERR_BUFSIZE 512 -static char ibw_lasterr[IBW_LASTERR_BUFSIZE]; - -#define IBW_MAX_SEND_WR 256 -#define IBW_MAX_RECV_WR 1024 -#define IBW_RECV_BUFSIZE 256 -#define IBW_RECV_THRESHOLD (1 * 1024 * 1024) - -static void ibw_event_handler_verbs(struct event_context *ev, - struct fd_event *fde, uint16_t flags, void *private_data); -static int ibw_fill_cq(struct ibw_conn *conn); -static int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc); -static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc); -static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, uint32_t len); - -static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn, - uint32_t n, struct ibv_mr **ppmr) -{ - void *buf; - - DEBUG(10, ("ibw_alloc_mr(cmid=%p, n=%u)\n", pconn->cm_id, n)); - buf = memalign(pctx->pagesize, n); - if (!buf) { - sprintf(ibw_lasterr, "couldn't allocate memory\n"); - return NULL; - } - - *ppmr = ibv_reg_mr(pconn->pd, buf, n, IBV_ACCESS_LOCAL_WRITE); - if (!*ppmr) { - sprintf(ibw_lasterr, "couldn't allocate mr\n"); - free(buf); - return NULL; - } - - return buf; -} - -static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr) -{ - DEBUG(10, ("ibw_free_mr(%p %p)\n", *ppbuf, *ppmr)); - if (*ppmr!=NULL) { - ibv_dereg_mr(*ppmr); - *ppmr = NULL; - } - if (*ppbuf) { - free(*ppbuf); - *ppbuf = NULL; - } -} - -static int ibw_init_memory(struct ibw_conn *conn) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - struct ibw_opts *opts = &pctx->opts; - int i; - struct ibw_wr *p; - - DEBUG(10, ("ibw_init_memory(cmid: %p)\n", pconn->cm_id)); - pconn->buf_send = ibw_alloc_mr(pctx, pconn, - opts->max_send_wr * opts->recv_bufsize, &pconn->mr_send); - if (!pconn->buf_send) { - sprintf(ibw_lasterr, "couldn't allocate work send buf\n"); - return -1; - } - - pconn->buf_recv = ibw_alloc_mr(pctx, pconn, - opts->max_recv_wr * opts->recv_bufsize, &pconn->mr_recv); - if (!pconn->buf_recv) { - sprintf(ibw_lasterr, "couldn't allocate work recv buf\n"); - return -1; - } - - pconn->wr_index = talloc_size(pconn, opts->max_send_wr * sizeof(struct ibw_wr *)); - assert(pconn->wr_index!=NULL); - - for(i=0; i<opts->max_send_wr; i++) { - p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr); - p->buf = pconn->buf_send + (i * opts->recv_bufsize); - p->wr_id = i; - - DLIST_ADD(pconn->wr_list_avail, p); - } - - return 0; -} - -static int ibw_ctx_priv_destruct(struct ibw_ctx_priv *pctx) -{ - DEBUG(10, ("ibw_ctx_priv_destruct(%p)\n", pctx)); - - /* destroy cm */ - if (pctx->cm_channel) { - rdma_destroy_event_channel(pctx->cm_channel); - pctx->cm_channel = NULL; - } - if (pctx->cm_channel_event) { - /* TODO: do we have to do this here? */ - talloc_free(pctx->cm_channel_event); - pctx->cm_channel_event = NULL; - } - if (pctx->cm_id) { - rdma_destroy_id(pctx->cm_id); - pctx->cm_id = NULL; - } - - return 0; -} - -static int ibw_ctx_destruct(struct ibw_ctx *ctx) -{ - DEBUG(10, ("ibw_ctx_destruct(%p)\n", ctx)); - return 0; -} - -static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn) -{ - DEBUG(10, ("ibw_conn_priv_destruct(%p, cmid: %p)\n", - pconn, pconn->cm_id)); - - /* pconn->wr_index is freed by talloc */ - /* pconn->wr_index[i] are freed by talloc */ - - /* destroy verbs */ - if (pconn->cm_id!=NULL && pconn->cm_id->qp!=NULL) { - rdma_destroy_qp(pconn->cm_id); - pconn->cm_id->qp = NULL; - } - - if (pconn->cq!=NULL) { - ibv_destroy_cq(pconn->cq); - pconn->cq = NULL; - } - - if (pconn->verbs_channel!=NULL) { - ibv_destroy_comp_channel(pconn->verbs_channel); - pconn->verbs_channel = NULL; - } - - /* must be freed here because its order is important */ - if (pconn->verbs_channel_event) { - talloc_free(pconn->verbs_channel_event); - pconn->verbs_channel_event = NULL; - } - - /* free memory regions */ - ibw_free_mr(&pconn->buf_send, &pconn->mr_send); - ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv); - - if (pconn->pd) { - ibv_dealloc_pd(pconn->pd); - pconn->pd = NULL; - DEBUG(10, ("pconn=%p pd deallocated\n", pconn)); - } - - if (pconn->cm_id) { - rdma_destroy_id(pconn->cm_id); - pconn->cm_id = NULL; - DEBUG(10, ("pconn=%p cm_id destroyed\n", pconn)); - } - - return 0; -} - -static int ibw_wr_destruct(struct ibw_wr *wr) -{ - if (wr->buf_large!=NULL) - ibw_free_mr(&wr->buf_large, &wr->mr_large); - return 0; -} - -static int ibw_conn_destruct(struct ibw_conn *conn) -{ - DEBUG(10, ("ibw_conn_destruct(%p)\n", conn)); - - /* important here: ctx is a talloc _parent_ */ - DLIST_REMOVE(conn->ctx->conn_list, conn); - return 0; -} - -struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx) -{ - struct ibw_conn *conn; - struct ibw_conn_priv *pconn; - - assert(ctx!=NULL); - - conn = talloc_zero(mem_ctx, struct ibw_conn); - assert(conn!=NULL); - talloc_set_destructor(conn, ibw_conn_destruct); - - pconn = talloc_zero(conn, struct ibw_conn_priv); - assert(pconn!=NULL); - talloc_set_destructor(pconn, ibw_conn_priv_destruct); - - conn->ctx = ctx; - conn->internal = (void *)pconn; - - DLIST_ADD(ctx->conn_list, conn); - - return conn; -} - -static int ibw_setup_cq_qp(struct ibw_conn *conn) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - struct ibv_qp_init_attr init_attr; - struct ibv_qp_attr attr; - int rc; - - DEBUG(10, ("ibw_setup_cq_qp(cmid: %p)\n", pconn->cm_id)); - - /* init verbs */ - pconn->verbs_channel = ibv_create_comp_channel(pconn->cm_id->verbs); - if (!pconn->verbs_channel) { - sprintf(ibw_lasterr, "ibv_create_comp_channel failed %d\n", errno); - return -1; - } - DEBUG(10, ("created channel %p\n", pconn->verbs_channel)); - - pconn->verbs_channel_event = event_add_fd(pctx->ectx, NULL, /* not pconn or conn */ - pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn); - - pconn->pd = ibv_alloc_pd(pconn->cm_id->verbs); - if (!pconn->pd) { - sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno); - return -1; - } - DEBUG(10, ("created pd %p\n", pconn->pd)); - - /* init mr */ - if (ibw_init_memory(conn)) - return -1; - - /* init cq */ - pconn->cq = ibv_create_cq(pconn->cm_id->verbs, - pctx->opts.max_recv_wr + pctx->opts.max_send_wr, - conn, pconn->verbs_channel, 0); - if (pconn->cq==NULL) { - sprintf(ibw_lasterr, "ibv_create_cq failed\n"); - return -1; - } - - rc = ibv_req_notify_cq(pconn->cq, 0); - if (rc) { - sprintf(ibw_lasterr, "ibv_req_notify_cq failed with %d\n", rc); - return rc; - } - - /* init qp */ - memset(&init_attr, 0, sizeof(init_attr)); - init_attr.cap.max_send_wr = pctx->opts.max_send_wr; - init_attr.cap.max_recv_wr = pctx->opts.max_recv_wr; - init_attr.cap.max_recv_sge = 1; - init_attr.cap.max_send_sge = 1; - init_attr.qp_type = IBV_QPT_RC; - init_attr.send_cq = pconn->cq; - init_attr.recv_cq = pconn->cq; - - rc = rdma_create_qp(pconn->cm_id, pconn->pd, &init_attr); - if (rc) { - sprintf(ibw_lasterr, "rdma_create_qp failed with %d\n", rc); - return rc; - } - /* elase result is in pconn->cm_id->qp */ - - rc = ibv_query_qp(pconn->cm_id->qp, &attr, IBV_QP_PATH_MTU, &init_attr); - if (rc) { - sprintf(ibw_lasterr, "ibv_query_qp failed with %d\n", rc); - return rc; - } - - return ibw_fill_cq(conn); -} - -static int ibw_refill_cq_recv(struct ibw_conn *conn) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - int rc; - struct ibv_sge list = { - .addr = (uintptr_t) NULL, /* filled below */ - .length = pctx->opts.recv_bufsize, - .lkey = pconn->mr_recv->lkey /* always the same */ - }; - struct ibv_recv_wr wr = { - .wr_id = 0, /* filled below */ - .sg_list = &list, - .num_sge = 1, - }; - struct ibv_recv_wr *bad_wr; - - DEBUG(10, ("ibw_refill_cq_recv(cmid: %p)\n", pconn->cm_id)); - - list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index; - wr.wr_id = pconn->recv_index; - pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr; - - rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr); - if (rc) { - sprintf(ibw_lasterr, "refill/ibv_post_recv failed with %d\n", rc); - DEBUG(0, (ibw_lasterr)); - return -2; - } - - return 0; -} - -static int ibw_fill_cq(struct ibw_conn *conn) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - int i, rc; - struct ibv_sge list = { - .addr = (uintptr_t) NULL, /* filled below */ - .length = pctx->opts.recv_bufsize, - .lkey = pconn->mr_recv->lkey /* always the same */ - }; - struct ibv_recv_wr wr = { - .wr_id = 0, /* filled below */ - .sg_list = &list, - .num_sge = 1, - }; - struct ibv_recv_wr *bad_wr; - - DEBUG(10, ("ibw_fill_cq(cmid: %p)\n", pconn->cm_id)); - - for(i = pctx->opts.max_recv_wr; i!=0; i--) { - list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index; - wr.wr_id = pconn->recv_index; - pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr; - - rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr); - if (rc) { - sprintf(ibw_lasterr, "fill/ibv_post_recv failed with %d\n", rc); - DEBUG(0, (ibw_lasterr)); - return -2; - } - } - - return 0; -} - -static int ibw_manage_connect(struct ibw_conn *conn) -{ - struct rdma_conn_param conn_param; - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - int rc; - - DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", pconn->cm_id)); - - if (ibw_setup_cq_qp(conn)) - return -1; - - /* cm connect */ - memset(&conn_param, 0, sizeof conn_param); - conn_param.responder_resources = 1; - conn_param.initiator_depth = 1; - conn_param.retry_count = 10; - - rc = rdma_connect(pconn->cm_id, &conn_param); - if (rc) - sprintf(ibw_lasterr, "rdma_connect error %d\n", rc); - - return rc; -} - -static void ibw_event_handler_cm(struct event_context *ev, - struct fd_event *fde, uint16_t flags, void *private_data) -{ - int rc; - struct ibw_ctx *ctx = talloc_get_type(private_data, struct ibw_ctx); - struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv); - struct ibw_conn *conn = NULL; - struct ibw_conn_priv *pconn = NULL; - struct rdma_cm_id *cma_id = NULL; - struct rdma_cm_event *event = NULL; - - assert(ctx!=NULL); - - rc = rdma_get_cm_event(pctx->cm_channel, &event); - if (rc) { - ctx->state = IBWS_ERROR; - event = NULL; - sprintf(ibw_lasterr, "rdma_get_cm_event error %d\n", rc); - goto error; - } - cma_id = event->id; - - DEBUG(10, ("cma_event type %d cma_id %p (%s)\n", event->event, cma_id, - (cma_id == pctx->cm_id) ? "parent" : "child")); - - switch (event->event) { - case RDMA_CM_EVENT_ADDR_RESOLVED: - DEBUG(11, ("RDMA_CM_EVENT_ADDR_RESOLVED\n")); - /* continuing from ibw_connect ... */ - rc = rdma_resolve_route(cma_id, 2000); - if (rc) { - sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc); - goto error; - } - /* continued at RDMA_CM_EVENT_ROUTE_RESOLVED */ - break; - - case RDMA_CM_EVENT_ROUTE_RESOLVED: - DEBUG(11, ("RDMA_CM_EVENT_ROUTE_RESOLVED\n")); - /* after RDMA_CM_EVENT_ADDR_RESOLVED: */ - assert(cma_id->context!=NULL); - conn = talloc_get_type(cma_id->context, struct ibw_conn); - - rc = ibw_manage_connect(conn); - if (rc) - goto error; - - break; - - case RDMA_CM_EVENT_CONNECT_REQUEST: - DEBUG(11, ("RDMA_CM_EVENT_CONNECT_REQUEST\n")); - ctx->state = IBWS_CONNECT_REQUEST; - conn = ibw_conn_new(ctx, ctx); - pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - pconn->cm_id = cma_id; /* !!! event will be freed but id not */ - cma_id->context = (void *)conn; - DEBUG(10, ("pconn->cm_id %p\n", pconn->cm_id)); - - if (ibw_setup_cq_qp(conn)) - goto error; - - conn->state = IBWC_INIT; - pctx->connstate_func(ctx, conn); - - /* continued at ibw_accept when invoked by the func above */ - if (!pconn->is_accepted) { - rc = rdma_reject(cma_id, NULL, 0); - if (rc) - DEBUG(0, ("rdma_reject failed with rc=%d\n", rc)); - talloc_free(conn); - DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id)); - } - - /* TODO: clarify whether if it's needed by upper layer: */ - ctx->state = IBWS_READY; - pctx->connstate_func(ctx, NULL); - - /* NOTE: more requests can arrive until RDMA_CM_EVENT_ESTABLISHED ! */ - break; - - case RDMA_CM_EVENT_ESTABLISHED: - /* expected after ibw_accept and ibw_connect[not directly] */ - DEBUG(1, ("ESTABLISHED (conn: %p)\n", cma_id->context)); - conn = talloc_get_type(cma_id->context, struct ibw_conn); - assert(conn!=NULL); /* important assumption */ - - DEBUG(10, ("ibw_setup_cq_qp succeeded (cmid=%p)\n", cma_id)); - - /* client conn is up */ - conn->state = IBWC_CONNECTED; - - /* both ctx and conn have changed */ - pctx->connstate_func(ctx, conn); - break; - - case RDMA_CM_EVENT_ADDR_ERROR: - sprintf(ibw_lasterr, "RDMA_CM_EVENT_ADDR_ERROR, error %d\n", event->status); - case RDMA_CM_EVENT_ROUTE_ERROR: - sprintf(ibw_lasterr, "RDMA_CM_EVENT_ROUTE_ERROR, error %d\n", event->status); - case RDMA_CM_EVENT_CONNECT_ERROR: - sprintf(ibw_lasterr, "RDMA_CM_EVENT_CONNECT_ERROR, error %d\n", event->status); - case RDMA_CM_EVENT_UNREACHABLE: - sprintf(ibw_lasterr, "RDMA_CM_EVENT_UNREACHABLE, error %d\n", event->status); - goto error; - case RDMA_CM_EVENT_REJECTED: - sprintf(ibw_lasterr, "RDMA_CM_EVENT_REJECTED, error %d\n", event->status); - DEBUG(1, ("cm event handler: %s", ibw_lasterr)); - conn = talloc_get_type(cma_id->context, struct ibw_conn); - if (conn) { - /* must be done BEFORE connstate */ - if ((rc=rdma_ack_cm_event(event))) - DEBUG(0, ("reject/rdma_ack_cm_event failed with %d\n", rc)); - event = NULL; /* not to touch cma_id or conn */ - conn->state = IBWC_ERROR; - /* it should free the conn */ - pctx->connstate_func(NULL, conn); - } - break; /* this is not strictly an error */ - - case RDMA_CM_EVENT_DISCONNECTED: - DEBUG(11, ("RDMA_CM_EVENT_DISCONNECTED\n")); - if ((rc=rdma_ack_cm_event(event))) - DEBUG(0, ("disc/rdma_ack_cm_event failed with %d\n", rc)); - event = NULL; /* don't ack more */ - - if (cma_id!=pctx->cm_id) { - DEBUG(0, ("client DISCONNECT event cm_id=%p\n", cma_id)); - conn = talloc_get_type(cma_id->context, struct ibw_conn); - conn->state = IBWC_DISCONNECTED; - pctx->connstate_func(NULL, conn); - } - break; - - case RDMA_CM_EVENT_DEVICE_REMOVAL: - sprintf(ibw_lasterr, "cma detected device removal!\n"); - goto error; - - default: - sprintf(ibw_lasterr, "unknown event %d\n", event->event); - goto error; - } - - if (event!=NULL && (rc=rdma_ack_cm_event(event))) { - sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc); - goto error; - } - - return; -error: - DEBUG(0, ("cm event handler: %s", ibw_lasterr)); - - if (event!=NULL) { - if (cma_id!=NULL && cma_id!=pctx->cm_id) { - conn = talloc_get_type(cma_id->context, struct ibw_conn); - if (conn) { - conn->state = IBWC_ERROR; - pctx->connstate_func(NULL, conn); - } - } else { - ctx->state = IBWS_ERROR; - pctx->connstate_func(ctx, NULL); - } - - if ((rc=rdma_ack_cm_event(event))!=0) { - DEBUG(0, ("rdma_ack_cm_event failed with %d\n", rc)); - } - } - - return; -} - -static void ibw_event_handler_verbs(struct event_context *ev, - struct fd_event *fde, uint16_t flags, void *private_data) -{ - struct ibw_conn *conn = talloc_get_type(private_data, struct ibw_conn); - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - - struct ibv_wc wc; - int rc; - struct ibv_cq *ev_cq; - void *ev_ctx; - - DEBUG(10, ("ibw_event_handler_verbs(%u)\n", (uint32_t)flags)); - - /* TODO: check whether if it's good to have more channels here... */ - rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx); - if (rc) { - sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc); - goto error; - } - if (ev_cq != pconn->cq) { - sprintf(ibw_lasterr, "ev_cq(%p) != pconn->cq(%p)\n", ev_cq, pconn->cq); - goto error; - } - rc = ibv_req_notify_cq(pconn->cq, 0); - if (rc) { - sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc); - goto error; - } - - while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) { - if (wc.status) { - sprintf(ibw_lasterr, "cq completion failed status=%d, opcode=%d, rc=%d\n", - wc.status, wc.opcode, rc); - goto error; - } - - switch(wc.opcode) { - case IBV_WC_SEND: - DEBUG(10, ("send completion\n")); - if (ibw_wc_send(conn, &wc)) - goto error; - break; - - case IBV_WC_RDMA_WRITE: - DEBUG(10, ("rdma write completion\n")); - break; - - case IBV_WC_RDMA_READ: - DEBUG(10, ("rdma read completion\n")); - break; - - case IBV_WC_RECV: - DEBUG(10, ("recv completion\n")); - if (ibw_wc_recv(conn, &wc)) - goto error; - break; - - default: - sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode); - goto error; - } - } - if (rc!=0) { - sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc); - goto error; - } - - ibv_ack_cq_events(pconn->cq, 1); - - return; -error: - ibv_ack_cq_events(pconn->cq, 1); - - DEBUG(0, (ibw_lasterr)); - - if (conn->state!=IBWC_ERROR) { - conn->state = IBWC_ERROR; - pctx->connstate_func(NULL, conn); - } -} - -static int ibw_process_queue(struct ibw_conn *conn) -{ - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - struct ibw_ctx_priv *pctx; - struct ibw_wr *p; - int rc; - uint32_t msg_size; - - if (pconn->queue==NULL) - return 0; /* NOP */ - - p = pconn->queue; - - /* we must have at least 1 fragment to send */ - assert(p->queued_ref_cnt>0); - p->queued_ref_cnt--; - - pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen; - - assert(p->queued_msg!=NULL); - assert(msg_size!=0); - - DEBUG(10, ("ibw_process_queue refcnt=%d msgsize=%u\n", - p->queued_ref_cnt, msg_size)); - - rc = ibw_send_packet(conn, p->queued_msg, p, msg_size); - - /* was this the last fragment? */ - if (p->queued_ref_cnt) { - p->queued_msg += pctx->opts.recv_bufsize; - } else { - DLIST_REMOVE2(pconn->queue, p, qprev, qnext); - p->queued_msg = NULL; - } - - return rc; -} - -static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - struct ibw_wr *p; - int send_index; - - DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n", - pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len)); - - assert(pconn->cm_id->qp->qp_num==wc->qp_num); - assert(wc->wr_id >= pctx->opts.max_recv_wr); - send_index = wc->wr_id - pctx->opts.max_recv_wr; - pconn->wr_sent--; - - if (send_index < pctx->opts.max_send_wr) { - DEBUG(10, ("ibw_wc_send#1 %u\n", (int)wc->wr_id)); - p = pconn->wr_index[send_index]; - if (p->buf_large!=NULL) { - if (p->ref_cnt) { - /* awaiting more of it... */ - p->ref_cnt--; - } else { - ibw_free_mr(&p->buf_large, &p->mr_large); - DLIST_REMOVE(pconn->wr_list_used, p); - DLIST_ADD(pconn->wr_list_avail, p); - } - } else { /* nasty - but necessary */ - DLIST_REMOVE(pconn->wr_list_used, p); - DLIST_ADD(pconn->wr_list_avail, p); - } - } else { /* "extra" request - not optimized */ - DEBUG(10, ("ibw_wc_send#2 %u\n", (int)wc->wr_id)); - for(p=pconn->extra_sent; p!=NULL; p=p->next) - if ((p->wr_id + pctx->opts.max_recv_wr)==(int)wc->wr_id) - break; - if (p==NULL) { - sprintf(ibw_lasterr, "failed to find wr_id %d\n", (int)wc->wr_id); - return -1; - } - if (p->ref_cnt) { - p->ref_cnt--; - } else { - ibw_free_mr(&p->buf_large, &p->mr_large); - DLIST_REMOVE(pconn->extra_sent, p); - DLIST_ADD(pconn->extra_avail, p); - } - } - - return ibw_process_queue(conn); -} - -static int ibw_append_to_part(struct ibw_conn_priv *pconn, - struct ibw_part *part, char **pp, uint32_t add_len, int info) -{ - DEBUG(10, ("ibw_append_to_part: cmid=%p, (bs=%u, len=%u, tr=%u), al=%u, i=%u\n", - pconn->cm_id, part->bufsize, part->len, part->to_read, add_len, info)); - - /* allocate more if necessary - it's an "evergrowing" buffer... */ - if (part->len + add_len > part->bufsize) { - if (part->buf==NULL) { - assert(part->len==0); - part->buf = talloc_size(pconn, add_len); - if (part->buf==NULL) { - sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n", - add_len, info); - return -1; - } - part->bufsize = add_len; - } else { - part->buf = talloc_realloc_size(pconn, - part->buf, part->len + add_len); - if (part->buf==NULL) { - sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n", - part->len, add_len, info); - return -1; - } - } - part->bufsize = part->len + add_len; - } - - /* consume pp */ - memcpy(part->buf + part->len, *pp, add_len); - *pp += add_len; - part->len += add_len; - part->to_read -= add_len; - - return 0; -} - -static int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn, - struct ibw_part *part, uint32_t threshold) -{ - DEBUG(10, ("ibw_wc_mem_threshold: cmid=%p, (bs=%u, len=%u, tr=%u), thr=%u\n", - pconn->cm_id, part->bufsize, part->len, part->to_read, threshold)); - - if (part->bufsize > threshold) { - DEBUG(3, ("ibw_wc_mem_threshold: cmid=%p, %u > %u\n", - pconn->cm_id, part->bufsize, threshold)); - talloc_free(part->buf); - part->buf = talloc_size(pconn, threshold); - if (part->buf==NULL) { - sprintf(ibw_lasterr, "talloc_size failed\n"); - return -1; - } - part->bufsize = threshold; - } - return 0; -} - -static int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - struct ibw_part *part = &pconn->part; - char *p; - uint32_t remain = wc->byte_len; - - DEBUG(10, ("ibw_wc_recv: cmid=%p, wr_id: %u, bl: %u\n", - pconn->cm_id, (uint32_t)wc->wr_id, remain)); - - assert(pconn->cm_id->qp->qp_num==wc->qp_num); - assert((int)wc->wr_id < pctx->opts.max_recv_wr); - assert(wc->byte_len <= pctx->opts.recv_bufsize); - - p = pconn->buf_recv + ((int)wc->wr_id * pctx->opts.recv_bufsize); - - while(remain) { - /* here always true: (part->len!=0 && part->to_read!=0) || - (part->len==0 && part->to_read==0) */ - if (part->len) { /* is there a partial msg to be continued? */ - int read_len = (part->to_read<=remain) ? part->to_read : remain; - if (ibw_append_to_part(pconn, part, &p, read_len, 421)) - goto error; - remain -= read_len; - - if (part->len<=sizeof(uint32_t) && part->to_read==0) { - assert(part->len==sizeof(uint32_t)); - /* set it again now... */ - part->to_read = *((uint32_t *)(part->buf)); /* TODO: ntohl */ - if (part->to_read<sizeof(uint32_t)) { - sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read); - goto error; - } - part->to_read -= sizeof(uint32_t); /* it's already read */ - } - - if (part->to_read==0) { - pctx->receive_func(conn, part->buf, part->len); - part->len = 0; /* tells not having partial data (any more) */ - if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold)) - goto error; - } - } else { - if (remain>=sizeof(uint32_t)) { - uint32_t msglen = *(uint32_t *)p; /* TODO: ntohl */ - if (msglen<sizeof(uint32_t)) { - sprintf(ibw_lasterr, "got msglen=%u\n", msglen); - goto error; - } - - /* mostly awaited case: */ - if (msglen<=remain) { - pctx->receive_func(conn, p, msglen); - p += msglen; - remain -= msglen; - } else { - part->to_read = msglen; - /* part->len is already 0 */ - if (ibw_append_to_part(pconn, part, &p, remain, 422)) - goto error; - remain = 0; /* to be continued ... */ - /* part->to_read > 0 here */ - } - } else { /* edge case: */ - part->to_read = sizeof(uint32_t); - /* part->len is already 0 */ - if (ibw_append_to_part(pconn, part, &p, remain, 423)) - goto error; - remain = 0; - /* part->to_read > 0 here */ - } - } - } /* <remain> is always decreased at least by 1 */ - - if (ibw_refill_cq_recv(conn)) - goto error; - - return 0; - -error: - DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr)); - return -1; -} - -static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts) -{ - int i; - const char *name, *value; - - DEBUG(10, ("ibw_process_init_attrs: nattr: %d\n", nattr)); - - opts->max_send_wr = IBW_MAX_SEND_WR; - opts->max_recv_wr = IBW_MAX_RECV_WR; - opts->recv_bufsize = IBW_RECV_BUFSIZE; - opts->recv_threshold = IBW_RECV_THRESHOLD; - - for(i=0; i<nattr; i++) { - name = attr[i].name; - value = attr[i].value; - - assert(name!=NULL && value!=NULL); - if (strcmp(name, "max_send_wr")==0) - opts->max_send_wr = atoi(value); - else if (strcmp(name, "max_recv_wr")==0) - opts->max_recv_wr = atoi(value); - else if (strcmp(name, "recv_bufsize")==0) - opts->recv_bufsize = atoi(value); - else if (strcmp(name, "recv_threshold")==0) - opts->recv_threshold = atoi(value); - else { - sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name); - return -1; - } - } - return 0; -} - -struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr, - void *ctx_userdata, - ibw_connstate_fn_t ibw_connstate, - ibw_receive_fn_t ibw_receive, - struct event_context *ectx) -{ - struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx); - struct ibw_ctx_priv *pctx; - int rc; - - DEBUG(10, ("ibw_init(ctx_userdata: %p, ectx: %p)\n", ctx_userdata, ectx)); - - /* initialize basic data structures */ - memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE); - - assert(ctx!=NULL); - ibw_lasterr[0] = '\0'; - talloc_set_destructor(ctx, ibw_ctx_destruct); - ctx->ctx_userdata = ctx_userdata; - - pctx = talloc_zero(ctx, struct ibw_ctx_priv); - talloc_set_destructor(pctx, ibw_ctx_priv_destruct); - ctx->internal = (void *)pctx; - assert(pctx!=NULL); - - pctx->connstate_func = ibw_connstate; - pctx->receive_func = ibw_receive; - - pctx->ectx = ectx; - - /* process attributes */ - if (ibw_process_init_attrs(attr, nattr, &pctx->opts)) - goto cleanup; - - /* init cm */ - pctx->cm_channel = rdma_create_event_channel(); - if (!pctx->cm_channel) { - sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno); - goto cleanup; - } - - pctx->cm_channel_event = event_add_fd(pctx->ectx, pctx, - pctx->cm_channel->fd, EVENT_FD_READ, ibw_event_handler_cm, ctx); - -#if RDMA_USER_CM_MAX_ABI_VERSION >= 2 - rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx, RDMA_PS_TCP); -#else - rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx); -#endif - if (rc) { - rc = errno; - sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc); - goto cleanup; - } - DEBUG(10, ("created cm_id %p\n", pctx->cm_id)); - - pctx->pagesize = sysconf(_SC_PAGESIZE); - - return ctx; - /* don't put code here */ -cleanup: - DEBUG(0, (ibw_lasterr)); - - if (ctx) - talloc_free(ctx); - - return NULL; -} - -int ibw_stop(struct ibw_ctx *ctx) -{ - struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal; - struct ibw_conn *p; - - DEBUG(10, ("ibw_stop\n")); - - for(p=ctx->conn_list; p!=NULL; p=p->next) { - if (ctx->state==IBWC_ERROR || ctx->state==IBWC_CONNECTED) { - if (ibw_disconnect(p)) - return -1; - } - } - - ctx->state = IBWS_STOPPED; - pctx->connstate_func(ctx, NULL); - - return 0; -} - -int ibw_bind(struct ibw_ctx *ctx, struct sockaddr_in *my_addr) -{ - struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal; - int rc; - - DEBUG(10, ("ibw_bind: addr=%s, port=%u\n", - inet_ntoa(my_addr->sin_addr), ntohs(my_addr->sin_port))); - rc = rdma_bind_addr(pctx->cm_id, (struct sockaddr *) my_addr); - if (rc) { - sprintf(ibw_lasterr, "rdma_bind_addr error %d\n", rc); - DEBUG(0, (ibw_lasterr)); - return rc; - } - DEBUG(10, ("rdma_bind_addr successful\n")); - - return 0; -} - -int ibw_listen(struct ibw_ctx *ctx, int backlog) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv); - int rc; - - DEBUG(10, ("ibw_listen\n")); - rc = rdma_listen(pctx->cm_id, backlog); - if (rc) { - sprintf(ibw_lasterr, "rdma_listen failed: %d\n", rc); - DEBUG(0, (ibw_lasterr)); - return rc; - } - - return 0; -} - -int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata) -{ - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - struct rdma_conn_param conn_param; - int rc; - - DEBUG(10, ("ibw_accept: cmid=%p\n", pconn->cm_id)); - conn->conn_userdata = conn_userdata; - - memset(&conn_param, 0, sizeof(struct rdma_conn_param)); - conn_param.responder_resources = 1; - conn_param.initiator_depth = 1; - rc = rdma_accept(pconn->cm_id, &conn_param); - if (rc) { - sprintf(ibw_lasterr, "rdma_accept failed %d\n", rc); - DEBUG(0, (ibw_lasterr)); - return -1;; - } - - pconn->is_accepted = 1; - - /* continued at RDMA_CM_EVENT_ESTABLISHED */ - - return 0; -} - -int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - struct ibw_conn_priv *pconn = NULL; - int rc; - - assert(conn!=NULL); - - conn->conn_userdata = conn_userdata; - pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - DEBUG(10, ("ibw_connect: addr=%s, port=%u\n", inet_ntoa(serv_addr->sin_addr), - ntohs(serv_addr->sin_port))); - - /* clean previous - probably half - initialization */ - if (ibw_conn_priv_destruct(pconn)) { - DEBUG(0, ("ibw_connect/ibw_pconn_destruct failed for cm_id=%p\n", pconn->cm_id)); - return -1; - } - - /* init cm */ -#if RDMA_USER_CM_MAX_ABI_VERSION >= 2 - rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP); -#else - rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn); -#endif - if (rc) { - rc = errno; - sprintf(ibw_lasterr, "ibw_connect/rdma_create_id error %d\n", rc); - talloc_free(conn); - return -1; - } - DEBUG(10, ("ibw_connect: rdma_create_id succeeded, cm_id=%p\n", pconn->cm_id)); - - rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) serv_addr, 2000); - if (rc) { - sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc); - DEBUG(0, (ibw_lasterr)); - talloc_free(conn); - return -1; - } - - /* continued at RDMA_CM_EVENT_ADDR_RESOLVED */ - - return 0; -} - -int ibw_disconnect(struct ibw_conn *conn) -{ - int rc; - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - - DEBUG(10, ("ibw_disconnect: cmid=%p\n", pconn->cm_id)); - - assert(pconn!=NULL); - - switch(conn->state) { - case IBWC_ERROR: - ibw_conn_priv_destruct(pconn); /* do this here right now */ - break; - case IBWC_CONNECTED: - rc = rdma_disconnect(pconn->cm_id); - if (rc) { - sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc); - DEBUG(0, (ibw_lasterr)); - return rc; - } - break; - default: - DEBUG(9, ("invalid state for disconnect: %d\n", conn->state)); - break; - } - - return 0; -} - -int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t len) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - struct ibw_wr *p = pconn->wr_list_avail; - - if (p!=NULL) { - DEBUG(10, ("ibw_alloc_send_buf#1: cmid=%p, len=%d\n", pconn->cm_id, len)); - - DLIST_REMOVE(pconn->wr_list_avail, p); - DLIST_ADD(pconn->wr_list_used, p); - - if (len <= pctx->opts.recv_bufsize) { - *buf = (void *)p->buf; - } else { - p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large); - if (p->buf_large==NULL) { - sprintf(ibw_lasterr, "ibw_alloc_mr#1 failed\n"); - goto error; - } - *buf = (void *)p->buf_large; - } - /* p->wr_id is already filled in ibw_init_memory */ - } else { - DEBUG(10, ("ibw_alloc_send_buf#2: cmid=%p, len=%d\n", pconn->cm_id, len)); - /* not optimized */ - p = pconn->extra_avail; - if (!p) { - p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr); - talloc_set_destructor(p, ibw_wr_destruct); - if (p==NULL) { - sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)\n", pconn->extra_max); - goto error; - } - p->wr_id = pctx->opts.max_send_wr + pconn->extra_max; - pconn->extra_max++; - switch(pconn->extra_max) { - case 1: DEBUG(2, ("warning: queue performed\n")); break; - case 10: DEBUG(0, ("warning: queue reached 10\n")); break; - case 100: DEBUG(0, ("warning: queue reached 100\n")); break; - case 1000: DEBUG(0, ("warning: queue reached 1000\n")); break; - default: break; - } - } - - p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large); - if (p->buf_large==NULL) { - sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed\n"); - goto error; - } - *buf = (void *)p->buf_large; - - DLIST_REMOVE(pconn->extra_avail, p); - /* we don't have prepared index for this, so that - * we will have to find this by wr_id later on */ - DLIST_ADD(pconn->extra_sent, p); - } - - *key = (void *)p; - - return 0; -error: - DEBUG(0, ("ibw_alloc_send_buf error: %s", ibw_lasterr)); - return -1; -} - - -static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, uint32_t len) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - int rc; - - /* can we send it right now? */ - if (pconn->wr_sent<pctx->opts.max_send_wr) { - struct ibv_send_wr *bad_wr; - struct ibv_sge list = { - .addr = (uintptr_t)buf, - .length = len, - .lkey = pconn->mr_send->lkey - }; - struct ibv_send_wr wr = { - .wr_id = p->wr_id + pctx->opts.max_recv_wr, - .sg_list = &list, - .num_sge = 1, - .opcode = IBV_WR_SEND, - .send_flags = IBV_SEND_SIGNALED, - }; - - if (p->buf_large==NULL) { - DEBUG(10, ("ibw_send#normal(cmid: %p, wrid: %u, n: %d)\n", - pconn->cm_id, (uint32_t)wr.wr_id, len)); - } else { - DEBUG(10, ("ibw_send#large(cmid: %p, wrid: %u, n: %d)\n", - pconn->cm_id, (uint32_t)wr.wr_id, len)); - list.lkey = p->mr_large->lkey; - } - - rc = ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr); - if (rc) { - sprintf(ibw_lasterr, "ibv_post_send error %d (%d)\n", - rc, pconn->wr_sent); - goto error; - } - - pconn->wr_sent++; - - return rc; - } /* else put the request into our own queue: */ - - DEBUG(10, ("ibw_send#queued(cmid: %p, len: %u)\n", pconn->cm_id, len)); - - /* TODO: clarify how to continue when state==IBWC_STOPPED */ - - /* to be sent by ibw_wc_send */ - /* regardless "normal" or [a part of] "large" packet */ - if (!p->queued_ref_cnt) { - DLIST_ADD_END2(pconn->queue, p, struct ibw_wr *, - qprev, qnext); /* TODO: optimize */ - p->queued_msg = buf; - } - p->queued_ref_cnt++; - p->queued_rlen = len; /* last wins; see ibw_wc_send */ - - return 0; -error: - DEBUG(0, (ibw_lasterr)); - return -1; -} - -int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - struct ibw_wr *p = talloc_get_type(key, struct ibw_wr); - int rc; - - assert(len>=sizeof(uint32_t)); - assert((*((uint32_t *)buf)==len)); /* TODO: htonl */ - - if (len > pctx->opts.recv_bufsize) { - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - int rlen = len; - char *packet = (char *)buf; - uint32_t recv_bufsize = pctx->opts.recv_bufsize; - - DEBUG(10, ("ibw_send#frag(cmid: %p, buf: %p, len: %u)\n", - pconn->cm_id, buf, len)); - - /* single threaded => no race here: */ - assert(p->ref_cnt==0); - while(rlen > recv_bufsize) { - rc = ibw_send_packet(conn, packet, p, recv_bufsize); - if (rc) - return rc; - packet += recv_bufsize; - rlen -= recv_bufsize; - p->ref_cnt++; /* not good to have it in ibw_send_packet */ - } - if (rlen) { - rc = ibw_send_packet(conn, packet, p, rlen); - p->ref_cnt++; /* not good to have it in ibw_send_packet */ - } - p->ref_cnt--; /* for the same handling */ - } else { - assert(p->ref_cnt==0); - assert(p->queued_ref_cnt==0); - - rc = ibw_send_packet(conn, buf, p, len); - } - return rc; -} - -int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key) -{ - struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); - struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - struct ibw_wr *p = talloc_get_type(key, struct ibw_wr); - - assert(p!=NULL); - assert(buf!=NULL); - assert(conn!=NULL); - - if (p->buf_large!=NULL) - ibw_free_mr(&p->buf_large, &p->mr_large); - - /* parallel case */ - if (p->wr_id < pctx->opts.max_send_wr) { - DEBUG(10, ("ibw_cancel_send_buf#1 %u", (int)p->wr_id)); - DLIST_REMOVE(pconn->wr_list_used, p); - DLIST_ADD(pconn->wr_list_avail, p); - } else { /* "extra" packet */ - DEBUG(10, ("ibw_cancel_send_buf#2 %u", (int)p->wr_id)); - DLIST_REMOVE(pconn->extra_sent, p); - DLIST_ADD(pconn->extra_avail, p); - } - - return 0; -} - -const char *ibw_getLastError(void) -{ - return ibw_lasterr; -} diff --git a/source4/cluster/ctdb/ib/ibwrapper.h b/source4/cluster/ctdb/ib/ibwrapper.h deleted file mode 100644 index 0b880b3aab..0000000000 --- a/source4/cluster/ctdb/ib/ibwrapper.h +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Unix SMB/CIFS implementation. - * Wrap Infiniband calls. - * - * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006 - * - * Major code contributions by Peter Somogyi <psomogyi@gamax.hu> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, see <http://www.gnu.org/licenses/>. - */ - -/* Server communication state */ -enum ibw_state_ctx { - IBWS_INIT = 0, /* ctx start - after ibw_init */ - IBWS_READY, /* after ibw_bind & ibw_listen */ - IBWS_CONNECT_REQUEST, /* after [IBWS_READY + incoming request] */ - /* => [(ibw_accept)IBWS_READY | (ibw_disconnect)STOPPED | ERROR] */ - IBWS_STOPPED, /* normal stop <= ibw_disconnect+(IBWS_READY | IBWS_CONNECT_REQUEST) */ - IBWS_ERROR /* abnormal state; ibw_stop must be called after this */ -}; - -/* Connection state */ -struct ibw_ctx { - void *ctx_userdata; /* see ibw_init */ - - enum ibw_state_ctx state; - void *internal; - - struct ibw_conn *conn_list; /* 1st elem of double linked list */ -}; - -enum ibw_state_conn { - IBWC_INIT = 0, /* conn start - internal state */ - IBWC_CONNECTED, /* after ibw_accept or ibw_connect */ - IBWC_DISCONNECTED, /* after ibw_disconnect */ - IBWC_ERROR -}; - -struct ibw_conn { - struct ibw_ctx *ctx; - enum ibw_state_conn state; - - void *conn_userdata; /* see ibw_connect and ibw_accept */ - void *internal; - - struct ibw_conn *prev, *next; -}; - -/* - * (name, value) pair for array param of ibw_init - */ -struct ibw_initattr { - const char *name; - const char *value; -}; - -/* - * Callback function definition which should inform you about - * connection state change - * This callback is invoked whenever server or client connection changes. - * Both <conn> and <ctx> can be NULL if their state didn't change. - * Return nonzero on error. - */ -typedef int (*ibw_connstate_fn_t)(struct ibw_ctx *ctx, struct ibw_conn *conn); - -/* - * Callback function definition which should process incoming packets - * This callback is invoked whenever any message arrives. - * Return nonzero on error. - * - * Important: you mustn't store buf pointer for later use. - * Process its contents before returning. - */ -typedef int (*ibw_receive_fn_t)(struct ibw_conn *conn, void *buf, int n); - -/* - * settings: array of (name, value) pairs - * where name is one of: - * max_send_wr [default is 256] - * max_recv_wr [default is 1024] - * <...> - * - * Must be called _ONCE_ for each node. - * - * max_msg_size is the maximum size of a message - * (max_send_wr + max_recv_wr) * max_msg_size bytes allocated per connection - * - * returns non-NULL on success - * - * talloc_free must be called for the result in IBWS_STOPPED; - * it will close resources by destructor - * connections(ibw_conn *) must have been closed prior talloc_free - */ -struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr, - void *ctx_userdata, - ibw_connstate_fn_t ibw_connstate, - ibw_receive_fn_t ibw_receive, - struct event_context *ectx); - -/* - * Must be called in states of (IBWS_ERROR, IBWS_READY, IBWS_CONNECT_REQUEST) - * - * It will send out disconnect requests and free up ibw_conn structures. - * The ctx->state will transit to IBWS_STOPPED after every conn are disconnected. - * During that time, you mustn't send/recv/disconnect any more. - * Only after ctx->state=IBWS_STOPPED you can talloc_free the ctx. - */ -int ibw_stop(struct ibw_ctx *ctx); - -/*************** connection initiation - like stream sockets *****/ - -/* - * works like socket bind - * needs a normal internet address here - * - * return 0 on success - */ -int ibw_bind(struct ibw_ctx *ctx, struct sockaddr_in *my_addr); - -/* - * works like socket listen - * non-blocking - * enables accepting incoming connections (after IBWS_READY) - * (it doesn't touch ctx->state by itself) - * - * returns 0 on success - */ -int ibw_listen(struct ibw_ctx *ctx, int backlog); - -/* - * works like socket accept - * initializes a connection to a client - * must be called when state=IBWS_CONNECT_REQUEST - * - * returns 0 on success - * - * You have +1 waiting here: you will get ibw_conn (having the - * same <conn_userdata> member) structure in ibw_connstate_fn_t. - * - * Important: you won't get remote IP address (only internal conn info) - */ -int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata); - -/* - * Create a new connection structure - * available for queueing ibw_send - * - * <parent> is needed to be notified by talloc destruct action. - */ -struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx); - -/* - * Needs a normal internet address here - * can be called within IBWS_READY|IBWS_CONNECT_REQUEST - * - * returns non-NULL on success - * - * You have +1 waiting here: you will get ibw_conn (having the - * same <conn_userdata> member) structure in ibw_connstate_fn_t. - */ -int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata); - -/* - * Sends out a disconnect request. - * You should process fds after calling this function - * and then process it with ibw_process_event normally - * until you get conn->state = IBWC_DISCONNECTED - * - * You mustn't talloc_free <conn> yet right after this, - * first wait for IBWC_DISCONNECTED. - */ -int ibw_disconnect(struct ibw_conn *conn); - -/************ Infiniband specific event loop wrapping ******************/ - -/* - * You have to use this buf to fill in before send. - * It's just to avoid memcpy.in ibw_send. - * Use the same (buf, key) pair with ibw_send. - * Don't use more space than maxsize (see ibw_init). - * - * Returns 0 on success. - */ -int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t len); - -/* - * Send the message in one - * Can be invoked any times (should fit into buffers) and at any time - * (in conn->state=IBWC_CONNECTED) - * n must be less or equal than max_msg_size (see ibw_init) - * - * You mustn't use (buf, key) any more for sending. - */ -int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len); - -/* - * Call this after ibw_alloc_send_buf - * when you won't call ibw_send for (buf, key) - * You mustn't use (buf, key) any more. - */ -int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key); - -/* - * Retrieves the last error - * result: always non-zero, mustn't be freed (static) - */ -const char *ibw_getLastError(void); diff --git a/source4/cluster/ctdb/ib/ibwrapper_internal.h b/source4/cluster/ctdb/ib/ibwrapper_internal.h deleted file mode 100644 index 20aef7fd86..0000000000 --- a/source4/cluster/ctdb/ib/ibwrapper_internal.h +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Unix SMB/CIFS implementation. - * Wrap Infiniband calls. - * - * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006 - * - * Major code contributions by Peter Somogyi <psomogyi@gamax.hu> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, see <http://www.gnu.org/licenses/>. - */ - -struct ibw_opts { - uint32_t max_send_wr; - uint32_t max_recv_wr; - uint32_t recv_bufsize; - uint32_t recv_threshold; -}; - -struct ibw_wr { - char *buf; /* initialized in ibw_init_memory once per connection */ - int wr_id; /* position in wr_index list; also used as wr id */ - - char *buf_large; /* allocated specially for "large" message */ - struct ibv_mr *mr_large; - int ref_cnt; /* reference count for ibw_wc_send to know when to release */ - - char *queued_msg; /* set at ibw_send - can be different than above */ - int queued_ref_cnt; /* instead of adding the same to the queue again */ - uint32_t queued_rlen; /* last wins when queued_ref_cnt>0; or simple msg size */ - - struct ibw_wr *next, *prev; /* in wr_list_avail or wr_list_used */ - /* or extra_sent or extra_avail */ - struct ibw_wr *qnext, *qprev; /* in queue */ -}; - -struct ibw_ctx_priv { - struct event_context *ectx; - - struct ibw_opts opts; - - struct rdma_cm_id *cm_id; /* server cm id */ - - struct rdma_event_channel *cm_channel; - struct fd_event *cm_channel_event; - - ibw_connstate_fn_t connstate_func; /* see ibw_init */ - ibw_receive_fn_t receive_func; /* see ibw_init */ - - long pagesize; /* sysconf result for memalign */ -}; - -struct ibw_part { - char *buf; /* talloced memory buffer */ - uint32_t bufsize; /* allocated size of buf - always grows */ - uint32_t len; /* message part length */ - uint32_t to_read; /* 4 or *((uint32_t)buf) if len>=sizeof(uint32_t) */ -}; - -struct ibw_conn_priv { - struct ibv_comp_channel *verbs_channel; - struct fd_event *verbs_channel_event; - - struct rdma_cm_id *cm_id; /* client's cm id */ - struct ibv_pd *pd; - int is_accepted; - - struct ibv_cq *cq; /* qp is in cm_id */ - - char *buf_send; /* max_send_wr * avg_send_size */ - struct ibv_mr *mr_send; - struct ibw_wr *wr_list_avail; - struct ibw_wr *wr_list_used; - struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */ - int wr_sent; /* # of send wrs in the CQ */ - - struct ibw_wr *extra_sent; - struct ibw_wr *extra_avail; - int extra_max; /* max wr_id in the queue */ - - struct ibw_wr *queue; - - /* buf_recv is a ring buffer */ - char *buf_recv; /* max_recv_wr * avg_recv_size */ - struct ibv_mr *mr_recv; - int recv_index; /* index of the next recv buffer when refilling */ - struct ibw_part part; -}; - -/* remove an element from a list - element doesn't have to be in list. */ -#define DLIST_REMOVE2(list, p, prev, next) \ -do { \ - if ((p) == (list)) { \ - (list) = (p)->next; \ - if (list) (list)->prev = NULL; \ - } else { \ - if ((p)->prev) (p)->prev->next = (p)->next; \ - if ((p)->next) (p)->next->prev = (p)->prev; \ - } \ - if ((p) != (list)) (p)->next = (p)->prev = NULL; \ -} while (0) - -/* hook into the end of the list - needs a tmp pointer */ -#define DLIST_ADD_END2(list, p, type, prev, next) \ -do { \ - if (!(list)) { \ - (list) = (p); \ - (p)->next = (p)->prev = NULL; \ - } else { \ - type tmp; \ - for (tmp = (list); tmp->next; tmp = tmp->next) ; \ - tmp->next = (p); \ - (p)->next = NULL; \ - (p)->prev = tmp; \ - } \ -} while (0) diff --git a/source4/cluster/ctdb/ib/ibwrapper_test.c b/source4/cluster/ctdb/ib/ibwrapper_test.c deleted file mode 100644 index 1be37ddd28..0000000000 --- a/source4/cluster/ctdb/ib/ibwrapper_test.c +++ /dev/null @@ -1,659 +0,0 @@ -/* - * Unix SMB/CIFS implementation. - * Test the infiniband wrapper. - * - * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006 - * - * Major code contributions by Peter Somogyi <psomogyi@gamax.hu> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, see <http://www.gnu.org/licenses/>. - */ - -#include <stdlib.h> -#include <string.h> -#include <stdio.h> -#include <errno.h> -#include <sys/types.h> -#include <netinet/in.h> -#include <sys/socket.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <malloc.h> -#include <assert.h> -#include <unistd.h> -#include <signal.h> -#include <sys/time.h> -#include <time.h> - -#include "includes.h" -#include "lib/events/events.h" -#include "ib/ibwrapper.h" - -struct ibwtest_ctx { - int is_server; - char *id; /* my id */ - - struct ibw_initattr *attrs; - int nattrs; - char *opts; /* option string */ - - struct sockaddr_in *addrs; /* dynamic array of dest addrs */ - int naddrs; - - unsigned int nsec; /* delta times between messages in nanosec */ - unsigned int sleep_usec; /* microsecs to sleep in the main loop to emulate overloading */ - uint32_t maxsize; /* maximum variable message size */ - - int cnt; - int nsent; - - int nmsg; /* number of messages to send (client) */ - - int kill_me; - int stopping; - int error; - struct ibw_ctx *ibwctx; - - struct timeval start_time, end_time; -}; - -struct ibwtest_conn { - char *id; -}; - -enum testopcode { - TESTOP_SEND_ID = 1, - TESTOP_SEND_TEXT = 2, - TESTOP_SEND_RND = 3 -}; - -int ibwtest_connect_everybody(struct ibwtest_ctx *tcx) -{ - struct ibw_conn *conn; - struct ibwtest_conn *tconn = talloc_zero(tcx, struct ibwtest_conn); - int i; - - for(i=0; i<tcx->naddrs; i++) { - conn = ibw_conn_new(tcx->ibwctx, tconn); - if (ibw_connect(conn, &tcx->addrs[i], tconn)) { - fprintf(stderr, "ibw_connect error at %d\n", i); - return -1; - } - } - DEBUG(10, ("sent %d connect request...\n", tcx->naddrs)); - - return 0; -} - -int ibwtest_send_id(struct ibw_conn *conn) -{ - struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx); - char *buf; - void *key; - uint32_t len; - - DEBUG(10, ("ibwtest_send_id\n")); - len = sizeof(uint32_t)+strlen(tcx->id)+2; - if (ibw_alloc_send_buf(conn, (void **)&buf, &key, len)) { - DEBUG(0, ("send_id: ibw_alloc_send_buf failed\n")); - return -1; - } - - /* first sizeof(uint32_t) size bytes are for length */ - *((uint32_t *)buf) = len; - buf[sizeof(uint32_t)] = (char)TESTOP_SEND_ID; - strcpy(buf+sizeof(uint32_t)+1, tcx->id); - - if (ibw_send(conn, buf, key, len)) { - DEBUG(0, ("send_id: ibw_send error\n")); - return -1; - } - tcx->nsent++; - - return 0; -} - -int ibwtest_send_test_msg(struct ibwtest_ctx *tcx, struct ibw_conn *conn, const char *msg) -{ - char *buf, *p; - void *key; - uint32_t len; - - if (conn->state!=IBWC_CONNECTED) - return 0; /* not yet up */ - - len = strlen(msg) + 2 + sizeof(uint32_t); - if (ibw_alloc_send_buf(conn, (void **)&buf, &key, len)) { - fprintf(stderr, "send_test_msg: ibw_alloc_send_buf failed\n"); - return -1; - } - - *((uint32_t *)buf) = len; - p = buf; - p += sizeof(uint32_t); - p[0] = (char)TESTOP_SEND_TEXT; - p++; - strcpy(p, msg); - - if (ibw_send(conn, buf, key, len)) { - DEBUG(0, ("send_test_msg: ibw_send error\n")); - return -1; - } - tcx->nsent++; - - return 0; -} - -unsigned char ibwtest_fill_random(unsigned char *buf, uint32_t size) -{ - uint32_t i = size; - unsigned char sum = 0; - unsigned char value; - while(i) { - i--; - value = (unsigned char)(256.0 * (rand() / (RAND_MAX + 1.0))); - buf[i] = value; - sum += value; - } - return sum; -} - -unsigned char ibwtest_get_sum(unsigned char *buf, uint32_t size) -{ - uint32_t i = size; - unsigned char sum = 0; - - while(i) { - i--; - sum += buf[i]; - } - return sum; -} - -int ibwtest_do_varsize_scenario_conn_size(struct ibwtest_ctx *tcx, struct ibw_conn *conn, uint32_t size) -{ - unsigned char *buf; - void *key; - uint32_t len; - unsigned char sum; - - len = sizeof(uint32_t) + 1 + size + 1; - if (ibw_alloc_send_buf(conn, (void **)&buf, &key, len)) { - DEBUG(0, ("varsize/ibw_alloc_send_buf failed\n")); - return -1; - } - *((uint32_t *)buf) = len; - buf[sizeof(uint32_t)] = TESTOP_SEND_RND; - sum = ibwtest_fill_random(buf + sizeof(uint32_t) + 1, size); - buf[sizeof(uint32_t) + 1 + size] = sum; - if (ibw_send(conn, buf, key, len)) { - DEBUG(0, ("varsize/ibw_send failed\n")); - return -1; - } - tcx->nsent++; - - return 0; -} - -int ibwtest_do_varsize_scenario_conn(struct ibwtest_ctx *tcx, struct ibw_conn *conn) -{ - uint32_t size; - int i; - - for(i=0; i<tcx->nmsg; i++) - { - //size = (uint32_t)((float)(tcx->maxsize) * (rand() / (RAND_MAX + 1.0))); - size = (uint32_t)((float)(tcx->maxsize) * ((float)(i+1)/(float)tcx->nmsg)); - if (ibwtest_do_varsize_scenario_conn_size(tcx, conn, size)) - return -1; - } - return 0; -} - -/*int ibwtest_do_varsize_scenario(ibwtest_ctx *tcx) -{ - int rc; - struct ibw_conn *conn; - - for(conn=tcx->ibwctx->conn_list; conn!=NULL; conn=conn->next) { - if (conn->state==IBWC_CONNECTED) { - rc = ibwtest_do_varsize_scenario_conn(tcx, conn); - if (rc) - tcx->error = rc; - } - } -}*/ - -int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn) -{ - struct ibwtest_ctx *tcx = NULL; /* userdata */ - struct ibwtest_conn *tconn = NULL; /* userdata */ - - if (ctx) { - tcx = talloc_get_type(ctx->ctx_userdata, struct ibwtest_ctx); - - switch(ctx->state) { - case IBWS_INIT: - DEBUG(10, ("test IBWS_INIT\n")); - break; - case IBWS_READY: - DEBUG(10, ("test IBWS_READY\n")); - break; - case IBWS_CONNECT_REQUEST: - DEBUG(10, ("test IBWS_CONNECT_REQUEST\n")); - tconn = talloc_zero(conn, struct ibwtest_conn); - if (ibw_accept(ctx, conn, tconn)) { - DEBUG(0, ("error accepting the connect request\n")); - } - break; - case IBWS_STOPPED: - DEBUG(10, ("test IBWS_STOPPED\n")); - tcx->kill_me = 1; /* main loop can exit */ - break; - case IBWS_ERROR: - DEBUG(10, ("test IBWS_ERROR\n")); - ibw_stop(tcx->ibwctx); - break; - default: - assert(0); - break; - } - } - - if (conn) { - tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn); - switch(conn->state) { - case IBWC_INIT: - DEBUG(10, ("test IBWC_INIT\n")); - break; - case IBWC_CONNECTED: - if (gettimeofday(&tcx->start_time, NULL)) { - DEBUG(0, ("gettimeofday error %d", errno)); - return -1; - } - ibwtest_send_id(conn); - break; - case IBWC_DISCONNECTED: - DEBUG(10, ("test IBWC_DISCONNECTED\n")); - talloc_free(conn); - break; - case IBWC_ERROR: - DEBUG(10, ("test IBWC_ERROR %s\n", ibw_getLastError())); - break; - default: - assert(0); - break; - } - } - return 0; -} - -int ibwtest_receive_handler(struct ibw_conn *conn, void *buf, int n) -{ - struct ibwtest_conn *tconn; - enum testopcode op; - struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx); - int rc = 0; - - assert(conn!=NULL); - assert(n>=sizeof(uint32_t)+1); - tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn); - - op = (enum testopcode)((char *)buf)[sizeof(uint32_t)]; - if (op==TESTOP_SEND_ID) { - tconn->id = talloc_strdup(tconn, ((char *)buf)+sizeof(uint32_t)+1); - } - if (op==TESTOP_SEND_ID || op==TESTOP_SEND_TEXT) { - DEBUG(11, ("[%d]msg from %s: \"%s\"(%d)\n", op, - tconn->id ? tconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n)); - } - - if (tcx->is_server) { - if (op==TESTOP_SEND_RND) { - unsigned char sum; - sum = ibwtest_get_sum((unsigned char *)buf + sizeof(uint32_t) + 1, - n - sizeof(uint32_t) - 2); - DEBUG(11, ("[%d]msg varsize %u/sum %u from %s\n", - op, - n - sizeof(uint32_t) - 2, - (uint32_t)sum, - tconn->id ? tconn->id : "NULL")); - if (sum!=((unsigned char *)buf)[n-1]) { - DEBUG(0, ("ERROR: checksum mismatch %u!=%u\n", - (uint32_t)sum, (uint32_t)((unsigned char *)buf)[n-1])); - ibw_stop(tcx->ibwctx); - goto error; - } - } else if (op!=TESTOP_SEND_ID) { - char *buf2; - void *key2; - - /* bounce message regardless what it is */ - if (ibw_alloc_send_buf(conn, (void **)&buf2, &key2, n)) { - fprintf(stderr, "ibw_alloc_send_buf error #2\n"); - goto error; - } - memcpy(buf2, buf, n); - if (ibw_send(conn, buf2, key2, n)) { - fprintf(stderr, "ibw_send error #2\n"); - goto error; - } - tcx->nsent++; - } - } else { /* client: */ - if (op==TESTOP_SEND_ID && tcx->maxsize) { - /* send them in one blow */ - rc = ibwtest_do_varsize_scenario_conn(tcx, conn); - } - - if (tcx->nmsg) { - char msg[26]; - sprintf(msg, "hello world %d", tcx->nmsg--); - rc = ibwtest_send_test_msg(tcx, conn, msg); - if (tcx->nmsg==0) { - ibw_stop(tcx->ibwctx); - tcx->stopping = 1; - } - } - } - - if (rc) - tcx->error = rc; - - return rc; -error: - return -1; -} - -void ibwtest_timeout_handler(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private_data) -{ - struct ibwtest_ctx *tcx = talloc_get_type(private_data, struct ibwtest_ctx); - int rc; - - if (!tcx->is_server) { - struct ibw_conn *conn; - char msg[50]; - - /* fill it with something variable... */ - sprintf(msg, "hello world %d", tcx->cnt++); - - /* send something to everybody... */ - for(conn=tcx->ibwctx->conn_list; conn!=NULL; conn=conn->next) { - if (conn->state==IBWC_CONNECTED) { - rc = ibwtest_send_test_msg(tcx, conn, msg); - if (rc) - tcx->error = rc; - } - } - } /* else allow main loop run */ -} - -static struct ibwtest_ctx *testctx = NULL; - -void ibwtest_sigint_handler(int sig) -{ - DEBUG(0, ("got SIGINT\n")); - if (testctx) { - if (testctx->ibwctx->state==IBWS_READY || - testctx->ibwctx->state==IBWS_CONNECT_REQUEST || - testctx->ibwctx->state==IBWS_ERROR) - { - if (testctx->stopping) { - DEBUG(10, ("forcing exit...\n")); - testctx->kill_me = 1; - } else { - /* mostly expected case */ - ibw_stop(testctx->ibwctx); - testctx->stopping = 1; - } - } else - testctx->kill_me = 1; - } -} - -int ibwtest_parse_attrs(struct ibwtest_ctx *tcx, char *optext, - struct ibw_initattr **pattrs, int *nattrs, char op) -{ - int i = 0, n = 1; - int porcess_next = 1; - char *p, *q; - struct ibw_initattr *attrs = NULL; - - *pattrs = NULL; - for(p = optext; *p!='\0'; p++) { - if (*p==',') - n++; - } - - attrs = (struct ibw_initattr *)talloc_size(tcx, - n * sizeof(struct ibw_initattr)); - for(p = optext; *p!='\0'; p++) { - if (porcess_next) { - attrs[i].name = p; - q = strchr(p, ':'); - if (q==NULL) { - fprintf(stderr, "-%c format error\n", op); - return -1; - } - *q = '\0'; - attrs[i].value = q + 1; - - porcess_next = 0; - i++; - p = q; /* ++ at end */ - } - if (*p==',') { - *p = '\0'; /* ++ at end */ - porcess_next = 1; - } - } - *pattrs = attrs; - *nattrs = n; - - return 0; -} - -static int ibwtest_get_address(const char *address, struct in_addr *addr) -{ - if (inet_pton(AF_INET, address, addr) <= 0) { - struct hostent *he = gethostbyname(address); - if (he == NULL || he->h_length > sizeof(*addr)) { - DEBUG(0, ("invalid nework address '%s'\n", address)); - return -1; - } - memcpy(addr, he->h_addr, he->h_length); - } - return 0; -} - -int ibwtest_getdests(struct ibwtest_ctx *tcx, char op) -{ - int i; - struct ibw_initattr *attrs = NULL; - struct sockaddr_in *p; - char *tmp; - - tmp = talloc_strdup(tcx, optarg); - /* hack to reuse the above ibw_initattr parser */ - if (ibwtest_parse_attrs(tcx, tmp, &attrs, &tcx->naddrs, op)) - return -1; - - tcx->addrs = talloc_size(tcx, - tcx->naddrs * sizeof(struct sockaddr_in)); - for(i=0; i<tcx->naddrs; i++) { - p = tcx->addrs + i; - p->sin_family = AF_INET; - if (ibwtest_get_address(attrs[i].name, &p->sin_addr)) - return -1; - p->sin_port = htons(atoi(attrs[i].value)); - } - - return 0; -} - -int ibwtest_init_server(struct ibwtest_ctx *tcx) -{ - if (tcx->naddrs!=1) { - fprintf(stderr, "incorrect number of addrs(%d!=1)\n", tcx->naddrs); - return -1; - } - - if (ibw_bind(tcx->ibwctx, &tcx->addrs[0])) { - DEBUG(0, ("ERROR: ibw_bind failed\n")); - return -1; - } - - if (ibw_listen(tcx->ibwctx, 1)) { - DEBUG(0, ("ERROR: ibw_listen failed\n")); - return -1; - } - - /* continued at IBWS_READY */ - return 0; -} - -void ibwtest_usage(struct ibwtest_ctx *tcx, char *name) -{ - printf("Usage:\n"); - printf("\t%s -i <id> -o {name:value} -d {addr:port} -t nsec -s\n", name); - printf("\t-i <id> is a free text, acting as a server id, max 23 chars [mandatory]\n"); - printf("\t-o name1:value1,name2:value2,... is a list of (name, value) pairs\n"); - printf("\t-a addr1:port1,addr2:port2,... is a list of destination ip addresses\n"); - printf("\t-t nsec delta time between sends in nanosec [default %d]\n", tcx->nsec); - printf("\t\t send message periodically and endless when nsec is non-zero\n"); - printf("\t-s server mode (you have to give exactly one -d address:port in this case)\n"); - printf("\t-n number of messages to send [default %d]\n", tcx->nmsg); - printf("\t-l usec time to sleep in the main loop [default %d]\n", tcx->sleep_usec); - printf("\t-v max variable msg size in bytes [default %d], 0=don't send var. size\n", tcx->maxsize); - printf("\t-d LogLevel [default %d]\n", LogLevel); - printf("Press ctrl+C to stop the program.\n"); -} - -int main(int argc, char *argv[]) -{ - int rc, op; - int result = 1; - struct event_context *ev = NULL; - struct ibwtest_ctx *tcx = NULL; - float usec; - - tcx = talloc_zero(NULL, struct ibwtest_ctx); - memset(tcx, 0, sizeof(struct ibwtest_ctx)); - tcx->nsec = 0; - tcx->nmsg = 1000; - LogLevel = 0; - - /* here is the only case we can't avoid using global... */ - testctx = tcx; - signal(SIGINT, ibwtest_sigint_handler); - srand((unsigned)time(NULL)); - - while ((op=getopt(argc, argv, "i:o:d:m:st:n:l:v:a:")) != -1) { - switch (op) { - case 'i': - tcx->id = talloc_strdup(tcx, optarg); - break; - case 'o': - tcx->opts = talloc_strdup(tcx, optarg); - if (ibwtest_parse_attrs(tcx, tcx->opts, &tcx->attrs, - &tcx->nattrs, op)) - goto cleanup; - break; - case 'a': - if (ibwtest_getdests(tcx, op)) - goto cleanup; - break; - case 's': - tcx->is_server = 1; - break; - case 't': - tcx->nsec = (unsigned int)atoi(optarg); - break; - case 'n': - tcx->nmsg = atoi(optarg); - break; - case 'l': - tcx->sleep_usec = (unsigned int)atoi(optarg); - break; - case 'v': - tcx->maxsize = (unsigned int)atoi(optarg); - break; - case 'd': - LogLevel = atoi(optarg); - break; - default: - fprintf(stderr, "ERROR: unknown option -%c\n", (char)op); - ibwtest_usage(tcx, argv[0]); - goto cleanup; - } - } - if (tcx->id==NULL) { - ibwtest_usage(tcx, argv[0]); - goto cleanup; - } - - ev = s4_event_context_init(NULL); - assert(ev); - - tcx->ibwctx = ibw_init(tcx->attrs, tcx->nattrs, - tcx, - ibwtest_connstate_handler, - ibwtest_receive_handler, - ev - ); - if (!tcx->ibwctx) - goto cleanup; - - if (tcx->is_server) - rc = ibwtest_init_server(tcx); - else - rc = ibwtest_connect_everybody(tcx); - if (rc) - goto cleanup; - - while(!tcx->kill_me && !tcx->error) { - if (tcx->nsec) { - event_add_timed(ev, tcx, timeval_current_ofs(0, tcx->nsec), - ibwtest_timeout_handler, tcx); - } - - event_loop_once(ev); - - if (tcx->sleep_usec) - usleep(tcx->sleep_usec); - } - - if (!tcx->is_server && tcx->nsent!=0 && !tcx->error) { - if (gettimeofday(&tcx->end_time, NULL)) { - DEBUG(0, ("gettimeofday error %d\n", errno)); - goto cleanup; - } - usec = (tcx->end_time.tv_sec - tcx->start_time.tv_sec) * 1000000 + - (tcx->end_time.tv_usec - tcx->start_time.tv_usec); - printf("usec: %f, nmsg: %d, usec/nmsg: %f\n", - usec, tcx->nsent, usec/(float)tcx->nsent); - } - - if (!tcx->error) - result = 0; /* everything OK */ - -cleanup: - if (tcx) - talloc_free(tcx); - if (ev) - talloc_free(ev); - DEBUG(0, ("exited with code %d\n", result)); - return result; -} |