/* Unix SMB/CIFS implementation. Copyright (C) Stefan Metzmacher 2009 ** NOTE! The following LGPL license applies to the tevent ** library. This does NOT imply that all of Samba is released ** under the LGPL This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, see <http://www.gnu.org/licenses/>. */ #include "replace.h" #include "system/filesys.h" #include "tsocket.h" #include "tsocket_internal.h" struct tdgram_sendto_queue_state { /* this structs are owned by the caller */ struct { struct tevent_context *ev; struct tdgram_context *dgram; const uint8_t *buf; size_t len; const struct tsocket_address *dst; } caller; ssize_t ret; }; static void tdgram_sendto_queue_trigger(struct tevent_req *req, void *private_data); static void tdgram_sendto_queue_done(struct tevent_req *subreq); /** * @brief Queue a dgram blob for sending through the socket * @param[in] mem_ctx The memory context for the result * @param[in] ev The event context the operation should work on * @param[in] dgram The tdgram_context to send the message buffer * @param[in] queue The existing dgram queue * @param[in] buf The message buffer * @param[in] len The message length * @param[in] dst The destination socket address * @retval The async request handle * * This function queues a blob for sending to destination through an existing * dgram socket. The async callback is triggered when the whole blob is * delivered to the underlying system socket. * * The caller needs to make sure that all non-scalar input parameters hang * arround for the whole lifetime of the request. */ struct tevent_req *tdgram_sendto_queue_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct tdgram_context *dgram, struct tevent_queue *queue, const uint8_t *buf, size_t len, struct tsocket_address *dst) { struct tevent_req *req; struct tdgram_sendto_queue_state *state; bool ok; req = tevent_req_create(mem_ctx, &state, struct tdgram_sendto_queue_state); if (!req) { return NULL; } state->caller.ev = ev; state->caller.dgram = dgram; state->caller.buf = buf; state->caller.len = len; state->caller.dst = dst; state->ret = -1; ok = tevent_queue_add(queue, ev, req, tdgram_sendto_queue_trigger, NULL); if (!ok) { tevent_req_nomem(NULL, req); goto post; } return req; post: tevent_req_post(req, ev); return req; } static void tdgram_sendto_queue_trigger(struct tevent_req *req, void *private_data) { struct tdgram_sendto_queue_state *state = tevent_req_data(req, struct tdgram_sendto_queue_state); struct tevent_req *subreq; subreq = tdgram_sendto_send(state, state->caller.ev, state->caller.dgram, state->caller.buf, state->caller.len, state->caller.dst); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, tdgram_sendto_queue_done, req); } static void tdgram_sendto_queue_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data(subreq, struct tevent_req); struct tdgram_sendto_queue_state *state = tevent_req_data(req, struct tdgram_sendto_queue_state); ssize_t ret; int sys_errno; ret = tdgram_sendto_recv(subreq, &sys_errno); talloc_free(subreq); if (ret == -1) { tevent_req_error(req, sys_errno); return; } state->ret = ret; tevent_req_done(req); } ssize_t tdgram_sendto_queue_recv(struct tevent_req *req, int *perrno) { struct tdgram_sendto_queue_state *state = tevent_req_data(req, struct tdgram_sendto_queue_state); ssize_t ret; ret = tsocket_simple_int_recv(req, perrno); if (ret == 0) { ret = state->ret; } tevent_req_received(req); return ret; } struct tstream_readv_pdu_state { /* this structs are owned by the caller */ struct { struct tevent_context *ev; struct tstream_context *stream; tstream_readv_pdu_next_vector_t next_vector_fn; void *next_vector_private; } caller; /* * Each call to the callback resets iov and count * the callback allocated the iov as child of our state, * that means we are allowed to modify and free it. * * we should call the callback every time we filled the given * vector and ask for a new vector. We return if the callback * ask for 0 bytes. */ struct iovec *vector; size_t count; /* * the total number of bytes we read, * the return value of the _recv function */ int total_read; }; static void tstream_readv_pdu_ask_for_next_vector(struct tevent_req *req); static void tstream_readv_pdu_readv_done(struct tevent_req *subreq); struct tevent_req *tstream_readv_pdu_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct tstream_context *stream, tstream_readv_pdu_next_vector_t next_vector_fn, void *next_vector_private) { struct tevent_req *req; struct tstream_readv_pdu_state *state; req = tevent_req_create(mem_ctx, &state, struct tstream_readv_pdu_state); if (!req) { return NULL; } state->caller.ev = ev; state->caller.stream = stream; state->caller.next_vector_fn = next_vector_fn; state->caller.next_vector_private = next_vector_private; state->vector = NULL; state->count = 0; state->total_read = 0; tstream_readv_pdu_ask_for_next_vector(req); if (!tevent_req_is_in_progress(req)) { goto post; } return req; post: return tevent_req_post(req, ev); } static void tstream_readv_pdu_ask_for_next_vector(struct tevent_req *req) { struct tstream_readv_pdu_state *state = tevent_req_data(req, struct tstream_readv_pdu_state); int ret; size_t to_read = 0; size_t i; struct tevent_req *subreq; TALLOC_FREE(state->vector); state->count = 0; ret = state->caller.next_vector_fn(state->caller.stream, state->caller.next_vector_private, state, &state->vector, &state->count); if (ret == -1) { tevent_req_error(req, errno); return; } if (state->count == 0) { tevent_req_done(req); return; } for (i=0; i < state->count; i++) { size_t tmp = to_read; tmp += state->vector[i].iov_len; if (tmp < to_read) { tevent_req_error(req, EMSGSIZE); return; } to_read = tmp; } /* * this is invalid the next vector function should have * reported count == 0. */ if (to_read == 0) { tevent_req_error(req, EINVAL); return; } if (state->total_read + to_read < state->total_read) { tevent_req_error(req, EMSGSIZE); return; } subreq = tstream_readv_send(state, state->caller.ev, state->caller.stream, state->vector, state->count); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, tstream_readv_pdu_readv_done, req); } static void tstream_readv_pdu_readv_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data(subreq, struct tevent_req); struct tstream_readv_pdu_state *state = tevent_req_data(req, struct tstream_readv_pdu_state); int ret; int sys_errno; ret = tstream_readv_recv(subreq, &sys_errno); if (ret == -1) { tevent_req_error(req, sys_errno); return; } state->total_read += ret; /* ask the callback for a new vector we should fill */ tstream_readv_pdu_ask_for_next_vector(req); } int tstream_readv_pdu_recv(struct tevent_req *req, int *perrno) { struct tstream_readv_pdu_state *state = tevent_req_data(req, struct tstream_readv_pdu_state); int ret; ret = tsocket_simple_int_recv(req, perrno); if (ret == 0) { ret = state->total_read; } tevent_req_received(req); return ret; } struct tstream_readv_pdu_queue_state { /* this structs are owned by the caller */ struct { struct tevent_context *ev; struct tstream_context *stream; tstream_readv_pdu_next_vector_t next_vector_fn; void *next_vector_private; } caller; int ret; }; static void tstream_readv_pdu_queue_trigger(struct tevent_req *req, void *private_data); static void tstream_readv_pdu_queue_done(struct tevent_req *subreq); /** * @brief Queue a dgram blob for sending through the socket * @param[in] mem_ctx The memory context for the result * @param[in] ev The tevent_context to run on * @param[in] stream The stream to send data through * @param[in] queue The existing send queue * @param[in] next_vector_fn The next vector function * @param[in] next_vector_private The private_data of the next vector function * @retval The async request handle * * This function queues a blob for sending to destination through an existing * dgram socket. The async callback is triggered when the whole blob is * delivered to the underlying system socket. * * The caller needs to make sure that all non-scalar input parameters hang * arround for the whole lifetime of the request. */ struct tevent_req *tstream_readv_pdu_queue_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct tstream_context *stream, struct tevent_queue *queue, tstream_readv_pdu_next_vector_t next_vector_fn, void *next_vector_private) { struct tevent_req *req; struct tstream_readv_pdu_queue_state *state; bool ok; req = tevent_req_create(mem_ctx, &state, struct tstream_readv_pdu_queue_state); if (!req) { return NULL; } state->caller.ev = ev; state->caller.stream = stream; state->caller.next_vector_fn = next_vector_fn; state->caller.next_vector_private = next_vector_private; state->ret = -1; ok = tevent_queue_add(queue, ev, req, tstream_readv_pdu_queue_trigger, NULL); if (!ok) { tevent_req_nomem(NULL, req); goto post; } return req; post: return tevent_req_post(req, ev); } static void tstream_readv_pdu_queue_trigger(struct tevent_req *req, void *private_data) { struct tstream_readv_pdu_queue_state *state = tevent_req_data(req, struct tstream_readv_pdu_queue_state); struct tevent_req *subreq; subreq = tstream_readv_pdu_send(state, state->caller.ev, state->caller.stream, state->caller.next_vector_fn, state->caller.next_vector_private); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, tstream_readv_pdu_queue_done ,req); } static void tstream_readv_pdu_queue_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data(subreq, struct tevent_req); struct tstream_readv_pdu_queue_state *state = tevent_req_data(req, struct tstream_readv_pdu_queue_state); int ret; int sys_errno; ret = tstream_readv_pdu_recv(subreq, &sys_errno); talloc_free(subreq); if (ret == -1) { tevent_req_error(req, sys_errno); return; } state->ret = ret; tevent_req_done(req); } int tstream_readv_pdu_queue_recv(struct tevent_req *req, int *perrno) { struct tstream_readv_pdu_queue_state *state = tevent_req_data(req, struct tstream_readv_pdu_queue_state); int ret; ret = tsocket_simple_int_recv(req, perrno); if (ret == 0) { ret = state->ret; } tevent_req_received(req); return ret; } struct tstream_writev_queue_state { /* this structs are owned by the caller */ struct { struct tevent_context *ev; struct tstream_context *stream; const struct iovec *vector; size_t count; } caller; int ret; }; static void tstream_writev_queue_trigger(struct tevent_req *req, void *private_data); static void tstream_writev_queue_done(struct tevent_req *subreq); /** * @brief Queue a dgram blob for sending through the socket * @param[in] mem_ctx The memory context for the result * @param[in] ev The tevent_context to run on * @param[in] stream The stream to send data through * @param[in] queue The existing send queue * @param[in] vector The iovec vector so write * @param[in] count The size of the vector * @retval The async request handle * * This function queues a blob for sending to destination through an existing * dgram socket. The async callback is triggered when the whole blob is * delivered to the underlying system socket. * * The caller needs to make sure that all non-scalar input parameters hang * arround for the whole lifetime of the request. */ struct tevent_req *tstream_writev_queue_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct tstream_context *stream, struct tevent_queue *queue, const struct iovec *vector, size_t count) { struct tevent_req *req; struct tstream_writev_queue_state *state; bool ok; req = tevent_req_create(mem_ctx, &state, struct tstream_writev_queue_state); if (!req) { return NULL; } state->caller.ev = ev; state->caller.stream = stream; state->caller.vector = vector; state->caller.count = count; state->ret = -1; ok = tevent_queue_add(queue, ev, req, tstream_writev_queue_trigger, NULL); if (!ok) { tevent_req_nomem(NULL, req); goto post; } return req; post: return tevent_req_post(req, ev); } static void tstream_writev_queue_trigger(struct tevent_req *req, void *private_data) { struct tstream_writev_queue_state *state = tevent_req_data(req, struct tstream_writev_queue_state); struct tevent_req *subreq; subreq = tstream_writev_send(state, state->caller.ev, state->caller.stream, state->caller.vector, state->caller.count); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, tstream_writev_queue_done ,req); } static void tstream_writev_queue_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data(subreq, struct tevent_req); struct tstream_writev_queue_state *state = tevent_req_data(req, struct tstream_writev_queue_state); int ret; int sys_errno; ret = tstream_writev_recv(subreq, &sys_errno); talloc_free(subreq); if (ret == -1) { tevent_req_error(req, sys_errno); return; } state->ret = ret; tevent_req_done(req); } int tstream_writev_queue_recv(struct tevent_req *req, int *perrno) { struct tstream_writev_queue_state *state = tevent_req_data(req, struct tstream_writev_queue_state); int ret; ret = tsocket_simple_int_recv(req, perrno); if (ret == 0) { ret = state->ret; } tevent_req_received(req); return ret; }