From 6ac61e6707079c8339ef8fa5f1c65ab173f3a79a Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Fri, 3 Apr 2009 18:49:24 +0200 Subject: tsocket: add tstream_readv_pdu_queue_send/recv() metze --- lib/tsocket/tsocket.h | 8 +++ lib/tsocket/tsocket_helpers.c | 124 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/lib/tsocket/tsocket.h b/lib/tsocket/tsocket.h index ed56995572..8b0c80becf 100644 --- a/lib/tsocket/tsocket.h +++ b/lib/tsocket/tsocket.h @@ -214,6 +214,14 @@ struct tevent_req *tstream_readv_pdu_send(TALLOC_CTX *mem_ctx, void *next_vector_private); int tstream_readv_pdu_recv(struct tevent_req *req, int *perrno); +struct tevent_req *tstream_readv_pdu_queue_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tstream_context *stream, + struct tevent_queue *queue, + tstream_readv_pdu_next_vector_t next_vector_fn, + void *next_vector_private); +int tstream_readv_pdu_queue_recv(struct tevent_req *req, int *perrno); + struct tevent_req *tstream_writev_queue_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct tstream_context *stream, diff --git a/lib/tsocket/tsocket_helpers.c b/lib/tsocket/tsocket_helpers.c index 99cb25b396..6c673a354d 100644 --- a/lib/tsocket/tsocket_helpers.c +++ b/lib/tsocket/tsocket_helpers.c @@ -320,6 +320,130 @@ int tstream_readv_pdu_recv(struct tevent_req *req, int *perrno) return ret; } +struct tstream_readv_pdu_queue_state { + /* this structs are owned by the caller */ + struct { + struct tevent_context *ev; + struct tstream_context *stream; + tstream_readv_pdu_next_vector_t next_vector_fn; + void *next_vector_private; + } caller; + int ret; +}; + +static void tstream_readv_pdu_queue_trigger(struct tevent_req *req, + void *private_data); +static void tstream_readv_pdu_queue_done(struct tevent_req *subreq); + +/** + * @brief Queue a dgram blob for sending through the socket + * @param[in] mem_ctx The memory context for the result + * @param[in] ev The tevent_context to run on + * @param[in] stream The stream to send data through + * @param[in] queue The existing send queue + * @param[in] next_vector_fn The next vector function + * @param[in] next_vector_private The private_data of the next vector function + * @retval The async request handle + * + * This function queues a blob for sending to destination through an existing + * dgram socket. The async callback is triggered when the whole blob is + * delivered to the underlying system socket. + * + * The caller needs to make sure that all non-scalar input parameters hang + * arround for the whole lifetime of the request. + */ +struct tevent_req *tstream_readv_pdu_queue_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tstream_context *stream, + struct tevent_queue *queue, + tstream_readv_pdu_next_vector_t next_vector_fn, + void *next_vector_private) +{ + struct tevent_req *req; + struct tstream_readv_pdu_queue_state *state; + bool ok; + + req = tevent_req_create(mem_ctx, &state, + struct tstream_readv_pdu_queue_state); + if (!req) { + return NULL; + } + + state->caller.ev = ev; + state->caller.stream = stream; + state->caller.next_vector_fn = next_vector_fn; + state->caller.next_vector_private = next_vector_private; + state->ret = -1; + + ok = tevent_queue_add(queue, + ev, + req, + tstream_readv_pdu_queue_trigger, + NULL); + if (!ok) { + tevent_req_nomem(NULL, req); + goto post; + } + + return req; + + post: + return tevent_req_post(req, ev); +} + +static void tstream_readv_pdu_queue_trigger(struct tevent_req *req, + void *private_data) +{ + struct tstream_readv_pdu_queue_state *state = tevent_req_data(req, + struct tstream_readv_pdu_queue_state); + struct tevent_req *subreq; + + subreq = tstream_readv_pdu_send(state, + state->caller.ev, + state->caller.stream, + state->caller.next_vector_fn, + state->caller.next_vector_private); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, tstream_readv_pdu_queue_done ,req); +} + +static void tstream_readv_pdu_queue_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data(subreq, + struct tevent_req); + struct tstream_readv_pdu_queue_state *state = tevent_req_data(req, + struct tstream_readv_pdu_queue_state); + int ret; + int sys_errno; + + ret = tstream_readv_pdu_recv(subreq, &sys_errno); + talloc_free(subreq); + if (ret == -1) { + tevent_req_error(req, sys_errno); + return; + } + state->ret = ret; + + tevent_req_done(req); +} + +int tstream_readv_pdu_queue_recv(struct tevent_req *req, int *perrno) +{ + struct tstream_readv_pdu_queue_state *state = tevent_req_data(req, + struct tstream_readv_pdu_queue_state); + int ret; + + ret = tsocket_simple_int_recv(req, perrno); + if (ret == 0) { + ret = state->ret; + } + + tevent_req_received(req); + return ret; +} + struct tstream_writev_queue_state { /* this structs are owned by the caller */ struct { -- cgit