Diffstat (limited to 'source3')
-rw-r--r--  source3/libsmb/clireadwrite.c  417
1 file changed, 246 insertions, 171 deletions
diff --git a/source3/libsmb/clireadwrite.c b/source3/libsmb/clireadwrite.c
index 550c52b15e..dd5d4c2865 100644
--- a/source3/libsmb/clireadwrite.c
+++ b/source3/libsmb/clireadwrite.c
@@ -388,23 +388,9 @@ static NTSTATUS cli_readall_recv(struct tevent_req *req, ssize_t *received,
return NT_STATUS_OK;
}
-struct cli_pull_subreq {
- struct tevent_req *req;
- ssize_t received;
- uint8_t *buf;
-};
-
-/*
- * Parallel read support.
- *
- * cli_pull sends as many read&x requests as the server would allow via
- * max_mux at a time. When replies flow back in, the data is written into
- * the callback function "sink" in the right order.
- */
+struct cli_pull_chunk;
struct cli_pull_state {
- struct tevent_req *req;
-
struct tevent_context *ev;
struct cli_state *cli;
uint16_t fnum;
@@ -415,54 +401,49 @@ struct cli_pull_state {
void *priv;
size_t chunk_size;
+ off_t next_offset;
+ off_t remaining;
/*
- * Outstanding requests
- */
- uint16_t max_reqs;
- int num_reqs;
- struct cli_pull_subreq *reqs;
-
- /*
- * For how many bytes did we send requests already?
- */
- off_t requested;
-
- /*
- * Next request index to push into "sink". This walks around the "req"
- * array, taking care that the requests are pushed to "sink" in the
- * right order. If necessary (i.e. replies don't come in in the right
- * order), replies are held back in "reqs".
+ * How many bytes did we push into "sink"?
*/
- int top_req;
+ off_t pushed;
/*
- * How many bytes did we push into "sink"?
+ * Outstanding requests
+ *
+ * The maximum is 256:
+ * - which would be a window of 256 MByte
+ * for SMB2 with multi-credit
+ * or smb1 unix extensions.
*/
-
- off_t pushed;
+ uint16_t max_chunks;
+ uint16_t num_chunks;
+ uint16_t num_waiting;
+ struct cli_pull_chunk *chunks;
};
-static char *cli_pull_print(struct tevent_req *req, TALLOC_CTX *mem_ctx)
-{
- struct cli_pull_state *state = tevent_req_data(
- req, struct cli_pull_state);
- char *result;
-
- result = tevent_req_default_print(req, mem_ctx);
- if (result == NULL) {
- return NULL;
- }
-
- return talloc_asprintf_append_buffer(
- result, "num_reqs=%d, top_req=%d",
- state->num_reqs, state->top_req);
-}
+struct cli_pull_chunk {
+ struct cli_pull_chunk *prev, *next;
+ struct tevent_req *req; /* This is the main request! Not the subreq */
+ struct tevent_req *subreq;
+ off_t ofs;
+ uint8_t *buf;
+ size_t total_size;
+ size_t tmp_size;
+ bool done;
+};
-static void cli_pull_read_done(struct tevent_req *read_req);
+static void cli_pull_setup_chunks(struct tevent_req *req);
+static void cli_pull_chunk_ship(struct cli_pull_chunk *chunk);
+static void cli_pull_chunk_done(struct tevent_req *subreq);
/*
- * Prepare an async pull request
+ * Parallel read support.
+ *
+ * cli_pull sends as many read&x requests as the server would allow via
+ * max_mux at a time. When replies flow back in, the data is written into
+ * the callback function "sink" in the right order.
*/
struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx,
@@ -476,16 +457,13 @@ struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx,
{
struct tevent_req *req;
struct cli_pull_state *state;
- int i;
size_t page_size = 1024;
+ uint64_t tmp64;
req = tevent_req_create(mem_ctx, &state, struct cli_pull_state);
if (req == NULL) {
return NULL;
}
- tevent_req_set_print_fn(req, cli_pull_print);
- state->req = req;
-
state->cli = cli;
state->ev = ev;
state->fnum = fnum;
@@ -493,9 +471,8 @@ struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx,
state->size = size;
state->sink = sink;
state->priv = priv;
-
- state->pushed = 0;
- state->top_req = 0;
+ state->next_offset = start_offset;
+ state->remaining = size;
if (size == 0) {
tevent_req_done(req);
@@ -507,155 +484,251 @@ struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx,
state->chunk_size &= ~(page_size - 1);
}
- state->max_reqs = smbXcli_conn_max_requests(cli->conn);
+ if (window_size == 0) {
+ /*
+ * We use 16 MByte as the default window size.
+ */
+ window_size = 16 * 1024 * 1024;
+ }
+
+ tmp64 = window_size/state->chunk_size;
+ if ((window_size % state->chunk_size) > 0) {
+ tmp64 += 1;
+ }
+ tmp64 = MAX(tmp64, 1);
+ tmp64 = MIN(tmp64, 256);
+ state->max_chunks = tmp64;
+
+ /*
+ * We defer the callback because of the complex
+ * substate/subfunction logic
+ */
+ tevent_req_defer_callback(req, ev);
+
+ cli_pull_setup_chunks(req);
+ if (!tevent_req_is_in_progress(req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ return req;
+}
- state->num_reqs = MAX(window_size/state->chunk_size, 1);
- state->num_reqs = MIN(state->num_reqs, state->max_reqs);
+static void cli_pull_setup_chunks(struct tevent_req *req)
+{
+ struct cli_pull_state *state =
+ tevent_req_data(req,
+ struct cli_pull_state);
+ struct cli_pull_chunk *chunk, *next = NULL;
+ size_t i;
- state->reqs = talloc_zero_array(state, struct cli_pull_subreq,
- state->num_reqs);
- if (state->reqs == NULL) {
- goto failed;
+ for (chunk = state->chunks; chunk; chunk = next) {
+ /*
+ * Note that chunk might be removed from this call.
+ */
+ next = chunk->next;
+ cli_pull_chunk_ship(chunk);
+ if (!tevent_req_is_in_progress(req)) {
+ return;
+ }
}
- state->requested = 0;
+ for (i = state->num_chunks; i < state->max_chunks; i++) {
- for (i=0; i<state->num_reqs; i++) {
- struct cli_pull_subreq *subreq = &state->reqs[i];
- off_t size_left;
- size_t request_thistime;
+ if (state->num_waiting > 0) {
+ return;
+ }
- if (state->requested >= size) {
- state->num_reqs = i;
+ if (state->remaining == 0) {
break;
}
- size_left = size - state->requested;
- request_thistime = MIN(size_left, state->chunk_size);
+ chunk = talloc_zero(state, struct cli_pull_chunk);
+ if (tevent_req_nomem(chunk, req)) {
+ return;
+ }
+ chunk->req = req;
+ chunk->ofs = state->next_offset;
+ chunk->total_size = MIN(state->remaining, state->chunk_size);
+ state->next_offset += chunk->total_size;
+ state->remaining -= chunk->total_size;
- subreq->req = cli_readall_send(
- state->reqs, ev, cli, fnum,
- state->start_offset + state->requested,
- request_thistime);
+ DLIST_ADD_END(state->chunks, chunk, NULL);
+ state->num_chunks++;
+ state->num_waiting++;
- if (subreq->req == NULL) {
- goto failed;
+ cli_pull_chunk_ship(chunk);
+ if (!tevent_req_is_in_progress(req)) {
+ return;
}
- tevent_req_set_callback(subreq->req, cli_pull_read_done, req);
- state->requested += request_thistime;
}
- return req;
-failed:
- TALLOC_FREE(req);
- return NULL;
-}
+ if (state->remaining > 0) {
+ return;
+ }
-/*
- * Handle incoming read replies, push the data into sink and send out new
- * requests if necessary.
- */
+ if (state->num_chunks > 0) {
+ return;
+ }
+
+ tevent_req_done(req);
+}
-static void cli_pull_read_done(struct tevent_req *subreq)
+static void cli_pull_chunk_ship(struct cli_pull_chunk *chunk)
{
- struct tevent_req *req = tevent_req_callback_data(
- subreq, struct tevent_req);
- struct cli_pull_state *state = tevent_req_data(
- req, struct cli_pull_state);
- struct cli_pull_subreq *pull_subreq = NULL;
- NTSTATUS status;
- int i;
+ struct tevent_req *req = chunk->req;
+ struct cli_pull_state *state =
+ tevent_req_data(req,
+ struct cli_pull_state);
+ bool ok;
+ off_t ofs;
+ size_t size;
- for (i = 0; i < state->num_reqs; i++) {
- pull_subreq = &state->reqs[i];
- if (subreq == pull_subreq->req) {
- break;
+ if (chunk->done) {
+ NTSTATUS status;
+
+ if (chunk != state->chunks) {
+ /*
+ * This chunk is not the
+ * first one in the list.
+ *
+ * which means we should not
+ * push it into the sink yet.
+ */
+ return;
+ }
+
+ if (chunk->tmp_size == 0) {
+ /*
+ * We got a short read, we're done
+ */
+ tevent_req_done(req);
+ return;
}
+
+ status = state->sink((char *)chunk->buf,
+ chunk->tmp_size,
+ state->priv);
+ if (tevent_req_nterror(req, status)) {
+ return;
+ }
+ state->pushed += chunk->tmp_size;
+
+ if (chunk->tmp_size < chunk->total_size) {
+ /*
+ * We got a short read, we're done
+ */
+ tevent_req_done(req);
+ return;
+ }
+
+ DLIST_REMOVE(state->chunks, chunk);
+ SMB_ASSERT(state->num_chunks > 0);
+ state->num_chunks--;
+ TALLOC_FREE(chunk);
+
+ return;
}
- if (i == state->num_reqs) {
- /* Huh -- received something we did not send?? */
- tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+
+ if (chunk->subreq != NULL) {
return;
}
- status = cli_readall_recv(subreq, &pull_subreq->received,
- &pull_subreq->buf);
- if (!NT_STATUS_IS_OK(status)) {
- tevent_req_nterror(state->req, status);
+ SMB_ASSERT(state->num_waiting > 0);
+
+ ofs = chunk->ofs + chunk->tmp_size;
+ size = chunk->total_size - chunk->tmp_size;
+
+ ok = smb1cli_conn_req_possible(state->cli->conn);
+ if (!ok) {
return;
}
- /*
- * This loop is the one to take care of out-of-order replies. All
- * pending requests are in state->reqs, state->reqs[top_req] is the
- * one that is to be pushed next. If however a request later than
- * top_req is replied to, then we can't push yet. If top_req is
- * replied to at a later point then, we need to push all the finished
- * requests.
- */
+ chunk->subreq = cli_read_andx_send(chunk,
+ state->ev,
+ state->cli,
+ state->fnum,
+ ofs,
+ size);
+ if (tevent_req_nomem(chunk->subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(chunk->subreq,
+ cli_pull_chunk_done,
+ chunk);
- while (state->reqs[state->top_req].req != NULL) {
- struct cli_pull_subreq *top_subreq;
+ state->num_waiting--;
+ return;
+}
- DEBUG(11, ("cli_pull_read_done: top_req = %d\n",
- state->top_req));
+static void cli_pull_chunk_done(struct tevent_req *subreq)
+{
+ struct cli_pull_chunk *chunk =
+ tevent_req_callback_data(subreq,
+ struct cli_pull_chunk);
+ struct tevent_req *req = chunk->req;
+ struct cli_pull_state *state =
+ tevent_req_data(req,
+ struct cli_pull_state);
+ NTSTATUS status;
+ size_t expected = chunk->total_size - chunk->tmp_size;
+ ssize_t received;
+ uint8_t *buf = NULL;
- top_subreq = &state->reqs[state->top_req];
+ chunk->subreq = NULL;
- if (tevent_req_is_in_progress(top_subreq->req)) {
- DEBUG(11, ("cli_pull_read_done: top request not yet "
- "done\n"));
- return;
- }
+ status = cli_read_andx_recv(subreq, &received, &buf);
+ if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) {
+ received = 0;
+ status = NT_STATUS_OK;
+ }
+ if (tevent_req_nterror(req, status)) {
+ return;
+ }
+
+ if (received > expected) {
+ tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+ return;
+ }
- DEBUG(10, ("cli_pull_read_done: Pushing %d bytes, %d already "
- "pushed\n", (int)top_subreq->received,
- (int)state->pushed));
+ if (received == 0) {
+ /*
+ * We got EOF, we're done
+ */
+ chunk->done = true;
+ cli_pull_setup_chunks(req);
+ return;
+ }
- status = state->sink((char *)top_subreq->buf,
- top_subreq->received, state->priv);
- if (tevent_req_nterror(state->req, status)) {
+ if (received == chunk->total_size) {
+ /*
+ * We got it in the first run.
+ *
+ * We don't call TALLOC_FREE(subreq)
+ * here and keep the returned buffer.
+ */
+ chunk->buf = buf;
+ } else if (chunk->buf == NULL) {
+ chunk->buf = talloc_array(chunk, uint8_t, chunk->total_size);
+ if (tevent_req_nomem(chunk->buf, req)) {
return;
}
- state->pushed += top_subreq->received;
-
- TALLOC_FREE(state->reqs[state->top_req].req);
-
- if (state->requested < state->size) {
- struct tevent_req *new_req;
- off_t size_left;
- size_t request_thistime;
-
- size_left = state->size - state->requested;
- request_thistime = MIN(size_left, state->chunk_size);
-
- DEBUG(10, ("cli_pull_read_done: Requesting %d bytes "
- "at %d, position %d\n",
- (int)request_thistime,
- (int)(state->start_offset
- + state->requested),
- state->top_req));
-
- new_req = cli_readall_send(
- state->reqs, state->ev, state->cli,
- state->fnum,
- state->start_offset + state->requested,
- request_thistime);
-
- if (tevent_req_nomem(new_req, state->req)) {
- return;
- }
- tevent_req_set_callback(new_req, cli_pull_read_done,
- req);
-
- state->reqs[state->top_req].req = new_req;
- state->requested += request_thistime;
- }
+ }
- state->top_req = (state->top_req+1) % state->num_reqs;
+ if (received != chunk->total_size) {
+ uint8_t *p = chunk->buf + chunk->tmp_size;
+ memcpy(p, buf, received);
+ TALLOC_FREE(subreq);
}
- tevent_req_done(req);
+ chunk->tmp_size += received;
+
+ if (chunk->tmp_size == chunk->total_size) {
+ chunk->done = true;
+ } else {
+ state->num_waiting++;
+ }
+
+ cli_pull_setup_chunks(req);
}
NTSTATUS cli_pull_recv(struct tevent_req *req, off_t *received)
@@ -665,9 +738,11 @@ NTSTATUS cli_pull_recv(struct tevent_req *req, off_t *received)
NTSTATUS status;
if (tevent_req_is_nterror(req, &status)) {
+ tevent_req_received(req);
return status;
}
*received = state->pushed;
+ tevent_req_received(req);
return NT_STATUS_OK;
}
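
The new cli_pull_send() above sizes its read window in chunks: divide window_size by chunk_size, round up, and clamp the result to the range 1..256, falling back to a 16 MByte window when the caller passes 0. A minimal standalone sketch of that accounting follows; the helper and constant names (pull_max_chunks, PULL_DEFAULT_WINDOW, PULL_MAX_CHUNKS) are assumptions for illustration and do not appear in the patch.

/*
 * Standalone sketch (assumed names, not from the patch) of the window
 * accounting cli_pull_send() now does: express the window as a number
 * of chunks, round up, and clamp to the 1..256 range.
 */
#include <stdint.h>
#include <stddef.h>

#define PULL_DEFAULT_WINDOW	(16 * 1024 * 1024)	/* 16 MByte, as in the patch */
#define PULL_MAX_CHUNKS		256

static uint16_t pull_max_chunks(size_t window_size, size_t chunk_size)
{
	uint64_t chunks;

	if (window_size == 0) {
		window_size = PULL_DEFAULT_WINDOW;
	}

	chunks = window_size / chunk_size;
	if ((window_size % chunk_size) > 0) {
		chunks += 1;		/* round up to cover the remainder */
	}
	if (chunks < 1) {
		chunks = 1;
	}
	if (chunks > PULL_MAX_CHUNKS) {
		chunks = PULL_MAX_CHUNKS;
	}

	return (uint16_t)chunks;
}

With the 16 MByte default window, a 64 KByte chunk size comes out at the full 256 chunks in flight; at a 1 MByte chunk size (SMB2 with multi-credit), the 256-chunk cap corresponds to the 256 MByte window mentioned in the comment above.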
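
For context, here is a hedged sketch of how a caller might drive the async cli_pull_send()/cli_pull_recv() pair after this change. The sink callback signature and the parameter order of cli_pull_send() are inferred from the assignments visible in the diff, and sink_to_fd()/pull_file_to_fd() are hypothetical helpers, not part of the patch.

/*
 * Hedged usage sketch: pull a whole remote file into a local file
 * descriptor with cli_pull_send()/cli_pull_recv().
 */
#include "includes.h"		/* assumed Samba-internal header */

static NTSTATUS sink_to_fd(char *buf, size_t n, void *priv)
{
	int fd = *(int *)priv;

	while (n > 0) {
		ssize_t written = write(fd, buf, n);
		if (written <= 0) {
			return NT_STATUS_UNEXPECTED_IO_ERROR;
		}
		buf += written;
		n -= written;
	}
	return NT_STATUS_OK;
}

static NTSTATUS pull_file_to_fd(struct cli_state *cli, uint16_t fnum,
				off_t size, int fd, off_t *received)
{
	TALLOC_CTX *frame = talloc_stackframe();
	struct tevent_context *ev = NULL;
	struct tevent_req *req = NULL;
	NTSTATUS status = NT_STATUS_NO_MEMORY;

	ev = tevent_context_init(frame);
	if (ev == NULL) {
		goto done;
	}

	/* window_size == 0 selects the 16 MByte default added above */
	req = cli_pull_send(frame, ev, cli, fnum,
			    0,		/* start_offset */
			    size,	/* size */
			    0,		/* window_size */
			    sink_to_fd, &fd);
	if (req == NULL) {
		goto done;
	}
	if (!tevent_req_poll(req, ev)) {
		status = NT_STATUS_INTERNAL_ERROR;
		goto done;
	}
	status = cli_pull_recv(req, received);
done:
	TALLOC_FREE(frame);
	return status;
}

Passing 0 for window_size picks up the new 16 MByte default, which cli_pull_send() then converts into at most 256 outstanding chunks and keeps refilled via cli_pull_setup_chunks() as replies arrive.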