diff options
-rw-r--r-- | source3/libsmb/clireadwrite.c | 288 |
1 files changed, 187 insertions, 101 deletions
diff --git a/source3/libsmb/clireadwrite.c b/source3/libsmb/clireadwrite.c index 47e7f1bfd4..550c52b15e 100644 --- a/source3/libsmb/clireadwrite.c +++ b/source3/libsmb/clireadwrite.c @@ -1092,13 +1092,7 @@ NTSTATUS cli_writeall(struct cli_state *cli, uint16_t fnum, uint16_t mode, return status; } -struct cli_push_write_state { - struct tevent_req *req;/* This is the main request! Not the subreq */ - uint32_t idx; - off_t ofs; - uint8_t *buf; - size_t size; -}; +struct cli_push_chunk; struct cli_push_state { struct tevent_context *ev; @@ -1106,7 +1100,6 @@ struct cli_push_state { uint16_t fnum; uint16_t mode; off_t start_offset; - size_t window_size; size_t (*source)(uint8_t *buf, size_t n, void *priv); void *priv; @@ -1118,62 +1111,32 @@ struct cli_push_state { /* * Outstanding requests + * + * The maximum is 256: + * - which would be a window of 256 MByte + * for SMB2 with multi-credit + * or smb1 unix extentions. */ - uint32_t pending; - uint16_t max_reqs; - uint32_t num_reqs; - struct cli_push_write_state **reqs; + uint16_t max_chunks; + uint16_t num_chunks; + uint16_t num_waiting; + struct cli_push_chunk *chunks; }; -static void cli_push_written(struct tevent_req *req); - -static bool cli_push_write_setup(struct tevent_req *req, - struct cli_push_state *state, - uint32_t idx) -{ - struct cli_push_write_state *substate; +struct cli_push_chunk { + struct cli_push_chunk *prev, *next; + struct tevent_req *req;/* This is the main request! Not the subreq */ struct tevent_req *subreq; + off_t ofs; + uint8_t *buf; + size_t total_size; + size_t tmp_size; + bool done; +}; - substate = talloc(state->reqs, struct cli_push_write_state); - if (!substate) { - return false; - } - substate->req = req; - substate->idx = idx; - substate->ofs = state->next_offset; - substate->buf = talloc_array(substate, uint8_t, state->chunk_size); - if (!substate->buf) { - talloc_free(substate); - return false; - } - substate->size = state->source(substate->buf, - state->chunk_size, - state->priv); - if (substate->size == 0) { - state->eof = true; - /* nothing to send */ - talloc_free(substate); - return true; - } - - subreq = cli_writeall_send(substate, - state->ev, state->cli, - state->fnum, state->mode, - substate->buf, - substate->ofs, - substate->size); - if (!subreq) { - talloc_free(substate); - return false; - } - tevent_req_set_callback(subreq, cli_push_written, substate); - - state->reqs[idx] = substate; - state->pending += 1; - state->next_offset += substate->size; - - return true; -} +static void cli_push_setup_chunks(struct tevent_req *req); +static void cli_push_chunk_ship(struct cli_push_chunk *chunk); +static void cli_push_chunk_done(struct tevent_req *subreq); struct tevent_req *cli_push_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct cli_state *cli, @@ -1185,8 +1148,8 @@ struct tevent_req *cli_push_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, { struct tevent_req *req; struct cli_push_state *state; - uint32_t i; size_t page_size = 1024; + uint64_t tmp64; req = tevent_req_create(mem_ctx, &state, struct cli_push_state); if (req == NULL) { @@ -1199,8 +1162,6 @@ struct tevent_req *cli_push_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, state->mode = mode; state->source = source; state->priv = priv; - state->eof = false; - state->pending = 0; state->next_offset = start_offset; state->chunk_size = cli_write_max_bufsize(cli, mode, 14); @@ -1208,77 +1169,202 @@ struct tevent_req *cli_push_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, state->chunk_size &= ~(page_size - 1); } - state->max_reqs = smbXcli_conn_max_requests(cli->conn); - if (window_size == 0) { - window_size = state->max_reqs * state->chunk_size; + /* + * We use 16 MByte as default window size. + */ + window_size = 16 * 1024 * 1024; } - state->num_reqs = window_size/state->chunk_size; + + tmp64 = window_size/state->chunk_size; if ((window_size % state->chunk_size) > 0) { - state->num_reqs += 1; + tmp64 += 1; } - state->num_reqs = MIN(state->num_reqs, state->max_reqs); - state->num_reqs = MAX(state->num_reqs, 1); + tmp64 = MAX(tmp64, 1); + tmp64 = MIN(tmp64, 256); + state->max_chunks = tmp64; - state->reqs = talloc_zero_array(state, struct cli_push_write_state *, - state->num_reqs); - if (state->reqs == NULL) { - goto failed; + /* + * We defer the callback because of the complex + * substate/subfunction logic + */ + tevent_req_defer_callback(req, ev); + + cli_push_setup_chunks(req); + if (!tevent_req_is_in_progress(req)) { + return tevent_req_post(req, ev); } - for (i=0; i<state->num_reqs; i++) { - if (!cli_push_write_setup(req, state, i)) { - goto failed; + return req; +} + +static void cli_push_setup_chunks(struct tevent_req *req) +{ + struct cli_push_state *state = + tevent_req_data(req, + struct cli_push_state); + struct cli_push_chunk *chunk, *next = NULL; + size_t i; + + for (chunk = state->chunks; chunk; chunk = next) { + /* + * Note that chunk might be removed from this call. + */ + next = chunk->next; + cli_push_chunk_ship(chunk); + if (!tevent_req_is_in_progress(req)) { + return; + } + } + + for (i = state->num_chunks; i < state->max_chunks; i++) { + + if (state->num_waiting > 0) { + return; } if (state->eof) { break; } + + chunk = talloc_zero(state, struct cli_push_chunk); + if (tevent_req_nomem(chunk, req)) { + return; + } + chunk->req = req; + chunk->ofs = state->next_offset; + chunk->buf = talloc_array(chunk, + uint8_t, + state->chunk_size); + if (tevent_req_nomem(chunk->buf, req)) { + return; + } + chunk->total_size = state->source(chunk->buf, + state->chunk_size, + state->priv); + if (chunk->total_size == 0) { + /* nothing to send */ + talloc_free(chunk); + state->eof = true; + break; + } + state->next_offset += chunk->total_size; + + DLIST_ADD_END(state->chunks, chunk, NULL); + state->num_chunks++; + state->num_waiting++; + + cli_push_chunk_ship(chunk); + if (!tevent_req_is_in_progress(req)) { + return; + } } - if (state->pending == 0) { - tevent_req_done(req); - return tevent_req_post(req, ev); + if (!state->eof) { + return; } - return req; + if (state->num_chunks > 0) { + return; + } + + tevent_req_done(req); +} + +static void cli_push_chunk_ship(struct cli_push_chunk *chunk) +{ + struct tevent_req *req = chunk->req; + struct cli_push_state *state = + tevent_req_data(req, + struct cli_push_state); + bool ok; + const uint8_t *buf; + off_t ofs; + size_t size; + + if (chunk->done) { + DLIST_REMOVE(state->chunks, chunk); + SMB_ASSERT(state->num_chunks > 0); + state->num_chunks--; + TALLOC_FREE(chunk); + + return; + } + + if (chunk->subreq != NULL) { + return; + } + + SMB_ASSERT(state->num_waiting > 0); + + buf = chunk->buf + chunk->tmp_size; + ofs = chunk->ofs + chunk->tmp_size; + size = chunk->total_size - chunk->tmp_size; + + ok = smb1cli_conn_req_possible(state->cli->conn); + if (!ok) { + return; + } + + chunk->subreq = cli_write_andx_send(chunk, + state->ev, + state->cli, + state->fnum, + state->mode, + buf, + ofs, + size); + if (tevent_req_nomem(chunk->subreq, req)) { + return; + } + tevent_req_set_callback(chunk->subreq, + cli_push_chunk_done, + chunk); - failed: - tevent_req_nterror(req, NT_STATUS_NO_MEMORY); - return tevent_req_post(req, ev); + state->num_waiting--; + return; } -static void cli_push_written(struct tevent_req *subreq) +static void cli_push_chunk_done(struct tevent_req *subreq) { - struct cli_push_write_state *substate = tevent_req_callback_data( - subreq, struct cli_push_write_state); - struct tevent_req *req = substate->req; - struct cli_push_state *state = tevent_req_data( - req, struct cli_push_state); + struct cli_push_chunk *chunk = + tevent_req_callback_data(subreq, + struct cli_push_chunk); + struct tevent_req *req = chunk->req; + struct cli_push_state *state = + tevent_req_data(req, + struct cli_push_state); NTSTATUS status; - uint32_t idx = substate->idx; + size_t expected = chunk->total_size - chunk->tmp_size; + size_t written; - state->reqs[idx] = NULL; - state->pending -= 1; + chunk->subreq = NULL; - status = cli_writeall_recv(subreq, NULL); + status = cli_write_andx_recv(subreq, &written); TALLOC_FREE(subreq); - TALLOC_FREE(substate); if (tevent_req_nterror(req, status)) { return; } - if (!state->eof) { - if (!cli_push_write_setup(req, state, idx)) { - tevent_req_nterror(req, NT_STATUS_NO_MEMORY); - return; - } + if (written > expected) { + tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE); + return; } - if (state->pending == 0) { - tevent_req_done(req); + if (written == 0) { + tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE); return; } + + chunk->tmp_size += written; + + if (chunk->tmp_size == chunk->total_size) { + chunk->done = true; + } else { + state->num_waiting++; + } + + cli_push_setup_chunks(req); } NTSTATUS cli_push_recv(struct tevent_req *req) |