summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/async_req/async_sock.c118
-rw-r--r--lib/async_req/async_sock.h10
2 files changed, 128 insertions, 0 deletions
diff --git a/lib/async_req/async_sock.c b/lib/async_req/async_sock.c
index 67776ff67f..02e4c9eb4b 100644
--- a/lib/async_req/async_sock.c
+++ b/lib/async_req/async_sock.c
@@ -765,3 +765,121 @@ ssize_t writev_recv(struct tevent_req *req, int *perrno)
}
return state->total_size;
}
+
+struct read_packet_state {
+ int fd;
+ uint8_t *buf;
+ size_t nread;
+ ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
+ void *private_data;
+};
+
+static void read_packet_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data);
+
+struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int fd, size_t initial,
+ ssize_t (*more)(uint8_t *buf,
+ size_t buflen,
+ void *private_data),
+ void *private_data)
+{
+ struct tevent_req *result;
+ struct read_packet_state *state;
+ struct tevent_fd *fde;
+
+ result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
+ if (result == NULL) {
+ return NULL;
+ }
+ state->fd = fd;
+ state->nread = 0;
+ state->more = more;
+ state->private_data = private_data;
+
+ state->buf = talloc_array(state, uint8_t, initial);
+ if (state->buf == NULL) {
+ goto fail;
+ }
+
+ fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
+ result);
+ if (fde == NULL) {
+ goto fail;
+ }
+ return result;
+ fail:
+ TALLOC_FREE(result);
+ return NULL;
+}
+
+static void read_packet_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data)
+{
+ struct tevent_req *req = talloc_get_type_abort(
+ private_data, struct tevent_req);
+ struct read_packet_state *state = talloc_get_type_abort(
+ req->private_state, struct read_packet_state);
+ size_t total = talloc_get_size(state->buf);
+ ssize_t nread, more;
+ uint8_t *tmp;
+
+ nread = read(state->fd, state->buf+state->nread, total-state->nread);
+ if (nread == -1) {
+ tevent_req_error(req, errno);
+ return;
+ }
+ if (nread == 0) {
+ tevent_req_error(req, EPIPE);
+ return;
+ }
+
+ state->nread += nread;
+ if (state->nread < total) {
+ /* Come back later */
+ return;
+ }
+
+ /*
+ * We got what was initially requested. See if "more" asks for -- more.
+ */
+ if (state->more == NULL) {
+ /* Nobody to ask, this is a async read_data */
+ tevent_req_done(req);
+ return;
+ }
+
+ more = state->more(state->buf, total, state->private_data);
+ if (more == -1) {
+ /* We got an invalid packet, tell the caller */
+ tevent_req_error(req, EIO);
+ return;
+ }
+ if (more == 0) {
+ /* We're done, full packet received */
+ tevent_req_done(req);
+ return;
+ }
+
+ tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more);
+ if (tevent_req_nomem(tmp, req)) {
+ return;
+ }
+ state->buf = tmp;
+}
+
+ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+ uint8_t **pbuf, int *perrno)
+{
+ struct read_packet_state *state = talloc_get_type_abort(
+ req->private_state, struct read_packet_state);
+
+ if (tevent_req_is_unix_error(req, perrno)) {
+ return -1;
+ }
+ *pbuf = talloc_move(mem_ctx, &state->buf);
+ return talloc_get_size(*pbuf);
+}
diff --git a/lib/async_req/async_sock.h b/lib/async_req/async_sock.h
index 6a862c45c6..0cf4e4ecf5 100644
--- a/lib/async_req/async_sock.h
+++ b/lib/async_req/async_sock.h
@@ -51,4 +51,14 @@ struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
int fd, struct iovec *iov, int count);
ssize_t writev_recv(struct tevent_req *req, int *perrno);
+struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int fd, size_t initial,
+ ssize_t (*more)(uint8_t *buf,
+ size_t buflen,
+ void *private_data),
+ void *private_data);
+ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+ uint8_t **pbuf, int *perrno);
+
#endif