From f6740aa7ad1b553410ffbc9fb54916d6a385a753 Mon Sep 17 00:00:00 2001
From: Volker Lendecke
Date: Sat, 17 Jan 2009 13:33:34 +0100
Subject: Make rpc_api_pipe async

---
 source3/rpc_client/cli_pipe.c | 284 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 284 insertions(+)

diff --git a/source3/rpc_client/cli_pipe.c b/source3/rpc_client/cli_pipe.c
index 4c0cb78a04..c924436faa 100644
--- a/source3/rpc_client/cli_pipe.c
+++ b/source3/rpc_client/cli_pipe.c
@@ -1265,6 +1265,8 @@ static NTSTATUS cli_api_pipe(TALLOC_CTX *mem_ctx, struct rpc_pipe_client *cli,
  ****************************************************************************/
 
+#if 0
+
 static NTSTATUS rpc_api_pipe(TALLOC_CTX *mem_ctx, struct rpc_pipe_client *cli,
 			prs_struct *data, /* Outgoing pdu fragment, already formatted for send. */
 			prs_struct *rbuf, /* Incoming reply - return as an NDR stream. */
@@ -1398,6 +1400,288 @@ static NTSTATUS rpc_api_pipe(TALLOC_CTX *mem_ctx, struct rpc_pipe_client *cli,
 	prs_mem_free(rbuf);
 	return ret;
 }
+#endif
+
+struct rpc_api_pipe_state {
+	struct event_context *ev;
+	struct rpc_pipe_client *cli;
+	uint8_t expected_pkt_type;
+
+	prs_struct incoming_frag;
+	struct rpc_hdr_info rhdr;
+
+	prs_struct incoming_pdu;	/* Incoming reply */
+	uint32_t incoming_pdu_offset;
+};
+
+static int rpc_api_pipe_state_destructor(struct rpc_api_pipe_state *state)
+{
+	prs_mem_free(&state->incoming_frag);
+	prs_mem_free(&state->incoming_pdu);
+	return 0;
+}
+
+static void rpc_api_pipe_trans_done(struct async_req *subreq);
+static void rpc_api_pipe_got_pdu(struct async_req *subreq);
+
+static struct async_req *rpc_api_pipe_send(TALLOC_CTX *mem_ctx,
+					   struct event_context *ev,
+					   struct rpc_pipe_client *cli,
+					   prs_struct *data, /* Outgoing PDU */
+					   uint8_t expected_pkt_type)
+{
+	struct async_req *result, *subreq;
+	struct rpc_api_pipe_state *state;
+	NTSTATUS status;
+
+	result = async_req_new(mem_ctx);
+	if (result == NULL) {
+		return NULL;
+	}
+	state = talloc(result, struct rpc_api_pipe_state);
+	if (state == NULL) {
+		goto fail;
+	}
+	result->private_data = state;
+
+	state->ev = ev;
+	state->cli = cli;
+	state->expected_pkt_type = expected_pkt_type;
+	state->incoming_pdu_offset = 0;
+
+	prs_init_empty(&state->incoming_frag, state, UNMARSHALL);
+
+	prs_init_empty(&state->incoming_pdu, state, UNMARSHALL);
+	/* Make incoming_pdu dynamic with no memory. */
+	prs_give_memory(&state->incoming_pdu, 0, 0, true);
+
+	talloc_set_destructor(state, rpc_api_pipe_state_destructor);
+
+	/*
+	 * Ensure we're not sending too much.
+	 */
+	if (prs_offset(data) > cli->max_xmit_frag) {
+		status = NT_STATUS_INVALID_PARAMETER;
+		goto post_status;
+	}
+
+	DEBUG(5,("rpc_api_pipe: %s\n", rpccli_pipe_txt(debug_ctx(), cli)));
+
+	subreq = cli_api_pipe_send(state, ev, cli,
+				   (uint8_t *)prs_data_p(data),
+				   prs_offset(data), cli->max_recv_frag);
+	if (subreq == NULL) {
+		status = NT_STATUS_NO_MEMORY;
+		goto post_status;
+	}
+	subreq->async.fn = rpc_api_pipe_trans_done;
+	subreq->async.priv = result;
+	return result;
+
+ post_status:
+	if (async_post_status(result, ev, status)) {
+		return result;
+	}
+ fail:
+	TALLOC_FREE(result);
+	return NULL;
+}
+
+static void rpc_api_pipe_trans_done(struct async_req *subreq)
+{
+	struct async_req *req = talloc_get_type_abort(
+		subreq->async.priv, struct async_req);
+	struct rpc_api_pipe_state *state = talloc_get_type_abort(
+		req->private_data, struct rpc_api_pipe_state);
+	NTSTATUS status;
+	uint8_t *rdata = NULL;
+	uint32_t rdata_len = 0;
+	char *rdata_copy;
+
+	status = cli_api_pipe_recv(subreq, state, &rdata, &rdata_len);
+	TALLOC_FREE(subreq);
+	if (!NT_STATUS_IS_OK(status)) {
+		DEBUG(5, ("cli_api_pipe failed: %s\n", nt_errstr(status)));
+		async_req_error(req, status);
+		return;
+	}
+
+	if (rdata == NULL) {
+		DEBUG(3,("rpc_api_pipe: %s failed to return data.\n",
+			 rpccli_pipe_txt(debug_ctx(), state->cli)));
+		async_req_done(req);
+		return;
+	}
+
+	/*
+	 * Give the memory received from cli_trans as dynamic to the current
+	 * pdu. Duplicating it sucks, but prs_struct doesn't know about talloc
+	 * :-(
+	 */
+	rdata_copy = (char *)memdup(rdata, rdata_len);
+	TALLOC_FREE(rdata);
+	if (async_req_nomem(rdata_copy, req)) {
+		return;
+	}
+	prs_give_memory(&state->incoming_frag, rdata_copy, rdata_len, true);
+
+	/* Ensure we have enough data for a pdu. */
+	subreq = get_complete_frag_send(state, state->ev, state->cli,
+					&state->rhdr, &state->incoming_frag);
+	if (async_req_nomem(subreq, req)) {
+		return;
+	}
+	subreq->async.fn = rpc_api_pipe_got_pdu;
+	subreq->async.priv = req;
+}
+
+static void rpc_api_pipe_got_pdu(struct async_req *subreq)
+{
+	struct async_req *req = talloc_get_type_abort(
+		subreq->async.priv, struct async_req);
+	struct rpc_api_pipe_state *state = talloc_get_type_abort(
+		req->private_data, struct rpc_api_pipe_state);
+	NTSTATUS status;
+	char *rdata = NULL;
+	uint32_t rdata_len = 0;
+
+	status = get_complete_frag_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (!NT_STATUS_IS_OK(status)) {
+		DEBUG(5, ("get_complete_frag failed: %s\n",
+			  nt_errstr(status)));
+		async_req_error(req, status);
+		return;
+	}
+
+	status = cli_pipe_validate_current_pdu(
+		state->cli, &state->rhdr, &state->incoming_frag,
+		state->expected_pkt_type, &rdata, &rdata_len,
+		&state->incoming_pdu);
+
+	DEBUG(10,("rpc_api_pipe: got frag len of %u at offset %u: %s\n",
+		  (unsigned)prs_data_size(&state->incoming_frag),
+		  (unsigned)state->incoming_pdu_offset,
+		  nt_errstr(status)));
+
+	if (!NT_STATUS_IS_OK(status)) {
+		async_req_error(req, status);
+		return;
+	}
+
+	if ((state->rhdr.flags & RPC_FLG_FIRST)
+	    && (state->rhdr.pack_type[0] == 0)) {
+		/*
+		 * Set the data type correctly for big-endian data on the
+		 * first packet.
+		 */
+		DEBUG(10,("rpc_api_pipe: On %s PDU data format is "
+			  "big-endian.\n",
+			  rpccli_pipe_txt(debug_ctx(), state->cli)));
+		prs_set_endian_data(&state->incoming_pdu, RPC_BIG_ENDIAN);
+	}
+	/*
+	 * Check endianness on subsequent packets.
+	 */
+	if (state->incoming_frag.bigendian_data
+	    != state->incoming_pdu.bigendian_data) {
+		DEBUG(0,("rpc_api_pipe: Error : Endianness changed from %s to "
+			 "%s\n",
+			 state->incoming_pdu.bigendian_data?"big":"little",
+			 state->incoming_frag.bigendian_data?"big":"little"));
+		async_req_error(req, NT_STATUS_INVALID_PARAMETER);
+		return;
+	}
+
+	/* Now copy the data portion out of the pdu into rbuf. */
+	if (!prs_force_grow(&state->incoming_pdu, rdata_len)) {
+		async_req_error(req, NT_STATUS_NO_MEMORY);
+		return;
+	}
+
+	memcpy(prs_data_p(&state->incoming_pdu) + state->incoming_pdu_offset,
+	       rdata, (size_t)rdata_len);
+	state->incoming_pdu_offset += rdata_len;
+
+	status = cli_pipe_reset_current_pdu(state->cli, &state->rhdr,
+					    &state->incoming_frag);
+	if (!NT_STATUS_IS_OK(status)) {
+		async_req_error(req, status);
+		return;
+	}
+
+	if (state->rhdr.flags & RPC_FLG_LAST) {
+		DEBUG(10,("rpc_api_pipe: %s returned %u bytes.\n",
+			  rpccli_pipe_txt(debug_ctx(), state->cli),
+			  (unsigned)prs_data_size(&state->incoming_pdu)));
+		async_req_done(req);
+		return;
+	}
+
+	subreq = get_complete_frag_send(state, state->ev, state->cli,
+					&state->rhdr, &state->incoming_frag);
+	if (async_req_nomem(subreq, req)) {
+		return;
+	}
+	subreq->async.fn = rpc_api_pipe_got_pdu;
+	subreq->async.priv = req;
+}
+
+static NTSTATUS rpc_api_pipe_recv(struct async_req *req, TALLOC_CTX *mem_ctx,
+				  prs_struct *reply_pdu)
+{
+	struct rpc_api_pipe_state *state = talloc_get_type_abort(
+		req->private_data, struct rpc_api_pipe_state);
+	NTSTATUS status;
+
+	if (async_req_is_error(req, &status)) {
+		return status;
+	}
+
+	*reply_pdu = state->incoming_pdu;
+	reply_pdu->mem_ctx = mem_ctx;
+
+	/*
+	 * Prevent state->incoming_pdu from being freed in
+	 * rpc_api_pipe_state_destructor()
+	 */
+	prs_init_empty(&state->incoming_pdu, state, UNMARSHALL);
+
+	return NT_STATUS_OK;
+}
+
+static NTSTATUS rpc_api_pipe(TALLOC_CTX *mem_ctx, struct rpc_pipe_client *cli,
+			prs_struct *data, /* Outgoing pdu fragment,
+					   * already formatted for
+					   * send. */
+			prs_struct *rbuf, /* Incoming reply - return as
+					   * an NDR stream. */
+			uint8 expected_pkt_type)
+{
+	TALLOC_CTX *frame = talloc_stackframe();
+	struct event_context *ev;
+	struct async_req *req;
+	NTSTATUS status = NT_STATUS_NO_MEMORY;
+
+	ev = event_context_init(frame);
+	if (ev == NULL) {
+		goto fail;
+	}
+
+	req = rpc_api_pipe_send(frame, ev, cli, data, expected_pkt_type);
+	if (req == NULL) {
+		goto fail;
+	}
+
+	while (req->state < ASYNC_REQ_DONE) {
+		event_loop_once(ev);
+	}
+
+	status = rpc_api_pipe_recv(req, mem_ctx, rbuf);
+ fail:
+	TALLOC_FREE(frame);
+	return status;
+}
 
 /*******************************************************************
  Creates krb5 auth bind.
-- 
cgit