diff options
-rw-r--r-- | source3/libsmb/clireadwrite.c | 417 |
1 files changed, 246 insertions, 171 deletions
diff --git a/source3/libsmb/clireadwrite.c b/source3/libsmb/clireadwrite.c index 550c52b15e..dd5d4c2865 100644 --- a/source3/libsmb/clireadwrite.c +++ b/source3/libsmb/clireadwrite.c @@ -388,23 +388,9 @@ static NTSTATUS cli_readall_recv(struct tevent_req *req, ssize_t *received, return NT_STATUS_OK; } -struct cli_pull_subreq { - struct tevent_req *req; - ssize_t received; - uint8_t *buf; -}; - -/* - * Parallel read support. - * - * cli_pull sends as many read&x requests as the server would allow via - * max_mux at a time. When replies flow back in, the data is written into - * the callback function "sink" in the right order. - */ +struct cli_pull_chunk; struct cli_pull_state { - struct tevent_req *req; - struct tevent_context *ev; struct cli_state *cli; uint16_t fnum; @@ -415,54 +401,49 @@ struct cli_pull_state { void *priv; size_t chunk_size; + off_t next_offset; + off_t remaining; /* - * Outstanding requests - */ - uint16_t max_reqs; - int num_reqs; - struct cli_pull_subreq *reqs; - - /* - * For how many bytes did we send requests already? - */ - off_t requested; - - /* - * Next request index to push into "sink". This walks around the "req" - * array, taking care that the requests are pushed to "sink" in the - * right order. If necessary (i.e. replies don't come in in the right - * order), replies are held back in "reqs". + * How many bytes did we push into "sink"? */ - int top_req; + off_t pushed; /* - * How many bytes did we push into "sink"? + * Outstanding requests + * + * The maximum is 256: + * - which would be a window of 256 MByte + * for SMB2 with multi-credit + * or smb1 unix extentions. */ - - off_t pushed; + uint16_t max_chunks; + uint16_t num_chunks; + uint16_t num_waiting; + struct cli_pull_chunk *chunks; }; -static char *cli_pull_print(struct tevent_req *req, TALLOC_CTX *mem_ctx) -{ - struct cli_pull_state *state = tevent_req_data( - req, struct cli_pull_state); - char *result; - - result = tevent_req_default_print(req, mem_ctx); - if (result == NULL) { - return NULL; - } - - return talloc_asprintf_append_buffer( - result, "num_reqs=%d, top_req=%d", - state->num_reqs, state->top_req); -} +struct cli_pull_chunk { + struct cli_pull_chunk *prev, *next; + struct tevent_req *req;/* This is the main request! Not the subreq */ + struct tevent_req *subreq; + off_t ofs; + uint8_t *buf; + size_t total_size; + size_t tmp_size; + bool done; +}; -static void cli_pull_read_done(struct tevent_req *read_req); +static void cli_pull_setup_chunks(struct tevent_req *req); +static void cli_pull_chunk_ship(struct cli_pull_chunk *chunk); +static void cli_pull_chunk_done(struct tevent_req *subreq); /* - * Prepare an async pull request + * Parallel read support. + * + * cli_pull sends as many read&x requests as the server would allow via + * max_mux at a time. When replies flow back in, the data is written into + * the callback function "sink" in the right order. */ struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx, @@ -476,16 +457,13 @@ struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx, { struct tevent_req *req; struct cli_pull_state *state; - int i; size_t page_size = 1024; + uint64_t tmp64; req = tevent_req_create(mem_ctx, &state, struct cli_pull_state); if (req == NULL) { return NULL; } - tevent_req_set_print_fn(req, cli_pull_print); - state->req = req; - state->cli = cli; state->ev = ev; state->fnum = fnum; @@ -493,9 +471,8 @@ struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx, state->size = size; state->sink = sink; state->priv = priv; - - state->pushed = 0; - state->top_req = 0; + state->next_offset = start_offset; + state->remaining = size; if (size == 0) { tevent_req_done(req); @@ -507,155 +484,251 @@ struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx, state->chunk_size &= ~(page_size - 1); } - state->max_reqs = smbXcli_conn_max_requests(cli->conn); + if (window_size == 0) { + /* + * We use 16 MByte as default window size. + */ + window_size = 16 * 1024 * 1024; + } + + tmp64 = window_size/state->chunk_size; + if ((window_size % state->chunk_size) > 0) { + tmp64 += 1; + } + tmp64 = MAX(tmp64, 1); + tmp64 = MIN(tmp64, 256); + state->max_chunks = tmp64; + + /* + * We defer the callback because of the complex + * substate/subfunction logic + */ + tevent_req_defer_callback(req, ev); + + cli_pull_setup_chunks(req); + if (!tevent_req_is_in_progress(req)) { + return tevent_req_post(req, ev); + } + + return req; +} - state->num_reqs = MAX(window_size/state->chunk_size, 1); - state->num_reqs = MIN(state->num_reqs, state->max_reqs); +static void cli_pull_setup_chunks(struct tevent_req *req) +{ + struct cli_pull_state *state = + tevent_req_data(req, + struct cli_pull_state); + struct cli_pull_chunk *chunk, *next = NULL; + size_t i; - state->reqs = talloc_zero_array(state, struct cli_pull_subreq, - state->num_reqs); - if (state->reqs == NULL) { - goto failed; + for (chunk = state->chunks; chunk; chunk = next) { + /* + * Note that chunk might be removed from this call. + */ + next = chunk->next; + cli_pull_chunk_ship(chunk); + if (!tevent_req_is_in_progress(req)) { + return; + } } - state->requested = 0; + for (i = state->num_chunks; i < state->max_chunks; i++) { - for (i=0; i<state->num_reqs; i++) { - struct cli_pull_subreq *subreq = &state->reqs[i]; - off_t size_left; - size_t request_thistime; + if (state->num_waiting > 0) { + return; + } - if (state->requested >= size) { - state->num_reqs = i; + if (state->remaining == 0) { break; } - size_left = size - state->requested; - request_thistime = MIN(size_left, state->chunk_size); + chunk = talloc_zero(state, struct cli_pull_chunk); + if (tevent_req_nomem(chunk, req)) { + return; + } + chunk->req = req; + chunk->ofs = state->next_offset; + chunk->total_size = MIN(state->remaining, state->chunk_size); + state->next_offset += chunk->total_size; + state->remaining -= chunk->total_size; - subreq->req = cli_readall_send( - state->reqs, ev, cli, fnum, - state->start_offset + state->requested, - request_thistime); + DLIST_ADD_END(state->chunks, chunk, NULL); + state->num_chunks++; + state->num_waiting++; - if (subreq->req == NULL) { - goto failed; + cli_pull_chunk_ship(chunk); + if (!tevent_req_is_in_progress(req)) { + return; } - tevent_req_set_callback(subreq->req, cli_pull_read_done, req); - state->requested += request_thistime; } - return req; -failed: - TALLOC_FREE(req); - return NULL; -} + if (state->remaining > 0) { + return; + } -/* - * Handle incoming read replies, push the data into sink and send out new - * requests if necessary. - */ + if (state->num_chunks > 0) { + return; + } + + tevent_req_done(req); +} -static void cli_pull_read_done(struct tevent_req *subreq) +static void cli_pull_chunk_ship(struct cli_pull_chunk *chunk) { - struct tevent_req *req = tevent_req_callback_data( - subreq, struct tevent_req); - struct cli_pull_state *state = tevent_req_data( - req, struct cli_pull_state); - struct cli_pull_subreq *pull_subreq = NULL; - NTSTATUS status; - int i; + struct tevent_req *req = chunk->req; + struct cli_pull_state *state = + tevent_req_data(req, + struct cli_pull_state); + bool ok; + off_t ofs; + size_t size; - for (i = 0; i < state->num_reqs; i++) { - pull_subreq = &state->reqs[i]; - if (subreq == pull_subreq->req) { - break; + if (chunk->done) { + NTSTATUS status; + + if (chunk != state->chunks) { + /* + * this chunk is not the + * first one in the list. + * + * which means we should not + * push it into the sink yet. + */ + return; + } + + if (chunk->tmp_size == 0) { + /* + * we git a short read, we're done + */ + tevent_req_done(req); + return; } + + status = state->sink((char *)chunk->buf, + chunk->tmp_size, + state->priv); + if (tevent_req_nterror(req, status)) { + return; + } + state->pushed += chunk->tmp_size; + + if (chunk->tmp_size < chunk->total_size) { + /* + * we git a short read, we're done + */ + tevent_req_done(req); + return; + } + + DLIST_REMOVE(state->chunks, chunk); + SMB_ASSERT(state->num_chunks > 0); + state->num_chunks--; + TALLOC_FREE(chunk); + + return; } - if (i == state->num_reqs) { - /* Huh -- received something we did not send?? */ - tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR); + + if (chunk->subreq != NULL) { return; } - status = cli_readall_recv(subreq, &pull_subreq->received, - &pull_subreq->buf); - if (!NT_STATUS_IS_OK(status)) { - tevent_req_nterror(state->req, status); + SMB_ASSERT(state->num_waiting > 0); + + ofs = chunk->ofs + chunk->tmp_size; + size = chunk->total_size - chunk->tmp_size; + + ok = smb1cli_conn_req_possible(state->cli->conn); + if (!ok) { return; } - /* - * This loop is the one to take care of out-of-order replies. All - * pending requests are in state->reqs, state->reqs[top_req] is the - * one that is to be pushed next. If however a request later than - * top_req is replied to, then we can't push yet. If top_req is - * replied to at a later point then, we need to push all the finished - * requests. - */ + chunk->subreq = cli_read_andx_send(chunk, + state->ev, + state->cli, + state->fnum, + ofs, + size); + if (tevent_req_nomem(chunk->subreq, req)) { + return; + } + tevent_req_set_callback(chunk->subreq, + cli_pull_chunk_done, + chunk); - while (state->reqs[state->top_req].req != NULL) { - struct cli_pull_subreq *top_subreq; + state->num_waiting--; + return; +} - DEBUG(11, ("cli_pull_read_done: top_req = %d\n", - state->top_req)); +static void cli_pull_chunk_done(struct tevent_req *subreq) +{ + struct cli_pull_chunk *chunk = + tevent_req_callback_data(subreq, + struct cli_pull_chunk); + struct tevent_req *req = chunk->req; + struct cli_pull_state *state = + tevent_req_data(req, + struct cli_pull_state); + NTSTATUS status; + size_t expected = chunk->total_size - chunk->tmp_size; + ssize_t received; + uint8_t *buf = NULL; - top_subreq = &state->reqs[state->top_req]; + chunk->subreq = NULL; - if (tevent_req_is_in_progress(top_subreq->req)) { - DEBUG(11, ("cli_pull_read_done: top request not yet " - "done\n")); - return; - } + status = cli_read_andx_recv(subreq, &received, &buf); + if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) { + received = 0; + status = NT_STATUS_OK; + } + if (tevent_req_nterror(req, status)) { + return; + } + + if (received > expected) { + tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE); + return; + } - DEBUG(10, ("cli_pull_read_done: Pushing %d bytes, %d already " - "pushed\n", (int)top_subreq->received, - (int)state->pushed)); + if (received == 0) { + /* + * We got EOF we're done + */ + chunk->done = true; + cli_pull_setup_chunks(req); + return; + } - status = state->sink((char *)top_subreq->buf, - top_subreq->received, state->priv); - if (tevent_req_nterror(state->req, status)) { + if (received == chunk->total_size) { + /* + * We got it in the first run. + * + * We don't call TALLOC_FREE(subreq) + * here and keep the returned buffer. + */ + chunk->buf = buf; + } else if (chunk->buf == NULL) { + chunk->buf = talloc_array(chunk, uint8_t, chunk->total_size); + if (tevent_req_nomem(chunk->buf, req)) { return; } - state->pushed += top_subreq->received; - - TALLOC_FREE(state->reqs[state->top_req].req); - - if (state->requested < state->size) { - struct tevent_req *new_req; - off_t size_left; - size_t request_thistime; - - size_left = state->size - state->requested; - request_thistime = MIN(size_left, state->chunk_size); - - DEBUG(10, ("cli_pull_read_done: Requesting %d bytes " - "at %d, position %d\n", - (int)request_thistime, - (int)(state->start_offset - + state->requested), - state->top_req)); - - new_req = cli_readall_send( - state->reqs, state->ev, state->cli, - state->fnum, - state->start_offset + state->requested, - request_thistime); - - if (tevent_req_nomem(new_req, state->req)) { - return; - } - tevent_req_set_callback(new_req, cli_pull_read_done, - req); - - state->reqs[state->top_req].req = new_req; - state->requested += request_thistime; - } + } - state->top_req = (state->top_req+1) % state->num_reqs; + if (received != chunk->total_size) { + uint8_t *p = chunk->buf + chunk->tmp_size; + memcpy(p, buf, received); + TALLOC_FREE(subreq); } - tevent_req_done(req); + chunk->tmp_size += received; + + if (chunk->tmp_size == chunk->total_size) { + chunk->done = true; + } else { + state->num_waiting++; + } + + cli_pull_setup_chunks(req); } NTSTATUS cli_pull_recv(struct tevent_req *req, off_t *received) @@ -665,9 +738,11 @@ NTSTATUS cli_pull_recv(struct tevent_req *req, off_t *received) NTSTATUS status; if (tevent_req_is_nterror(req, &status)) { + tevent_req_received(req); return status; } *received = state->pushed; + tevent_req_received(req); return NT_STATUS_OK; } |