summaryrefslogtreecommitdiff
path: root/lib/async_req/async_sock.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/async_req/async_sock.c')
-rw-r--r--lib/async_req/async_sock.c583
1 files changed, 354 insertions, 229 deletions
diff --git a/lib/async_req/async_sock.c b/lib/async_req/async_sock.c
index 02ae880683..3563421e0e 100644
--- a/lib/async_req/async_sock.c
+++ b/lib/async_req/async_sock.c
@@ -22,6 +22,7 @@
#include "lib/tevent/tevent.h"
#include "lib/async_req/async_req.h"
#include "lib/async_req/async_sock.h"
+#include "lib/util/tevent_unix.h"
#include <fcntl.h>
#ifndef TALLOC_FREE
@@ -33,10 +34,7 @@
*/
enum async_syscall_type {
ASYNC_SYSCALL_SEND,
- ASYNC_SYSCALL_SENDALL,
ASYNC_SYSCALL_RECV,
- ASYNC_SYSCALL_RECVALL,
- ASYNC_SYSCALL_CONNECT
};
/**
@@ -54,36 +52,12 @@ struct async_syscall_state {
size_t length;
int flags;
} param_send;
- struct param_sendall {
- int fd;
- const void *buffer;
- size_t length;
- int flags;
- size_t sent;
- } param_sendall;
struct param_recv {
int fd;
void *buffer;
size_t length;
int flags;
} param_recv;
- struct param_recvall {
- int fd;
- void *buffer;
- size_t length;
- int flags;
- size_t received;
- } param_recvall;
- struct param_connect {
- /**
- * connect needs to be done on a nonblocking
- * socket. Keep the old flags around
- */
- long old_sockflags;
- int fd;
- const struct sockaddr *address;
- socklen_t address_len;
- } param_connect;
} param;
union {
@@ -337,109 +311,6 @@ struct async_req *async_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
}
/**
- * fde event handler for the "sendall" syscall group
- * @param[in] ev The event context that sent us here
- * @param[in] fde The file descriptor event associated with the send
- * @param[in] flags Can only be TEVENT_FD_WRITE here
- * @param[in] priv private data, "struct async_req *" in this case
- */
-
-static void async_sendall_callback(struct tevent_context *ev,
- struct tevent_fd *fde, uint16_t flags,
- void *priv)
-{
- struct async_req *req = talloc_get_type_abort(
- priv, struct async_req);
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
- struct param_sendall *p = &state->param.param_sendall;
-
- if (state->syscall_type != ASYNC_SYSCALL_SENDALL) {
- async_req_error(req, EIO);
- return;
- }
-
- state->result.result_ssize_t = send(p->fd,
- (const char *)p->buffer + p->sent,
- p->length - p->sent, p->flags);
- state->sys_errno = errno;
-
- if (state->result.result_ssize_t == -1) {
- async_req_error(req, state->sys_errno);
- return;
- }
-
- if (state->result.result_ssize_t == 0) {
- async_req_error(req, EOF);
- return;
- }
-
- p->sent += state->result.result_ssize_t;
- if (p->sent > p->length) {
- async_req_error(req, EIO);
- return;
- }
-
- if (p->sent == p->length) {
- TALLOC_FREE(state->fde);
- async_req_done(req);
- }
-}
-
-/**
- * @brief Send all bytes to a socket
- * @param[in] mem_ctx The memory context to hang the result off
- * @param[in] ev The event context to work from
- * @param[in] fd The socket to send to
- * @param[in] buffer The buffer to send
- * @param[in] length How many bytes to send
- * @param[in] flags flags passed to send(2)
- *
- * async_sendall calls send(2) as long as it is necessary to send all of the
- * "length" bytes
- */
-
-struct async_req *sendall_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
- int fd, const void *buffer, size_t length,
- int flags)
-{
- struct async_req *result;
- struct async_syscall_state *state;
-
- result = async_fde_syscall_new(
- mem_ctx, ev, ASYNC_SYSCALL_SENDALL,
- fd, TEVENT_FD_WRITE, async_sendall_callback,
- &state);
- if (result == NULL) {
- return NULL;
- }
-
- state->param.param_sendall.fd = fd;
- state->param.param_sendall.buffer = buffer;
- state->param.param_sendall.length = length;
- state->param.param_sendall.flags = flags;
- state->param.param_sendall.sent = 0;
-
- return result;
-}
-
-ssize_t sendall_recv(struct async_req *req, int *perr)
-{
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
- int err;
-
- err = async_req_simple_recv_errno(req);
-
- if (err != 0) {
- *perr = err;
- return -1;
- }
-
- return state->result.result_ssize_t;
-}
-
-/**
* fde event handler for the "recv" syscall
* @param[in] ev The event context that sent us here
* @param[in] fde The file descriptor event associated with the recv
@@ -507,106 +378,138 @@ struct async_req *async_recv(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
return result;
}
-/**
- * fde event handler for the "recvall" syscall group
- * @param[in] ev The event context that sent us here
- * @param[in] fde The file descriptor event associated with the recv
- * @param[in] flags Can only be TEVENT_FD_READ here
- * @param[in] priv private data, "struct async_req *" in this case
- */
+struct async_send_state {
+ int fd;
+ const void *buf;
+ size_t len;
+ int flags;
+ ssize_t sent;
+};
-static void async_recvall_callback(struct tevent_context *ev,
- struct tevent_fd *fde, uint16_t flags,
- void *priv)
+static void async_send_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data);
+
+struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int fd, const void *buf, size_t len,
+ int flags)
{
- struct async_req *req = talloc_get_type_abort(
- priv, struct async_req);
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
- struct param_recvall *p = &state->param.param_recvall;
+ struct tevent_req *result;
+ struct async_send_state *state;
+ struct tevent_fd *fde;
- if (state->syscall_type != ASYNC_SYSCALL_RECVALL) {
- async_req_error(req, EIO);
- return;
+ result = tevent_req_create(mem_ctx, &state, struct async_send_state);
+ if (result == NULL) {
+ return result;
}
+ state->fd = fd;
+ state->buf = buf;
+ state->len = len;
+ state->flags = flags;
- state->result.result_ssize_t = recv(p->fd,
- (char *)p->buffer + p->received,
- p->length - p->received, p->flags);
- state->sys_errno = errno;
-
- if (state->result.result_ssize_t == -1) {
- async_req_error(req, state->sys_errno);
- return;
+ fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler,
+ result);
+ if (fde == NULL) {
+ TALLOC_FREE(result);
+ return NULL;
}
+ return result;
+}
- if (state->result.result_ssize_t == 0) {
- async_req_error(req, EIO);
+static void async_send_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data)
+{
+ struct tevent_req *req = talloc_get_type_abort(
+ private_data, struct tevent_req);
+ struct async_send_state *state = talloc_get_type_abort(
+ req->private_state, struct async_send_state);
+
+ state->sent = send(state->fd, state->buf, state->len, state->flags);
+ if (state->sent == -1) {
+ tevent_req_error(req, errno);
return;
}
+ tevent_req_done(req);
+}
- p->received += state->result.result_ssize_t;
- if (p->received > p->length) {
- async_req_error(req, EIO);
- return;
- }
+ssize_t async_send_recv(struct tevent_req *req, int *perrno)
+{
+ struct async_send_state *state = talloc_get_type_abort(
+ req->private_state, struct async_send_state);
- if (p->received == p->length) {
- TALLOC_FREE(state->fde);
- async_req_done(req);
+ if (tevent_req_is_unix_error(req, perrno)) {
+ return -1;
}
+ return state->sent;
}
-/**
- * Receive a specified number of bytes from a socket
- * @param[in] mem_ctx The memory context to hang the result off
- * @param[in] ev The event context to work from
- * @param[in] fd The socket to recv from
- * @param[in] buffer The buffer to recv into
- * @param[in] length How many bytes to recv
- * @param[in] flags flags passed to recv(2)
- *
- * async_recvall will call recv(2) until "length" bytes are received
- */
+struct async_recv_state {
+ int fd;
+ void *buf;
+ size_t len;
+ int flags;
+ ssize_t received;
+};
-struct async_req *recvall_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
- int fd, void *buffer, size_t length,
- int flags)
+static void async_recv_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data);
+
+struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int fd, void *buf, size_t len, int flags)
{
- struct async_req *result;
- struct async_syscall_state *state;
+ struct tevent_req *result;
+ struct async_recv_state *state;
+ struct tevent_fd *fde;
- result = async_fde_syscall_new(
- mem_ctx, ev, ASYNC_SYSCALL_RECVALL,
- fd, TEVENT_FD_READ, async_recvall_callback,
- &state);
+ result = tevent_req_create(mem_ctx, &state, struct async_recv_state);
if (result == NULL) {
- return NULL;
+ return result;
}
+ state->fd = fd;
+ state->buf = buf;
+ state->len = len;
+ state->flags = flags;
- state->param.param_recvall.fd = fd;
- state->param.param_recvall.buffer = buffer;
- state->param.param_recvall.length = length;
- state->param.param_recvall.flags = flags;
- state->param.param_recvall.received = 0;
-
+ fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler,
+ result);
+ if (fde == NULL) {
+ TALLOC_FREE(result);
+ return NULL;
+ }
return result;
}
-ssize_t recvall_recv(struct async_req *req, int *perr)
+static void async_recv_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data)
{
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
- int err;
+ struct tevent_req *req = talloc_get_type_abort(
+ private_data, struct tevent_req);
+ struct async_recv_state *state = talloc_get_type_abort(
+ req->private_state, struct async_recv_state);
+
+ state->received = recv(state->fd, state->buf, state->len,
+ state->flags);
+ if (state->received == -1) {
+ tevent_req_error(req, errno);
+ return;
+ }
+ tevent_req_done(req);
+}
- err = async_req_simple_recv_errno(req);
+ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
+{
+ struct async_recv_state *state = talloc_get_type_abort(
+ req->private_state, struct async_recv_state);
- if (err != 0) {
- *perr = err;
+ if (tevent_req_is_unix_error(req, perrno)) {
return -1;
}
-
- return state->result.result_ssize_t;
+ return state->received;
}
struct async_connect_state {
@@ -633,17 +536,18 @@ static void async_connect_connected(struct tevent_context *ev,
* connect in an async state. This will be reset when the request is finished.
*/
-struct async_req *async_connect_send(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- int fd, const struct sockaddr *address,
- socklen_t address_len)
+struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int fd, const struct sockaddr *address,
+ socklen_t address_len)
{
- struct async_req *result;
+ struct tevent_req *result;
struct async_connect_state *state;
struct tevent_fd *fde;
- if (!async_req_setup(mem_ctx, &result, &state,
- struct async_connect_state)) {
+ result = tevent_req_create(
+ mem_ctx, &state, struct async_connect_state);
+ if (result == NULL) {
return NULL;
}
@@ -664,8 +568,8 @@ struct async_req *async_connect_send(TALLOC_CTX *mem_ctx,
state->result = connect(fd, address, address_len);
if (state->result == 0) {
- state->sys_errno = 0;
- goto post_status;
+ errno = 0;
+ goto post_errno;
}
/**
@@ -686,22 +590,20 @@ struct async_req *async_connect_send(TALLOC_CTX *mem_ctx,
fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
async_connect_connected, result);
if (fde == NULL) {
- state->sys_errno = ENOMEM;
- goto post_status;
+ errno = ENOMEM;
+ goto post_errno;
}
return result;
post_errno:
state->sys_errno = errno;
- post_status:
fcntl(fd, F_SETFL, state->old_sockflags);
- if (!async_post_error(result, ev, state->sys_errno)) {
- goto fail;
+ if (state->sys_errno == 0) {
+ tevent_req_done(result);
+ } else {
+ tevent_req_error(result, state->sys_errno);
}
- return result;
- fail:
- TALLOC_FREE(result);
- return NULL;
+ return tevent_req_post(result, ev);
}
/**
@@ -716,10 +618,10 @@ static void async_connect_connected(struct tevent_context *ev,
struct tevent_fd *fde, uint16_t flags,
void *priv)
{
- struct async_req *req = talloc_get_type_abort(
- priv, struct async_req);
+ struct tevent_req *req = talloc_get_type_abort(
+ priv, struct tevent_req);
struct async_connect_state *state = talloc_get_type_abort(
- req->private_data, struct async_connect_state);
+ req->private_state, struct async_connect_state);
TALLOC_FREE(fde);
@@ -743,27 +645,27 @@ static void async_connect_connected(struct tevent_context *ev,
DEBUG(10, ("connect returned %s\n", strerror(errno)));
fcntl(state->fd, F_SETFL, state->old_sockflags);
- async_req_error(req, state->sys_errno);
+ tevent_req_error(req, state->sys_errno);
return;
}
state->sys_errno = 0;
- async_req_done(req);
+ tevent_req_done(req);
}
-int async_connect_recv(struct async_req *req, int *perrno)
+int async_connect_recv(struct tevent_req *req, int *perrno)
{
struct async_connect_state *state = talloc_get_type_abort(
- req->private_data, struct async_connect_state);
+ req->private_state, struct async_connect_state);
int err;
fcntl(state->fd, F_SETFL, state->old_sockflags);
-
- if (async_req_is_errno(req, &err)) {
+ if (tevent_req_is_unix_error(req, &err)) {
*perrno = err;
return -1;
}
+
if (state->sys_errno == 0) {
return 0;
}
@@ -771,3 +673,226 @@ int async_connect_recv(struct async_req *req, int *perrno)
*perrno = state->sys_errno;
return -1;
}
+
+struct writev_state {
+ struct tevent_context *ev;
+ int fd;
+ struct iovec *iov;
+ int count;
+ size_t total_size;
+};
+
+static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
+ uint16_t flags, void *private_data);
+
+struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ int fd, struct iovec *iov, int count)
+{
+ struct tevent_req *result;
+ struct writev_state *state;
+ struct tevent_fd *fde;
+
+ result = tevent_req_create(mem_ctx, &state, struct writev_state);
+ if (result == NULL) {
+ return NULL;
+ }
+ state->ev = ev;
+ state->fd = fd;
+ state->total_size = 0;
+ state->count = count;
+ state->iov = (struct iovec *)talloc_memdup(
+ state, iov, sizeof(struct iovec) * count);
+ if (state->iov == NULL) {
+ goto fail;
+ }
+
+ fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, writev_handler,
+ result);
+ if (fde == NULL) {
+ goto fail;
+ }
+ return result;
+
+ fail:
+ TALLOC_FREE(result);
+ return NULL;
+}
+
+static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
+ uint16_t flags, void *private_data)
+{
+ struct tevent_req *req = talloc_get_type_abort(
+ private_data, struct tevent_req);
+ struct writev_state *state = talloc_get_type_abort(
+ req->private_state, struct writev_state);
+ size_t to_write, written;
+ int i;
+
+ to_write = 0;
+
+ for (i=0; i<state->count; i++) {
+ to_write += state->iov[i].iov_len;
+ }
+
+ written = sys_writev(state->fd, state->iov, state->count);
+ if (written == -1) {
+ tevent_req_error(req, errno);
+ return;
+ }
+ if (written == 0) {
+ tevent_req_error(req, EPIPE);
+ return;
+ }
+ state->total_size += written;
+
+ if (written == to_write) {
+ tevent_req_done(req);
+ return;
+ }
+
+ /*
+ * We've written less than we were asked to, drop stuff from
+ * state->iov.
+ */
+
+ while (written > 0) {
+ if (written < state->iov[0].iov_len) {
+ state->iov[0].iov_base =
+ (char *)state->iov[0].iov_base + written;
+ state->iov[0].iov_len -= written;
+ break;
+ }
+ written = state->iov[0].iov_len;
+ state->iov += 1;
+ state->count -= 1;
+ }
+}
+
+ssize_t writev_recv(struct tevent_req *req, int *perrno)
+{
+ struct writev_state *state = talloc_get_type_abort(
+ req->private_state, struct writev_state);
+
+ if (tevent_req_is_unix_error(req, perrno)) {
+ return -1;
+ }
+ return state->total_size;
+}
+
+struct read_packet_state {
+ int fd;
+ uint8_t *buf;
+ size_t nread;
+ ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
+ void *private_data;
+};
+
+static void read_packet_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data);
+
+struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int fd, size_t initial,
+ ssize_t (*more)(uint8_t *buf,
+ size_t buflen,
+ void *private_data),
+ void *private_data)
+{
+ struct tevent_req *result;
+ struct read_packet_state *state;
+ struct tevent_fd *fde;
+
+ result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
+ if (result == NULL) {
+ return NULL;
+ }
+ state->fd = fd;
+ state->nread = 0;
+ state->more = more;
+ state->private_data = private_data;
+
+ state->buf = talloc_array(state, uint8_t, initial);
+ if (state->buf == NULL) {
+ goto fail;
+ }
+
+ fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
+ result);
+ if (fde == NULL) {
+ goto fail;
+ }
+ return result;
+ fail:
+ TALLOC_FREE(result);
+ return NULL;
+}
+
+static void read_packet_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data)
+{
+ struct tevent_req *req = talloc_get_type_abort(
+ private_data, struct tevent_req);
+ struct read_packet_state *state = talloc_get_type_abort(
+ req->private_state, struct read_packet_state);
+ size_t total = talloc_get_size(state->buf);
+ ssize_t nread, more;
+ uint8_t *tmp;
+
+ nread = read(state->fd, state->buf+state->nread, total-state->nread);
+ if (nread == -1) {
+ tevent_req_error(req, errno);
+ return;
+ }
+ if (nread == 0) {
+ tevent_req_error(req, EPIPE);
+ return;
+ }
+
+ state->nread += nread;
+ if (state->nread < total) {
+ /* Come back later */
+ return;
+ }
+
+ /*
+ * We got what was initially requested. See if "more" asks for -- more.
+ */
+ if (state->more == NULL) {
+ /* Nobody to ask, this is a async read_data */
+ tevent_req_done(req);
+ return;
+ }
+
+ more = state->more(state->buf, total, state->private_data);
+ if (more == -1) {
+ /* We got an invalid packet, tell the caller */
+ tevent_req_error(req, EIO);
+ return;
+ }
+ if (more == 0) {
+ /* We're done, full packet received */
+ tevent_req_done(req);
+ return;
+ }
+
+ tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more);
+ if (tevent_req_nomem(tmp, req)) {
+ return;
+ }
+ state->buf = tmp;
+}
+
+ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+ uint8_t **pbuf, int *perrno)
+{
+ struct read_packet_state *state = talloc_get_type_abort(
+ req->private_state, struct read_packet_state);
+
+ if (tevent_req_is_unix_error(req, perrno)) {
+ return -1;
+ }
+ *pbuf = talloc_move(mem_ctx, &state->buf);
+ return talloc_get_size(*pbuf);
+}