From 8d98070a9f0a1a17d05e381b8e4c5a8f7a7e8233 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Thu, 26 Mar 2009 14:27:45 +0100 Subject: tsocket: split out a smaller tdgram_context abstraction The idea is to have a tdgram and a tstream abstraction which only provide tevent_req based io functions. metze --- lib/tsocket/tsocket.c | 264 +++++++++++++ lib/tsocket/tsocket.h | 43 +++ lib/tsocket/tsocket_bsd.c | 844 +++++++++++++++++++++++++++++++++++++++++ lib/tsocket/tsocket_internal.h | 41 ++ 4 files changed, 1192 insertions(+) (limited to 'lib') diff --git a/lib/tsocket/tsocket.c b/lib/tsocket/tsocket.c index 1a12e691a9..922429a1c1 100644 --- a/lib/tsocket/tsocket.c +++ b/lib/tsocket/tsocket.c @@ -229,3 +229,267 @@ int _tsocket_address_create_socket(const struct tsocket_address *addr, return addr->ops->create_socket(addr, type, mem_ctx, sock, location); } +struct tdgram_context { + const char *location; + const struct tdgram_context_ops *ops; + void *private_data; +}; + +struct tdgram_context *_tdgram_context_create(TALLOC_CTX *mem_ctx, + const struct tdgram_context_ops *ops, + void *pstate, + size_t psize, + const char *type, + const char *location) +{ + struct tdgram_context *dgram; + void **ppstate = (void **)pstate; + void *state; + + dgram = talloc(mem_ctx, struct tdgram_context); + if (dgram == NULL) { + return NULL; + } + dgram->location = location; + dgram->ops = ops; + + state = talloc_size(dgram, psize); + if (state == NULL) { + talloc_free(dgram); + return NULL; + } + talloc_set_name_const(state, type); + + dgram->private_data = state; + + *ppstate = state; + return dgram; +} + +void *_tdgram_context_data(struct tdgram_context *dgram) +{ + return dgram->private_data; +} + +struct tdgram_recvfrom_state { + const struct tdgram_context_ops *ops; + uint8_t *buf; + size_t len; + struct tsocket_address *src; +}; + +static void tdgram_recvfrom_done(struct tevent_req *subreq); + +struct tevent_req *tdgram_recvfrom_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram) +{ + struct tevent_req *req; + struct tdgram_recvfrom_state *state; + struct tevent_req *subreq; + + req = tevent_req_create(mem_ctx, &state, + struct tdgram_recvfrom_state); + if (req == NULL) { + return NULL; + } + + state->ops = dgram->ops; + + subreq = state->ops->recvfrom_send(state, ev, dgram); + if (tevent_req_nomem(subreq, req)) { + goto post; + } + tevent_req_set_callback(subreq, tdgram_recvfrom_done, req); + + return req; + + post: + tevent_req_post(req, ev); + return req; +} + +static void tdgram_recvfrom_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data(subreq, + struct tevent_req); + struct tdgram_recvfrom_state *state = tevent_req_data(req, + struct tdgram_recvfrom_state); + ssize_t ret; + int sys_errno; + + ret = state->ops->recvfrom_recv(subreq, &sys_errno, state, + &state->buf, &state->src); + if (ret == -1) { + tevent_req_error(req, sys_errno); + return; + } + + state->len = ret; + + tevent_req_done(req); +} + +ssize_t tdgram_recvfrom_recv(struct tevent_req *req, + int *perrno, + TALLOC_CTX *mem_ctx, + uint8_t **buf, + struct tsocket_address **src) +{ + struct tdgram_recvfrom_state *state = tevent_req_data(req, + struct tdgram_recvfrom_state); + ssize_t ret; + + ret = tsocket_simple_int_recv(req, perrno); + if (ret == 0) { + *buf = talloc_move(mem_ctx, &state->buf); + ret = state->len; + if (src) { + *src = talloc_move(mem_ctx, &state->src); + } + } + + tevent_req_received(req); + return ret; +} + +struct tdgram_sendto_state { + const struct tdgram_context_ops *ops; + ssize_t ret; +}; + +static void tdgram_sendto_done(struct tevent_req *subreq); + +struct tevent_req *tdgram_sendto_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram, + const uint8_t *buf, size_t len, + const struct tsocket_address *dst) +{ + struct tevent_req *req; + struct tdgram_sendto_state *state; + struct tevent_req *subreq; + + req = tevent_req_create(mem_ctx, &state, + struct tdgram_sendto_state); + if (req == NULL) { + return NULL; + } + + state->ops = dgram->ops; + state->ret = -1; + + subreq = state->ops->sendto_send(state, ev, dgram, + buf, len, dst); + if (tevent_req_nomem(subreq, req)) { + goto post; + } + tevent_req_set_callback(subreq, tdgram_sendto_done, req); + + return req; + + post: + tevent_req_post(req, ev); + return req; +} + +static void tdgram_sendto_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data(subreq, + struct tevent_req); + struct tdgram_sendto_state *state = tevent_req_data(req, + struct tdgram_sendto_state); + ssize_t ret; + int sys_errno; + + ret = state->ops->sendto_recv(subreq, &sys_errno); + if (ret == -1) { + tevent_req_error(req, sys_errno); + return; + } + + state->ret = ret; + + tevent_req_done(req); +} + +ssize_t tdgram_sendto_recv(struct tevent_req *req, + int *perrno) +{ + struct tdgram_sendto_state *state = tevent_req_data(req, + struct tdgram_sendto_state); + ssize_t ret; + + ret = tsocket_simple_int_recv(req, perrno); + if (ret == 0) { + ret = state->ret; + } + + tevent_req_received(req); + return ret; +} + +struct tdgram_disconnect_state { + const struct tdgram_context_ops *ops; +}; + +static void tdgram_disconnect_done(struct tevent_req *subreq); + +struct tevent_req *tdgram_disconnect_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram) +{ + struct tevent_req *req; + struct tdgram_disconnect_state *state; + struct tevent_req *subreq; + + req = tevent_req_create(mem_ctx, &state, + struct tdgram_disconnect_state); + if (req == NULL) { + return NULL; + } + + state->ops = dgram->ops; + + subreq = state->ops->disconnect_send(state, ev, dgram); + if (tevent_req_nomem(subreq, req)) { + goto post; + } + tevent_req_set_callback(subreq, tdgram_disconnect_done, req); + + return req; + + post: + tevent_req_post(req, ev); + return req; +} + +static void tdgram_disconnect_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data(subreq, + struct tevent_req); + struct tdgram_disconnect_state *state = tevent_req_data(req, + struct tdgram_disconnect_state); + int ret; + int sys_errno; + + ret = state->ops->disconnect_recv(subreq, &sys_errno); + if (ret == -1) { + tevent_req_error(req, sys_errno); + return; + } + + tevent_req_done(req); +} + +int tdgram_disconnect_recv(struct tevent_req *req, + int *perrno) +{ + int ret; + + ret = tsocket_simple_int_recv(req, perrno); + + tevent_req_received(req); + return ret; +} + diff --git a/lib/tsocket/tsocket.h b/lib/tsocket/tsocket.h index 9bcfb5cb7e..077fd1ef35 100644 --- a/lib/tsocket/tsocket.h +++ b/lib/tsocket/tsocket.h @@ -29,6 +29,7 @@ struct tsocket_context; struct tsocket_address; +struct tdgram_context; struct iovec; enum tsocket_type { @@ -120,6 +121,32 @@ int _tsocket_address_create_socket(const struct tsocket_address *addr, _tsocket_address_create_socket(addr, type, mem_ctx, sock,\ __location__) +/* + * tdgram_context related functions + */ +struct tevent_req *tdgram_recvfrom_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram); +ssize_t tdgram_recvfrom_recv(struct tevent_req *req, + int *perrno, + TALLOC_CTX *mem_ctx, + uint8_t **buf, + struct tsocket_address **src); + +struct tevent_req *tdgram_sendto_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram, + const uint8_t *buf, size_t len, + const struct tsocket_address *dst); +ssize_t tdgram_sendto_recv(struct tevent_req *req, + int *perrno); + +struct tevent_req *tdgram_disconnect_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram); +int tdgram_disconnect_recv(struct tevent_req *req, + int *perrno); + /* * BSD sockets: inet, inet6 and unix */ @@ -160,6 +187,22 @@ int _tsocket_context_bsd_wrap_existing(TALLOC_CTX *mem_ctx, _tsocket_context_bsd_wrap_existing(mem_ctx, fd, cod, _sock, \ __location__) +int _tdgram_inet_udp_socket(const struct tsocket_address *local, + const struct tsocket_address *remote, + TALLOC_CTX *mem_ctx, + struct tdgram_context **dgram, + const char *location); +#define tdgram_inet_udp_socket(local, remote, mem_ctx, dgram) \ + _tdgram_inet_udp_socket(local, remote, mem_ctx, dgram, __location__) + +int _tdgram_unix_dgram_socket(const struct tsocket_address *local, + const struct tsocket_address *remote, + TALLOC_CTX *mem_ctx, + struct tdgram_context **dgram, + const char *location); +#define tdgram_unix_dgram_socket(local, remote, mem_ctx, dgram) \ + _tdgram_unix_dgram_socket(local, remote, mem_ctx, dgram, __location__) + /* * Async helpers */ diff --git a/lib/tsocket/tsocket_bsd.c b/lib/tsocket/tsocket_bsd.c index 8254f5d9d3..6c60ef2ebd 100644 --- a/lib/tsocket/tsocket_bsd.c +++ b/lib/tsocket/tsocket_bsd.c @@ -24,9 +24,168 @@ #include "replace.h" #include "system/filesys.h" #include "system/network.h" +#include "system/filesys.h" #include "tsocket.h" #include "tsocket_internal.h" +static int tsocket_bsd_error_from_errno(int ret, + int sys_errno, + bool *retry) +{ + *retry = false; + + if (ret >= 0) { + return 0; + } + + if (ret != -1) { + return EIO; + } + + if (sys_errno == 0) { + return EIO; + } + + if (sys_errno == EINTR) { + *retry = true; + return sys_errno; + } + + if (sys_errno == EINPROGRESS) { + *retry = true; + return sys_errno; + } + + if (sys_errno == EAGAIN) { + *retry = true; + return sys_errno; + } + +#ifdef EWOULDBLOCK + if (sys_errno == EWOULDBLOCK) { + *retry = true; + return sys_errno; + } +#endif + + return sys_errno; +} + +static int tsocket_bsd_common_prepare_fd(int fd, bool high_fd) +{ + int i; + int sys_errno = 0; + int fds[3]; + int num_fds = 0; + + int result, flags; + + if (fd == -1) { + return -1; + } + + /* first make a fd >= 3 */ + if (high_fd) { + while (fd < 3) { + fds[num_fds++] = fd; + fd = dup(fd); + if (fd == -1) { + sys_errno = errno; + break; + } + } + for (i=0; i= 0) { + flags |= FD_CLOEXEC; + result = fcntl(fd, F_SETFD, flags); + } + if (result < 0) { + goto fail; + } +#endif + return fd; + + fail: + if (fd != -1) { + sys_errno = errno; + close(fd); + errno = sys_errno; + } + return -1; +} + +static ssize_t tsocket_bsd_pending(int fd) +{ + int ret; + int value = 0; + + ret = ioctl(fd, FIONREAD, &value); + if (ret == -1) { + return ret; + } + + if (ret == 0) { + if (value == 0) { + int error=0; + socklen_t len = sizeof(error); + /* + * if no data is available check if the socket + * is in error state. For dgram sockets + * it's the way to return ICMP error messages + * of connected sockets to the caller. + */ + ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, + &error, &len); + if (ret == -1) { + return ret; + } + if (error != 0) { + errno = error; + return -1; + } + } + return value; + } + + /* this should not be reached */ + errno = EIO; + return -1; +} + static const struct tsocket_context_ops tsocket_context_bsd_ops; static const struct tsocket_address_ops tsocket_address_bsd_ops; @@ -1125,3 +1284,688 @@ static const struct tsocket_context_ops tsocket_context_bsd_ops = { .disconnect = tsocket_context_bsd_disconnect }; + +struct tdgram_bsd { + int fd; + + void *event_ptr; + struct tevent_fd *fde; + + void *readable_private; + void (*readable_handler)(void *private_data); + void *writeable_private; + void (*writeable_handler)(void *private_data); + + struct tevent_req *read_req; + struct tevent_req *write_req; +}; + +static void tdgram_bsd_fde_handler(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, + void *private_data) +{ + struct tdgram_bsd *bsds = talloc_get_type_abort(private_data, + struct tdgram_bsd); + + if (flags & TEVENT_FD_WRITE) { + bsds->writeable_handler(bsds->writeable_private); + return; + } + if (flags & TEVENT_FD_READ) { + bsds->readable_handler(bsds->readable_private); + return; + } +} + +static int tdgram_bsd_set_readable_handler(struct tdgram_bsd *bsds, + struct tevent_context *ev, + void (*handler)(void *private_data), + void *private_data) +{ + if (ev == NULL) { + if (handler) { + errno = EINVAL; + return -1; + } + + bsds->readable_handler = NULL; + bsds->readable_private = NULL; + TEVENT_FD_NOT_READABLE(bsds->fde); + + if (bsds->fde && !bsds->writeable_handler) { + /* we don't need the fd event anymore */ + bsds->event_ptr = NULL; + TALLOC_FREE(bsds->fde); + } + return 0; + } + + if (bsds->fde == NULL) { + bsds->fde = tevent_add_fd(ev, bsds, + bsds->fd, TEVENT_FD_READ, + tdgram_bsd_fde_handler, + bsds); + if (!bsds->fde) { + return -1; + } + + /* cache the event context we're running on */ + bsds->event_ptr = ev; + } + + /* read and write must use the same tevent_context */ + if (bsds->event_ptr != ev) { + errno = EINVAL; + return -1; + } + + TEVENT_FD_READABLE(bsds->fde); + bsds->readable_handler = handler; + bsds->readable_private = private_data; + + return 0; +} + +static int tdgram_bsd_set_writeable_handler(struct tdgram_bsd *bsds, + struct tevent_context *ev, + void (*handler)(void *private_data), + void *private_data) +{ + if (ev == NULL) { + if (handler) { + errno = EINVAL; + return -1; + } + + bsds->writeable_handler = NULL; + bsds->writeable_private = NULL; + TEVENT_FD_NOT_WRITEABLE(bsds->fde); + + if (bsds->fde && !bsds->readable_handler) { + /* we don't need the fd event anymore */ + bsds->event_ptr = NULL; + TALLOC_FREE(bsds->fde); + } + return 0; + } + + if (bsds->fde == NULL) { + bsds->fde = tevent_add_fd(ev, bsds, + bsds->fd, TEVENT_FD_WRITE, + tdgram_bsd_fde_handler, + bsds); + if (!bsds->fde) { + return -1; + } + + /* cache the event context we're running on */ + bsds->event_ptr = ev; + } + + /* read and write must use the same tevent_context */ + if (bsds->event_ptr != ev) { + errno = EINVAL; + return -1; + } + + TEVENT_FD_WRITEABLE(bsds->fde); + bsds->writeable_handler = handler; + bsds->writeable_private = private_data; + + return 0; +} + +struct tdgram_bsd_recvfrom_state { + struct tdgram_context *dgram; + + uint8_t *buf; + size_t len; + struct tsocket_address *src; +}; + +static int tdgram_bsd_recvfrom_destructor(struct tdgram_bsd_recvfrom_state *state) +{ + struct tdgram_bsd *bsds = tdgram_context_data(state->dgram, + struct tdgram_bsd); + + bsds->read_req = NULL; + tdgram_bsd_set_readable_handler(bsds, NULL, NULL, NULL); + + return 0; +} + +static void tdgram_bsd_recvfrom_handler(void *private_data); + +static struct tevent_req *tdgram_bsd_recvfrom_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram) +{ + struct tevent_req *req; + struct tdgram_bsd_recvfrom_state *state; + struct tdgram_bsd *bsds = tdgram_context_data(dgram, struct tdgram_bsd); + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct tdgram_bsd_recvfrom_state); + if (!req) { + return NULL; + } + + state->dgram = dgram; + state->buf = NULL; + state->len = 0; + state->src = NULL; + + if (bsds->read_req) { + tevent_req_error(req, EBUSY); + goto post; + } + bsds->read_req = req; + + talloc_set_destructor(state, tdgram_bsd_recvfrom_destructor); + + if (bsds->fd == -1) { + tevent_req_error(req, ENOTCONN); + goto post; + } + + ret = tdgram_bsd_set_readable_handler(bsds, ev, + tdgram_bsd_recvfrom_handler, + req); + if (ret == -1) { + tevent_req_error(req, errno); + goto post; + } + + return req; + + post: + tevent_req_post(req, ev); + return req; +} + +static void tdgram_bsd_recvfrom_handler(void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort(private_data, + struct tevent_req); + struct tdgram_bsd_recvfrom_state *state = tevent_req_data(req, + struct tdgram_bsd_recvfrom_state); + struct tdgram_context *dgram = state->dgram; + struct tdgram_bsd *bsds = tdgram_context_data(dgram, struct tdgram_bsd); + struct tsocket_address_bsd *bsda; + ssize_t ret; + struct sockaddr *sa = NULL; + socklen_t sa_len = 0; + int err; + bool retry; + + ret = tsocket_bsd_pending(bsds->fd); + if (ret == 0) { + /* retry later */ + return; + } + err = tsocket_bsd_error_from_errno(ret, errno, &retry); + if (retry) { + /* retry later */ + return; + } + if (tevent_req_error(req, err)) { + return; + } + + state->buf = talloc_array(state, uint8_t, ret); + if (tevent_req_nomem(state->buf, req)) { + return; + } + state->len = ret; + + state->src = tsocket_address_create(state, + &tsocket_address_bsd_ops, + &bsda, + struct tsocket_address_bsd, + __location__ "bsd_recvfrom"); + if (tevent_req_nomem(state->src, req)) { + return; + } + + ZERO_STRUCTP(bsda); + + sa = &bsda->u.sa; + sa_len = sizeof(bsda->u.ss); + + ret = recvfrom(bsds->fd, state->buf, state->len, 0, sa, &sa_len); + err = tsocket_error_from_errno(ret, errno, &retry); + if (retry) { + /* retry later */ + return; + } + if (tevent_req_error(req, err)) { + return; + } + + if (ret != state->len) { + tevent_req_error(req, EIO); + return; + } + + tevent_req_done(req); +} + +static ssize_t tdgram_bsd_recvfrom_recv(struct tevent_req *req, + int *perrno, + TALLOC_CTX *mem_ctx, + uint8_t **buf, + struct tsocket_address **src) +{ + struct tdgram_bsd_recvfrom_state *state = tevent_req_data(req, + struct tdgram_bsd_recvfrom_state); + ssize_t ret; + + ret = tsocket_simple_int_recv(req, perrno); + if (ret == 0) { + *buf = talloc_move(mem_ctx, &state->buf); + ret = state->len; + if (src) { + *src = talloc_move(mem_ctx, &state->src); + } + } + + tevent_req_received(req); + return ret; +} + +struct tdgram_bsd_sendto_state { + struct tdgram_context *dgram; + + const uint8_t *buf; + size_t len; + const struct tsocket_address *dst; + + ssize_t ret; +}; + +static int tdgram_bsd_sendto_destructor(struct tdgram_bsd_sendto_state *state) +{ + struct tdgram_bsd *bsds = tdgram_context_data(state->dgram, + struct tdgram_bsd); + + bsds->write_req = NULL; + tdgram_bsd_set_writeable_handler(bsds, NULL, NULL, NULL); + return 0; +} + +static void tdgram_bsd_sendto_handler(void *private_data); + +static struct tevent_req *tdgram_bsd_sendto_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram, + const uint8_t *buf, + size_t len, + const struct tsocket_address *dst) +{ + struct tevent_req *req; + struct tdgram_bsd_sendto_state *state; + struct tdgram_bsd *bsds = tdgram_context_data(dgram, struct tdgram_bsd); + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct tdgram_bsd_sendto_state); + if (!req) { + return NULL; + } + + state->dgram = dgram; + state->buf = buf; + state->len = len; + state->dst = dst; + state->ret = -1; + + if (bsds->write_req) { + tevent_req_error(req, EBUSY); + goto post; + } + bsds->write_req = req; + + talloc_set_destructor(state, tdgram_bsd_sendto_destructor); + + if (bsds->fd == -1) { + tevent_req_error(req, ENOTCONN); + goto post; + } + + ret = tdgram_bsd_set_writeable_handler(bsds, ev, + tdgram_bsd_sendto_handler, + req); + if (ret == -1) { + tevent_req_error(req, errno); + goto post; + } + + return req; + + post: + tevent_req_post(req, ev); + return req; +} + +static void tdgram_bsd_sendto_handler(void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort(private_data, + struct tevent_req); + struct tdgram_bsd_sendto_state *state = tevent_req_data(req, + struct tdgram_bsd_sendto_state); + struct tdgram_context *dgram = state->dgram; + struct tdgram_bsd *bsds = tdgram_context_data(dgram, struct tdgram_bsd); + struct sockaddr *sa = NULL; + socklen_t sa_len = 0; + ssize_t ret; + int err; + bool retry; + + if (state->dst) { + struct tsocket_address_bsd *bsda = + talloc_get_type(state->dst->private_data, + struct tsocket_address_bsd); + + sa = &bsda->u.sa; + sa_len = sizeof(bsda->u.ss); + } + + ret = sendto(bsds->fd, state->buf, state->len, 0, sa, sa_len); + err = tsocket_error_from_errno(ret, errno, &retry); + if (retry) { + /* retry later */ + return; + } + if (tevent_req_error(req, err)) { + return; + } + + state->ret = ret; + + tevent_req_done(req); +} + +static ssize_t tdgram_bsd_sendto_recv(struct tevent_req *req, int *perrno) +{ + struct tdgram_bsd_sendto_state *state = tevent_req_data(req, + struct tdgram_bsd_sendto_state); + ssize_t ret; + + ret = tsocket_simple_int_recv(req, perrno); + if (ret == 0) { + ret = state->ret; + } + + tevent_req_received(req); + return ret; +} + +struct tdgram_bsd_disconnect_state { + int ret; +}; + +static struct tevent_req *tdgram_bsd_disconnect_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram) +{ + struct tdgram_bsd *bsds = tdgram_context_data(dgram, struct tdgram_bsd); + struct tevent_req *req; + struct tdgram_bsd_disconnect_state *state; + int ret; + int err; + bool dummy; + + req = tevent_req_create(mem_ctx, &state, + struct tdgram_bsd_disconnect_state); + if (req == NULL) { + return NULL; + } + state->ret = -1; + + if (bsds->read_req || bsds->write_req) { + tevent_req_error(req, EBUSY); + goto post; + } + + if (bsds->fd == -1) { + tevent_req_error(req, ENOTCONN); + goto post; + } + + state->ret = close(bsds->fd); + bsds->fd = -1; + err = tsocket_error_from_errno(ret, errno, &dummy); + if (tevent_req_error(req, err)) { + goto post; + } + + tevent_req_done(req); +post: + tevent_req_post(req, ev); + return req; +} + +static int tdgram_bsd_disconnect_recv(struct tevent_req *req, + int *perrno) +{ + struct tdgram_bsd_disconnect_state *state = tevent_req_data(req, + struct tdgram_bsd_disconnect_state); + int ret; + + ret = tsocket_simple_int_recv(req, perrno); + if (ret == 0) { + ret = state->ret; + } + + tevent_req_received(req); + return ret; +} + +static const struct tdgram_context_ops tdgram_bsd_ops = { + .name = "bsd", + + .recvfrom_send = tdgram_bsd_recvfrom_send, + .recvfrom_recv = tdgram_bsd_recvfrom_recv, + + .sendto_send = tdgram_bsd_sendto_send, + .sendto_recv = tdgram_bsd_sendto_recv, + + .disconnect_send = tdgram_bsd_disconnect_send, + .disconnect_recv = tdgram_bsd_disconnect_recv, +}; + +static int tdgram_bsd_destructor(struct tdgram_bsd *bsds) +{ + TALLOC_FREE(bsds->fde); + if (bsds->fd != -1) { + close(bsds->fd); + bsds->fd = -1; + } + return 0; +} + +static int tdgram_bsd_dgram_socket(const struct tsocket_address *local, + const struct tsocket_address *remote, + TALLOC_CTX *mem_ctx, + struct tdgram_context **_dgram, + const char *location) +{ + struct tsocket_address_bsd *lbsda = + talloc_get_type_abort(local->private_data, + struct tsocket_address_bsd); + struct tsocket_address_bsd *rbsda = NULL; + struct tdgram_context *dgram; + struct tdgram_bsd *bsds; + int fd; + int ret; + bool do_bind = false; + bool do_reuseaddr = false; + + if (remote) { + rbsda = talloc_get_type_abort(remote->private_data, + struct tsocket_address_bsd); + } + + switch (lbsda->u.sa.sa_family) { + case AF_UNIX: + if (lbsda->u.un.sun_path[0] != 0) { + do_reuseaddr = true; + do_bind = true; + } + break; + case AF_INET: + if (lbsda->u.in.sin_port != 0) { + do_reuseaddr = true; + do_bind = true; + } + if (lbsda->u.in.sin_addr.s_addr == INADDR_ANY) { + do_bind = true; + } + break; +#ifdef HAVE_IPV6 + case AF_INET6: + if (lbsda->u.in6.sin6_port != 0) { + do_reuseaddr = true; + do_bind = true; + } + if (memcmp(&in6addr_any, + &lbsda->u.in6.sin6_addr, + sizeof(in6addr_any)) != 0) { + do_bind = true; + } + break; +#endif + default: + errno = EINVAL; + return -1; + } + + fd = socket(lbsda->u.sa.sa_family, SOCK_DGRAM, 0); + if (fd < 0) { + return fd; + } + + fd = tsocket_bsd_common_prepare_fd(fd, true); + if (fd < 0) { + return fd; + } + + dgram = tdgram_context_create(mem_ctx, + &tdgram_bsd_ops, + &bsds, + struct tdgram_bsd, + location); + if (!dgram) { + int saved_errno = errno; + close(fd); + errno = saved_errno; + return -1; + } + ZERO_STRUCTP(bsds); + bsds->fd = fd; + talloc_set_destructor(bsds, tdgram_bsd_destructor); + + if (lbsda->broadcast) { + int val = 1; + + ret = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, + (const void *)&val, sizeof(val)); + if (ret == -1) { + int saved_errno = errno; + talloc_free(dgram); + errno = saved_errno; + return ret; + } + } + + if (do_reuseaddr) { + int val = 1; + + ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, + (const void *)&val, sizeof(val)); + if (ret == -1) { + int saved_errno = errno; + talloc_free(dgram); + errno = saved_errno; + return ret; + } + } + + if (do_bind) { + ret = bind(fd, &lbsda->u.sa, sizeof(lbsda->u.ss)); + if (ret == -1) { + int saved_errno = errno; + talloc_free(dgram); + errno = saved_errno; + return ret; + } + } + + if (rbsda) { + ret = connect(fd, &rbsda->u.sa, sizeof(rbsda->u.ss)); + if (ret == -1) { + int saved_errno = errno; + talloc_free(dgram); + errno = saved_errno; + return ret; + } + } + + *_dgram = dgram; + return 0; +} + +int _tdgram_inet_udp_socket(const struct tsocket_address *local, + const struct tsocket_address *remote, + TALLOC_CTX *mem_ctx, + struct tdgram_context **dgram, + const char *location) +{ + struct tsocket_address_bsd *lbsda = + talloc_get_type_abort(local->private_data, + struct tsocket_address_bsd); + int ret; + + switch (lbsda->u.sa.sa_family) { + case AF_INET: + break; +#ifdef HAVE_IPV6 + case AF_INET6: + break; +#endif + default: + errno = EINVAL; + return -1; + } + + ret = tdgram_bsd_dgram_socket(local, remote, mem_ctx, dgram, location); + + return ret; +} + +int _tdgram_unix_dgram_socket(const struct tsocket_address *local, + const struct tsocket_address *remote, + TALLOC_CTX *mem_ctx, + struct tdgram_context **dgram, + const char *location) +{ + struct tsocket_address_bsd *lbsda = + talloc_get_type_abort(local->private_data, + struct tsocket_address_bsd); + int ret; + + switch (lbsda->u.sa.sa_family) { + case AF_UNIX: + break; + default: + errno = EINVAL; + return -1; + } + + ret = tdgram_bsd_dgram_socket(local, remote, mem_ctx, dgram, location); + + return ret; +} + diff --git a/lib/tsocket/tsocket_internal.h b/lib/tsocket/tsocket_internal.h index e4a4908f3e..d1f240eba0 100644 --- a/lib/tsocket/tsocket_internal.h +++ b/lib/tsocket/tsocket_internal.h @@ -149,6 +149,47 @@ struct tsocket_address *_tsocket_address_create(TALLOC_CTX *mem_ctx, _tsocket_address_create(mem_ctx, ops, state, sizeof(type), \ #type, location) +struct tdgram_context_ops { + const char *name; + + struct tevent_req *(*recvfrom_send)(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram); + ssize_t (*recvfrom_recv)(struct tevent_req *req, + int *perrno, + TALLOC_CTX *mem_ctx, + uint8_t **buf, + struct tsocket_address **src); + + struct tevent_req *(*sendto_send)(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram, + const uint8_t *buf, size_t len, + const struct tsocket_address *dst); + ssize_t (*sendto_recv)(struct tevent_req *req, + int *perrno); + + struct tevent_req *(*disconnect_send)(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram); + int (*disconnect_recv)(struct tevent_req *req, + int *perrno); +}; + +struct tdgram_context *_tdgram_context_create(TALLOC_CTX *mem_ctx, + const struct tdgram_context_ops *ops, + void *pstate, + size_t psize, + const char *type, + const char *location); +#define tdgram_context_create(mem_ctx, ops, state, type, location) \ + _tdgram_context_create(mem_ctx, ops, state, sizeof(type), \ + #type, location) + +void *_tdgram_context_data(struct tdgram_context *dgram); +#define tdgram_context_data(_req, _type) \ + talloc_get_type_abort(_tdgram_context_data(_req), _type) + int tsocket_error_from_errno(int ret, int sys_errno, bool *retry); int tsocket_simple_int_recv(struct tevent_req *req, int *perrno); int tsocket_common_prepare_fd(int fd, bool high_fd); -- cgit From 85742dbc0651a3413e90afa18023cd55ae72e6db Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Sat, 28 Mar 2009 23:25:28 +0100 Subject: tsocket: add tdgram_sendto_queue_send/recv() metze --- lib/tsocket/tsocket.c | 130 ++++++++++++++++++++++++++++++++++++++++++++++++++ lib/tsocket/tsocket.h | 13 +++++ 2 files changed, 143 insertions(+) (limited to 'lib') diff --git a/lib/tsocket/tsocket.c b/lib/tsocket/tsocket.c index 922429a1c1..a8f3a3909b 100644 --- a/lib/tsocket/tsocket.c +++ b/lib/tsocket/tsocket.c @@ -493,3 +493,133 @@ int tdgram_disconnect_recv(struct tevent_req *req, return ret; } +struct tdgram_sendto_queue_state { + /* this structs are owned by the caller */ + struct { + struct tevent_context *ev; + struct tdgram_context *dgram; + const uint8_t *buf; + size_t len; + const struct tsocket_address *dst; + } caller; + ssize_t ret; +}; + +static void tdgram_sendto_queue_trigger(struct tevent_req *req, + void *private_data); +static void tdgram_sendto_queue_done(struct tevent_req *subreq); + +/** + * @brief Queue a dgram blob for sending through the socket + * @param[in] mem_ctx The memory context for the result + * @param[in] ev The event context the operation should work on + * @param[in] dgram The tdgram_context to send the message buffer + * @param[in] queue The existing dgram queue + * @param[in] buf The message buffer + * @param[in] len The message length + * @param[in] dst The destination socket address + * @retval The async request handle + * + * This function queues a blob for sending to destination through an existing + * dgram socket. The async callback is triggered when the whole blob is + * delivered to the underlying system socket. + * + * The caller needs to make sure that all non-scalar input parameters hang + * arround for the whole lifetime of the request. + */ +struct tevent_req *tdgram_sendto_queue_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram, + struct tevent_queue *queue, + const uint8_t *buf, + size_t len, + struct tsocket_address *dst) +{ + struct tevent_req *req; + struct tdgram_sendto_queue_state *state; + bool ok; + + req = tevent_req_create(mem_ctx, &state, + struct tdgram_sendto_queue_state); + if (!req) { + return NULL; + } + + state->caller.ev = ev; + state->caller.dgram = dgram; + state->caller.buf = buf; + state->caller.len = len; + state->caller.dst = dst; + state->ret = -1; + + ok = tevent_queue_add(queue, + ev, + req, + tdgram_sendto_queue_trigger, + NULL); + if (!ok) { + tevent_req_nomem(NULL, req); + goto post; + } + + return req; + + post: + tevent_req_post(req, ev); + return req; +} + +static void tdgram_sendto_queue_trigger(struct tevent_req *req, + void *private_data) +{ + struct tdgram_sendto_queue_state *state = tevent_req_data(req, + struct tdgram_sendto_queue_state); + struct tevent_req *subreq; + + subreq = tdgram_sendto_send(state, + state->caller.ev, + state->caller.dgram, + state->caller.buf, + state->caller.len, + state->caller.dst); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, tdgram_sendto_queue_done, req); +} + +static void tdgram_sendto_queue_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data(subreq, + struct tevent_req); + struct tdgram_sendto_queue_state *state = tevent_req_data(req, + struct tdgram_sendto_queue_state); + ssize_t ret; + int sys_errno; + + ret = tdgram_sendto_recv(subreq, &sys_errno); + talloc_free(subreq); + if (ret == -1) { + tevent_req_error(req, sys_errno); + return; + } + state->ret = ret; + + tevent_req_done(req); +} + +ssize_t tdgram_sendto_queue_recv(struct tevent_req *req, int *perrno) +{ + struct tdgram_sendto_queue_state *state = tevent_req_data(req, + struct tdgram_sendto_queue_state); + ssize_t ret; + + ret = tsocket_simple_int_recv(req, perrno); + if (ret == 0) { + ret = state->ret; + } + + tevent_req_received(req); + return ret; +} + diff --git a/lib/tsocket/tsocket.h b/lib/tsocket/tsocket.h index 077fd1ef35..ec891c34dd 100644 --- a/lib/tsocket/tsocket.h +++ b/lib/tsocket/tsocket.h @@ -259,5 +259,18 @@ struct tevent_req *tsocket_readv_send(struct tsocket_context *sock, void *private_data); int tsocket_readv_recv(struct tevent_req *req, int *perrno); +/* + * Queue helpers + */ + +struct tevent_req *tdgram_sendto_queue_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tdgram_context *dgram, + struct tevent_queue *queue, + const uint8_t *buf, + size_t len, + struct tsocket_address *dst); +ssize_t tdgram_sendto_queue_recv(struct tevent_req *req, int *perrno); + #endif /* _TSOCKET_H */ -- cgit From c59ee5a139421762adb6f3f4bbfc21723c2ce407 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Thu, 2 Apr 2009 10:36:03 +0200 Subject: tsocket: optimize tdgram_bsd a lot The desire is to do as less syscalls during the tdgram_sendto_send/recv() and tdgram_recvfrom_send/recv() operations. 1. we first try the sendto()/recvfrom() syscall and only use a fd event if we got EAGAIN. 2. we cache the fd event and only change it's flags if really needed. For the highload case we do almost no epoll_ctl() and epoll_wait()/select() syscalls anymore. This speeds up the LDAP-BENCH-CLDAP test by more than 20%. (With a modified version of this test which let the server skip any ldb calls and just return success I'm getting about 8000 requests per second, while I'm getting just about 6000 requests per second without optimization) metze --- lib/tsocket/tsocket_bsd.c | 81 +++++++++++++++++++++++++++++++---------------- 1 file changed, 54 insertions(+), 27 deletions(-) (limited to 'lib') diff --git a/lib/tsocket/tsocket_bsd.c b/lib/tsocket/tsocket_bsd.c index 6c60ef2ebd..db1fd38bdb 100644 --- a/lib/tsocket/tsocket_bsd.c +++ b/lib/tsocket/tsocket_bsd.c @@ -1313,6 +1313,10 @@ static void tdgram_bsd_fde_handler(struct tevent_context *ev, return; } if (flags & TEVENT_FD_READ) { + if (!bsds->readable_handler) { + TEVENT_FD_NOT_READABLE(bsds->fde); + return; + } bsds->readable_handler(bsds->readable_private); return; } @@ -1328,19 +1332,25 @@ static int tdgram_bsd_set_readable_handler(struct tdgram_bsd *bsds, errno = EINVAL; return -1; } - + if (!bsds->readable_handler) { + return 0; + } bsds->readable_handler = NULL; bsds->readable_private = NULL; - TEVENT_FD_NOT_READABLE(bsds->fde); - if (bsds->fde && !bsds->writeable_handler) { - /* we don't need the fd event anymore */ - bsds->event_ptr = NULL; - TALLOC_FREE(bsds->fde); - } return 0; } + /* read and write must use the same tevent_context */ + if (bsds->event_ptr != ev) { + if (bsds->readable_handler || bsds->writeable_handler) { + errno = EINVAL; + return -1; + } + bsds->event_ptr = NULL; + TALLOC_FREE(bsds->fde); + } + if (bsds->fde == NULL) { bsds->fde = tevent_add_fd(ev, bsds, bsds->fd, TEVENT_FD_READ, @@ -1352,15 +1362,10 @@ static int tdgram_bsd_set_readable_handler(struct tdgram_bsd *bsds, /* cache the event context we're running on */ bsds->event_ptr = ev; + } else if (!bsds->readable_handler) { + TEVENT_FD_READABLE(bsds->fde); } - /* read and write must use the same tevent_context */ - if (bsds->event_ptr != ev) { - errno = EINVAL; - return -1; - } - - TEVENT_FD_READABLE(bsds->fde); bsds->readable_handler = handler; bsds->readable_private = private_data; @@ -1377,19 +1382,26 @@ static int tdgram_bsd_set_writeable_handler(struct tdgram_bsd *bsds, errno = EINVAL; return -1; } - + if (!bsds->writeable_handler) { + return 0; + } bsds->writeable_handler = NULL; bsds->writeable_private = NULL; TEVENT_FD_NOT_WRITEABLE(bsds->fde); - if (bsds->fde && !bsds->readable_handler) { - /* we don't need the fd event anymore */ - bsds->event_ptr = NULL; - TALLOC_FREE(bsds->fde); - } return 0; } + /* read and write must use the same tevent_context */ + if (bsds->event_ptr != ev) { + if (bsds->readable_handler || bsds->writeable_handler) { + errno = EINVAL; + return -1; + } + bsds->event_ptr = NULL; + TALLOC_FREE(bsds->fde); + } + if (bsds->fde == NULL) { bsds->fde = tevent_add_fd(ev, bsds, bsds->fd, TEVENT_FD_WRITE, @@ -1401,15 +1413,10 @@ static int tdgram_bsd_set_writeable_handler(struct tdgram_bsd *bsds, /* cache the event context we're running on */ bsds->event_ptr = ev; + } else if (!bsds->writeable_handler) { + TEVENT_FD_WRITEABLE(bsds->fde); } - /* read and write must use the same tevent_context */ - if (bsds->event_ptr != ev) { - errno = EINVAL; - return -1; - } - - TEVENT_FD_WRITEABLE(bsds->fde); bsds->writeable_handler = handler; bsds->writeable_private = private_data; @@ -1470,6 +1477,16 @@ static struct tevent_req *tdgram_bsd_recvfrom_send(TALLOC_CTX *mem_ctx, goto post; } + /* + * this is a fast path, not waiting for the + * socket to become explicit readable gains + * about 10%-20% performance in benchmark tests. + */ + tdgram_bsd_recvfrom_handler(req); + if (!tevent_req_is_in_progress(req)) { + goto post; + } + ret = tdgram_bsd_set_readable_handler(bsds, ev, tdgram_bsd_recvfrom_handler, req); @@ -1634,6 +1651,16 @@ static struct tevent_req *tdgram_bsd_sendto_send(TALLOC_CTX *mem_ctx, goto post; } + /* + * this is a fast path, not waiting for the + * socket to become explicit writeable gains + * about 10%-20% performance in benchmark tests. + */ + tdgram_bsd_sendto_handler(req); + if (!tevent_req_is_in_progress(req)) { + goto post; + } + ret = tdgram_bsd_set_writeable_handler(bsds, ev, tdgram_bsd_sendto_handler, req); -- cgit From 3bbad34a02350c96cb44d53da510c6273b6910d7 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Thu, 2 Apr 2009 21:06:27 +0200 Subject: tsocket: remove DGRAM support from tsocket_context metze --- lib/tsocket/config.mk | 2 - lib/tsocket/tsocket.c | 15 --- lib/tsocket/tsocket.h | 32 ----- lib/tsocket/tsocket_bsd.c | 72 ----------- lib/tsocket/tsocket_internal.h | 8 -- lib/tsocket/tsocket_recvfrom.c | 164 ------------------------- lib/tsocket/tsocket_sendto.c | 271 ----------------------------------------- 7 files changed, 564 deletions(-) delete mode 100644 lib/tsocket/tsocket_recvfrom.c delete mode 100644 lib/tsocket/tsocket_sendto.c (limited to 'lib') diff --git a/lib/tsocket/config.mk b/lib/tsocket/config.mk index c35f0afd6f..2e05f544c9 100644 --- a/lib/tsocket/config.mk +++ b/lib/tsocket/config.mk @@ -5,8 +5,6 @@ LIBTSOCKET_OBJ_FILES = $(addprefix ../lib/tsocket/, \ tsocket.o \ tsocket_helpers.o \ tsocket_bsd.o \ - tsocket_recvfrom.o \ - tsocket_sendto.o \ tsocket_connect.o \ tsocket_writev.o \ tsocket_readv.o) diff --git a/lib/tsocket/tsocket.c b/lib/tsocket/tsocket.c index a8f3a3909b..076c6474a0 100644 --- a/lib/tsocket/tsocket.c +++ b/lib/tsocket/tsocket.c @@ -118,21 +118,6 @@ int tsocket_writev(struct tsocket_context *sock, return sock->ops->writev_data(sock, vector, count); } -ssize_t tsocket_recvfrom(struct tsocket_context *sock, - uint8_t *data, size_t len, - TALLOC_CTX *addr_ctx, - struct tsocket_address **src_addr) -{ - return sock->ops->recvfrom_data(sock, data, len, addr_ctx, src_addr); -} - -ssize_t tsocket_sendto(struct tsocket_context *sock, - const uint8_t *data, size_t len, - const struct tsocket_address *dest_addr) -{ - return sock->ops->sendto_data(sock, data, len, dest_addr); -} - int tsocket_get_status(const struct tsocket_context *sock) { return sock->ops->get_status(sock); diff --git a/lib/tsocket/tsocket.h b/lib/tsocket/tsocket.h index ec891c34dd..8f69490012 100644 --- a/lib/tsocket/tsocket.h +++ b/lib/tsocket/tsocket.h @@ -34,7 +34,6 @@ struct iovec; enum tsocket_type { TSOCKET_TYPE_STREAM = 1, - TSOCKET_TYPE_DGRAM, TSOCKET_TYPE_MESSAGE }; @@ -68,14 +67,6 @@ int tsocket_readv(struct tsocket_context *sock, int tsocket_writev(struct tsocket_context *sock, const struct iovec *vector, size_t count); -ssize_t tsocket_recvfrom(struct tsocket_context *sock, - uint8_t *data, size_t len, - TALLOC_CTX *addr_ctx, - struct tsocket_address **src_addr); -ssize_t tsocket_sendto(struct tsocket_context *sock, - const uint8_t *data, size_t len, - const struct tsocket_address *dest_addr); - int tsocket_get_status(const struct tsocket_context *sock); int _tsocket_get_local_address(const struct tsocket_context *sock, @@ -207,29 +198,6 @@ int _tdgram_unix_dgram_socket(const struct tsocket_address *local, * Async helpers */ -struct tevent_req *tsocket_recvfrom_send(struct tsocket_context *sock, - TALLOC_CTX *mem_ctx); -ssize_t tsocket_recvfrom_recv(struct tevent_req *req, - int *perrno, - TALLOC_CTX *mem_ctx, - uint8_t **buf, - struct tsocket_address **src); - -struct tevent_req *tsocket_sendto_send(struct tsocket_context *sock, - TALLOC_CTX *mem_ctx, - const uint8_t *buf, - size_t len, - const struct tsocket_address *dst); -ssize_t tsocket_sendto_recv(struct tevent_req *req, int *perrno); - -struct tevent_req *tsocket_sendto_queue_send(TALLOC_CTX *mem_ctx, - struct tsocket_context *sock, - struct tevent_queue *queue, - const uint8_t *buf, - size_t len, - struct tsocket_address *dst); -ssize_t tsocket_sendto_queue_recv(struct tevent_req *req, int *perrno); - struct tevent_req *tsocket_connect_send(struct tsocket_context *sock, TALLOC_CTX *mem_ctx, const struct tsocket_address *dst); diff --git a/lib/tsocket/tsocket_bsd.c b/lib/tsocket/tsocket_bsd.c index db1fd38bdb..4ccaff46e3 100644 --- a/lib/tsocket/tsocket_bsd.c +++ b/lib/tsocket/tsocket_bsd.c @@ -615,9 +615,6 @@ static int tsocket_address_bsd_create_socket(const struct tsocket_address *addr, } bsd_type = SOCK_STREAM; break; - case TSOCKET_TYPE_DGRAM: - bsd_type = SOCK_DGRAM; - break; default: errno = EPROTONOSUPPORT; return -1; @@ -944,73 +941,6 @@ static int tsocket_context_bsd_writev_data(struct tsocket_context *sock, return ret; } -static ssize_t tsocket_context_bsd_recvfrom_data(struct tsocket_context *sock, - uint8_t *data, size_t len, - TALLOC_CTX *addr_ctx, - struct tsocket_address **remote) -{ - struct tsocket_context_bsd *bsds = talloc_get_type(sock->private_data, - struct tsocket_context_bsd); - struct tsocket_address *addr = NULL; - struct tsocket_address_bsd *bsda; - ssize_t ret; - struct sockaddr *sa = NULL; - socklen_t sa_len = 0; - - if (remote) { - addr = tsocket_address_create(addr_ctx, - &tsocket_address_bsd_ops, - &bsda, - struct tsocket_address_bsd, - __location__ "recvfrom"); - if (!addr) { - return -1; - } - - ZERO_STRUCTP(bsda); - - sa = &bsda->u.sa; - sa_len = sizeof(bsda->u.ss); - } - - ret = recvfrom(bsds->fd, data, len, 0, sa, &sa_len); - if (ret < 0) { - int saved_errno = errno; - talloc_free(addr); - errno = saved_errno; - return ret; - } - - if (remote) { - *remote = addr; - } - return ret; -} - -static ssize_t tsocket_context_bsd_sendto_data(struct tsocket_context *sock, - const uint8_t *data, size_t len, - const struct tsocket_address *remote) -{ - struct tsocket_context_bsd *bsds = talloc_get_type(sock->private_data, - struct tsocket_context_bsd); - struct sockaddr *sa = NULL; - socklen_t sa_len = 0; - ssize_t ret; - - if (remote) { - struct tsocket_address_bsd *bsda = - talloc_get_type(remote->private_data, - struct tsocket_address_bsd); - - sa = &bsda->u.sa; - sa_len = sizeof(bsda->u.ss); - } - - ret = sendto(bsds->fd, data, len, 0, sa, sa_len); - - return ret; -} - static int tsocket_context_bsd_get_status(const struct tsocket_context *sock) { struct tsocket_context_bsd *bsds = talloc_get_type(sock->private_data, @@ -1272,8 +1202,6 @@ static const struct tsocket_context_ops tsocket_context_bsd_ops = { .pending_data = tsocket_context_bsd_pending_data, .readv_data = tsocket_context_bsd_readv_data, .writev_data = tsocket_context_bsd_writev_data, - .recvfrom_data = tsocket_context_bsd_recvfrom_data, - .sendto_data = tsocket_context_bsd_sendto_data, .get_status = tsocket_context_bsd_get_status, .get_local_address = tsocket_context_bsd_get_local_address, diff --git a/lib/tsocket/tsocket_internal.h b/lib/tsocket/tsocket_internal.h index d1f240eba0..893394405f 100644 --- a/lib/tsocket/tsocket_internal.h +++ b/lib/tsocket/tsocket_internal.h @@ -57,14 +57,6 @@ struct tsocket_context_ops { int (*writev_data)(struct tsocket_context *sock, const struct iovec *vector, size_t count); - ssize_t (*recvfrom_data)(struct tsocket_context *sock, - uint8_t *data, size_t len, - TALLOC_CTX *addr_ctx, - struct tsocket_address **remote_addr); - ssize_t (*sendto_data)(struct tsocket_context *sock, - const uint8_t *data, size_t len, - const struct tsocket_address *remote_addr); - /* info */ int (*get_status)(const struct tsocket_context *sock); int (*get_local_address)(const struct tsocket_context *sock, diff --git a/lib/tsocket/tsocket_recvfrom.c b/lib/tsocket/tsocket_recvfrom.c deleted file mode 100644 index 467738cfc2..0000000000 --- a/lib/tsocket/tsocket_recvfrom.c +++ /dev/null @@ -1,164 +0,0 @@ -/* - Unix SMB/CIFS implementation. - - Copyright (C) Stefan Metzmacher 2009 - - ** NOTE! The following LGPL license applies to the tevent - ** library. This does NOT imply that all of Samba is released - ** under the LGPL - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 3 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, see . -*/ - -#include "replace.h" -#include "system/network.h" -#include "tsocket.h" -#include "tsocket_internal.h" - -struct tsocket_recvfrom_state { - /* this structs are owned by the caller */ - struct { - struct tsocket_context *sock; - } caller; - - uint8_t *buf; - size_t len; - struct tsocket_address *src; -}; - -static int tsocket_recvfrom_state_destructor(struct tsocket_recvfrom_state *state) -{ - if (state->caller.sock) { - tsocket_set_readable_handler(state->caller.sock, NULL, NULL); - } - ZERO_STRUCT(state->caller); - - return 0; -} - -static void tsocket_recvfrom_handler(struct tsocket_context *sock, - void *private_data); - -struct tevent_req *tsocket_recvfrom_send(struct tsocket_context *sock, - TALLOC_CTX *mem_ctx) -{ - struct tevent_req *req; - struct tsocket_recvfrom_state *state; - int ret; - int err; - bool dummy; - - req = tevent_req_create(mem_ctx, &state, - struct tsocket_recvfrom_state); - if (!req) { - return NULL; - } - - state->caller.sock = sock; - state->buf = NULL; - state->len = 0; - state->src = NULL; - - talloc_set_destructor(state, tsocket_recvfrom_state_destructor); - - ret = tsocket_set_readable_handler(sock, - tsocket_recvfrom_handler, - req); - err = tsocket_error_from_errno(ret, errno, &dummy); - if (tevent_req_error(req, err)) { - goto post; - } - - return req; - - post: - return tevent_req_post(req, sock->event.ctx); -} - -static void tsocket_recvfrom_handler(struct tsocket_context *sock, - void *private_data) -{ - struct tevent_req *req = talloc_get_type(private_data, - struct tevent_req); - struct tsocket_recvfrom_state *state = tevent_req_data(req, - struct tsocket_recvfrom_state); - ssize_t ret; - int err; - bool retry; - - ret = tsocket_pending(state->caller.sock); - if (ret == 0) { - /* retry later */ - return; - } - err = tsocket_error_from_errno(ret, errno, &retry); - if (retry) { - /* retry later */ - return; - } - if (tevent_req_error(req, err)) { - return; - } - - state->buf = talloc_array(state, uint8_t, ret); - if (tevent_req_nomem(state->buf, req)) { - return; - } - state->len = ret; - - ret = tsocket_recvfrom(state->caller.sock, - state->buf, - state->len, - state, - &state->src); - err = tsocket_error_from_errno(ret, errno, &retry); - if (retry) { - /* retry later */ - return; - } - if (tevent_req_error(req, err)) { - return; - } - - if (ret != state->len) { - tevent_req_error(req, EIO); - return; - } - - tevent_req_done(req); -} - -ssize_t tsocket_recvfrom_recv(struct tevent_req *req, - int *perrno, - TALLOC_CTX *mem_ctx, - uint8_t **buf, - struct tsocket_address **src) -{ - struct tsocket_recvfrom_state *state = tevent_req_data(req, - struct tsocket_recvfrom_state); - ssize_t ret; - - ret = tsocket_simple_int_recv(req, perrno); - if (ret == 0) { - *buf = talloc_move(mem_ctx, &state->buf); - ret = state->len; - if (src) { - *src = talloc_move(mem_ctx, &state->src); - } - } - - tevent_req_received(req); - return ret; -} - diff --git a/lib/tsocket/tsocket_sendto.c b/lib/tsocket/tsocket_sendto.c deleted file mode 100644 index 9c0a76bf16..0000000000 --- a/lib/tsocket/tsocket_sendto.c +++ /dev/null @@ -1,271 +0,0 @@ -/* - Unix SMB/CIFS implementation. - - Copyright (C) Stefan Metzmacher 2009 - - ** NOTE! The following LGPL license applies to the tevent - ** library. This does NOT imply that all of Samba is released - ** under the LGPL - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 3 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, see . -*/ - -#include "replace.h" -#include "system/network.h" -#include "tsocket.h" -#include "tsocket_internal.h" - -struct tsocket_sendto_state { - /* this structs are owned by the caller */ - struct { - struct tsocket_context *sock; - const uint8_t *buf; - size_t len; - const struct tsocket_address *dst; - } caller; - - ssize_t ret; -}; - -static int tsocket_sendto_state_destructor(struct tsocket_sendto_state *state) -{ - if (state->caller.sock) { - tsocket_set_writeable_handler(state->caller.sock, NULL, NULL); - } - ZERO_STRUCT(state->caller); - - return 0; -} - -static void tsocket_sendto_handler(struct tsocket_context *sock, - void *private_data); - -struct tevent_req *tsocket_sendto_send(struct tsocket_context *sock, - TALLOC_CTX *mem_ctx, - const uint8_t *buf, - size_t len, - const struct tsocket_address *dst) -{ - struct tevent_req *req; - struct tsocket_sendto_state *state; - int ret; - int err; - bool dummy; - - req = tevent_req_create(mem_ctx, &state, - struct tsocket_sendto_state); - if (!req) { - return NULL; - } - - state->caller.sock = sock; - state->caller.buf = buf; - state->caller.len = len; - state->caller.dst = dst; - state->ret = -1; - - /* - * this is a fast path, not waiting for the - * socket to become explicit writeable gains - * about 10%-20% performance in benchmark tests. - */ - tsocket_sendto_handler(sock, req); - if (!tevent_req_is_in_progress(req)) { - goto post; - } - - talloc_set_destructor(state, tsocket_sendto_state_destructor); - - ret = tsocket_set_writeable_handler(sock, - tsocket_sendto_handler, - req); - err = tsocket_error_from_errno(ret, errno, &dummy); - if (tevent_req_error(req, err)) { - goto post; - } - - return req; - - post: - return tevent_req_post(req, sock->event.ctx); -} - -static void tsocket_sendto_handler(struct tsocket_context *sock, - void *private_data) -{ - struct tevent_req *req = talloc_get_type(private_data, - struct tevent_req); - struct tsocket_sendto_state *state = tevent_req_data(req, - struct tsocket_sendto_state); - ssize_t ret; - int err; - bool retry; - - ret = tsocket_sendto(state->caller.sock, - state->caller.buf, - state->caller.len, - state->caller.dst); - err = tsocket_error_from_errno(ret, errno, &retry); - if (retry) { - /* retry later */ - return; - } - if (tevent_req_error(req, err)) { - return; - } - - state->ret = ret; - - tevent_req_done(req); -} - -ssize_t tsocket_sendto_recv(struct tevent_req *req, int *perrno) -{ - struct tsocket_sendto_state *state = tevent_req_data(req, - struct tsocket_sendto_state); - ssize_t ret; - - ret = tsocket_simple_int_recv(req, perrno); - if (ret == 0) { - ret = state->ret; - } - - tevent_req_received(req); - return ret; -} - -struct tsocket_sendto_queue_state { - /* this structs are owned by the caller */ - struct { - struct tsocket_context *sock; - const uint8_t *buf; - size_t len; - const struct tsocket_address *dst; - } caller; - ssize_t ret; -}; - -static void tsocket_sendto_queue_trigger(struct tevent_req *req, - void *private_data); -static void tsocket_sendto_queue_done(struct tevent_req *subreq); - -/** - * @brief Queue a dgram blob for sending through the socket - * @param[in] mem_ctx The memory context for the result - * @param[in] sock The socket to send the message buffer - * @param[in] queue The existing dgram queue - * @param[in] buf The message buffer - * @param[in] len The message length - * @param[in] dst The destination socket address - * @retval The async request handle - * - * This function queues a blob for sending to destination through an existing - * dgram socket. The async callback is triggered when the whole blob is - * delivered to the underlying system socket. - * - * The caller needs to make sure that all non-scalar input parameters hang - * arround for the whole lifetime of the request. - */ -struct tevent_req *tsocket_sendto_queue_send(TALLOC_CTX *mem_ctx, - struct tsocket_context *sock, - struct tevent_queue *queue, - const uint8_t *buf, - size_t len, - struct tsocket_address *dst) -{ - struct tevent_req *req; - struct tsocket_sendto_queue_state *state; - bool ok; - - req = tevent_req_create(mem_ctx, &state, - struct tsocket_sendto_queue_state); - if (!req) { - return NULL; - } - - state->caller.sock = sock; - state->caller.buf = buf; - state->caller.len = len; - state->caller.dst = dst; - state->ret = -1; - - ok = tevent_queue_add(queue, - sock->event.ctx, - req, - tsocket_sendto_queue_trigger, - NULL); - if (!ok) { - tevent_req_nomem(NULL, req); - goto post; - } - - return req; - - post: - return tevent_req_post(req, sock->event.ctx); -} - -static void tsocket_sendto_queue_trigger(struct tevent_req *req, - void *private_data) -{ - struct tsocket_sendto_queue_state *state = tevent_req_data(req, - struct tsocket_sendto_queue_state); - struct tevent_req *subreq; - - subreq = tsocket_sendto_send(state->caller.sock, - state, - state->caller.buf, - state->caller.len, - state->caller.dst); - if (tevent_req_nomem(subreq, req)) { - return; - } - tevent_req_set_callback(subreq, tsocket_sendto_queue_done ,req); -} - -static void tsocket_sendto_queue_done(struct tevent_req *subreq) -{ - struct tevent_req *req = tevent_req_callback_data(subreq, - struct tevent_req); - struct tsocket_sendto_queue_state *state = tevent_req_data(req, - struct tsocket_sendto_queue_state); - ssize_t ret; - int sys_errno; - - ret = tsocket_sendto_recv(subreq, &sys_errno); - talloc_free(subreq); - if (ret == -1) { - tevent_req_error(req, sys_errno); - return; - } - state->ret = ret; - - tevent_req_done(req); -} - -ssize_t tsocket_sendto_queue_recv(struct tevent_req *req, int *perrno) -{ - struct tsocket_sendto_queue_state *state = tevent_req_data(req, - struct tsocket_sendto_queue_state); - ssize_t ret; - - ret = tsocket_simple_int_recv(req, perrno); - if (ret == 0) { - ret = state->ret; - } - - tevent_req_received(req); - return ret; -} - -- cgit