From 2450fc1c271b9f944455370510062164e68a8d59 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Fri, 5 Mar 2010 15:22:10 +0100 Subject: s4:libcli/wrepl: implement wrepl_request_send as a tevent_req based wrapper metze --- source4/libcli/wrepl/winsrepl.c | 157 +++++++++++++++++++++++-------- source4/torture/nbt/winsreplication.c | 12 ++- source4/wrepl_server/wrepl_out_helpers.c | 28 +++--- 3 files changed, 144 insertions(+), 53 deletions(-) (limited to 'source4') diff --git a/source4/libcli/wrepl/winsrepl.c b/source4/libcli/wrepl/winsrepl.c index 2b14de30f4..39d801d606 100644 --- a/source4/libcli/wrepl/winsrepl.c +++ b/source4/libcli/wrepl/winsrepl.c @@ -500,8 +500,8 @@ static int wrepl_send_ctrl_destructor(struct wrepl_send_ctrl_state *s) send a generic wins replication request */ static struct wrepl_request *wrepl_request_internal_send(struct wrepl_socket *wrepl_socket, - struct wrepl_packet *packet, - struct wrepl_send_ctrl *ctrl) + const struct wrepl_packet *packet, + const struct wrepl_send_ctrl *ctrl) { struct wrepl_request *req; struct wrepl_wrap wrap; @@ -576,18 +576,80 @@ static NTSTATUS wrepl_request_internal_recv(struct wrepl_request *req, return status; } -struct wrepl_request *wrepl_request_send(struct wrepl_socket *wrepl_socket, - struct wrepl_packet *packet, - struct wrepl_send_ctrl *ctrl) +struct wrepl_request_state { + struct wrepl_packet *packet; +}; + +static void wrepl_request_done(struct wrepl_request *subreq); + +struct tevent_req *wrepl_request_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct wrepl_socket *wrepl_socket, + const struct wrepl_packet *packet, + const struct wrepl_send_ctrl *ctrl) { - return wrepl_request_internal_send(wrepl_socket, packet, ctrl); + struct tevent_req *req; + struct wrepl_request_state *state; + struct wrepl_request *subreq; + + if (wrepl_socket->event.ctx != ev) { + /* TODO: remove wrepl_socket->event.ctx !!! */ + smb_panic("wrepl_associate_stop_send event context mismatch!"); + return NULL; + } + + req = tevent_req_create(mem_ctx, &state, + struct wrepl_request_state); + if (req == NULL) { + return NULL; + } + + subreq = wrepl_request_internal_send(wrepl_socket, packet, ctrl); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + subreq->async.fn = wrepl_request_done; + subreq->async.private_data = req; + + return req; } -NTSTATUS wrepl_request_recv(struct wrepl_request *req, +static void wrepl_request_done(struct wrepl_request *subreq) +{ + struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data, + struct tevent_req); + struct wrepl_request_state *state = tevent_req_data(req, + struct wrepl_request_state); + NTSTATUS status; + + status = wrepl_request_internal_recv(subreq, state, &state->packet); + if (!NT_STATUS_IS_OK(status)) { + tevent_req_nterror(req, status); + return; + } + + tevent_req_done(req); +} + +NTSTATUS wrepl_request_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, struct wrepl_packet **packet) { - return wrepl_request_internal_recv(req, mem_ctx, packet); + struct wrepl_request_state *state = tevent_req_data(req, + struct wrepl_request_state); + NTSTATUS status; + + if (tevent_req_is_nterror(req, &status)) { + tevent_req_received(req); + return status; + } + + if (packet) { + *packet = talloc_move(mem_ctx, &state->packet); + } + + tevent_req_received(req); + return NT_STATUS_OK; } /* @@ -595,11 +657,28 @@ NTSTATUS wrepl_request_recv(struct wrepl_request *req, */ NTSTATUS wrepl_request(struct wrepl_socket *wrepl_socket, TALLOC_CTX *mem_ctx, - struct wrepl_packet *req_packet, + const struct wrepl_packet *req_packet, struct wrepl_packet **reply_packet) { - struct wrepl_request *req = wrepl_request_send(wrepl_socket, req_packet, NULL); - return wrepl_request_recv(req, mem_ctx, reply_packet); + struct tevent_req *subreq; + bool ok; + NTSTATUS status; + + subreq = wrepl_request_send(mem_ctx, wrepl_socket->event.ctx, + wrepl_socket, req_packet, NULL); + NT_STATUS_HAVE_NO_MEMORY(subreq); + + ok = tevent_req_poll(subreq, wrepl_socket->event.ctx); + if (!ok) { + TALLOC_FREE(subreq); + return NT_STATUS_INTERNAL_ERROR; + } + + status = wrepl_request_recv(subreq, mem_ctx, reply_packet); + TALLOC_FREE(subreq); + NT_STATUS_NOT_OK_RETURN(status); + + return NT_STATUS_OK; } @@ -609,7 +688,7 @@ struct wrepl_associate_state { uint16_t major_version; }; -static void wrepl_associate_done(struct wrepl_request *subreq); +static void wrepl_associate_done(struct tevent_req *subreq); struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, @@ -618,7 +697,7 @@ struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx, { struct tevent_req *req; struct wrepl_associate_state *state; - struct wrepl_request *subreq; + struct tevent_req *subreq; if (wrepl_socket->event.ctx != ev) { /* TODO: remove wrepl_socket->event.ctx !!! */ @@ -651,19 +730,18 @@ struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx, } memset(state->packet.padding.data, 0, state->packet.padding.length); - subreq = wrepl_request_send(wrepl_socket, &state->packet, NULL); + subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } - subreq->async.fn = wrepl_associate_done; - subreq->async.private_data = req; + tevent_req_set_callback(subreq, wrepl_associate_done, req); return req; } -static void wrepl_associate_done(struct wrepl_request *subreq) +static void wrepl_associate_done(struct tevent_req *subreq) { - struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data, + struct tevent_req *req = tevent_req_callback_data(subreq, struct tevent_req); struct wrepl_associate_state *state = tevent_req_data(req, struct wrepl_associate_state); @@ -671,6 +749,7 @@ static void wrepl_associate_done(struct wrepl_request *subreq) struct wrepl_packet *packet; status = wrepl_request_recv(subreq, state, &packet); + TALLOC_FREE(subreq); if (!NT_STATUS_IS_OK(status)) { tevent_req_nterror(req, status); return; @@ -741,7 +820,7 @@ struct wrepl_associate_stop_state { struct wrepl_send_ctrl ctrl; }; -static void wrepl_associate_stop_done(struct wrepl_request *subreq); +static void wrepl_associate_stop_done(struct tevent_req *subreq); struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, @@ -750,7 +829,7 @@ struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx, { struct tevent_req *req; struct wrepl_associate_stop_state *state; - struct wrepl_request *subreq; + struct tevent_req *subreq; if (wrepl_socket->event.ctx != ev) { /* TODO: remove wrepl_socket->event.ctx !!! */ @@ -774,19 +853,18 @@ struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx, state->ctrl.disconnect_after_send = true; } - subreq = wrepl_request_send(wrepl_socket, &state->packet, &state->ctrl); + subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, &state->ctrl); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } - subreq->async.fn = wrepl_associate_stop_done; - subreq->async.private_data = req; + tevent_req_set_callback(subreq, wrepl_associate_stop_done, req); return req; } -static void wrepl_associate_stop_done(struct wrepl_request *subreq) +static void wrepl_associate_stop_done(struct tevent_req *subreq) { - struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data, + struct tevent_req *req = tevent_req_callback_data(subreq, struct tevent_req); struct wrepl_associate_stop_state *state = tevent_req_data(req, struct wrepl_associate_stop_state); @@ -794,6 +872,7 @@ static void wrepl_associate_stop_done(struct wrepl_request *subreq) /* currently we don't care about a possible response */ status = wrepl_request_recv(subreq, state, NULL); + TALLOC_FREE(subreq); if (!NT_STATUS_IS_OK(status)) { tevent_req_nterror(req, status); return; @@ -852,7 +931,7 @@ struct wrepl_pull_table_state { struct wrepl_wins_owner *partners; }; -static void wrepl_pull_table_done(struct wrepl_request *subreq); +static void wrepl_pull_table_done(struct tevent_req *subreq); struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, @@ -861,7 +940,7 @@ struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx, { struct tevent_req *req; struct wrepl_pull_table_state *state; - struct wrepl_request *subreq; + struct tevent_req *subreq; if (wrepl_socket->event.ctx != ev) { /* TODO: remove wrepl_socket->event.ctx !!! */ @@ -880,19 +959,18 @@ struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx, state->packet.mess_type = WREPL_REPLICATION; state->packet.message.replication.command = WREPL_REPL_TABLE_QUERY; - subreq = wrepl_request_send(wrepl_socket, &state->packet, NULL); + subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } - subreq->async.fn = wrepl_pull_table_done; - subreq->async.private_data = req; + tevent_req_set_callback(subreq, wrepl_pull_table_done, req); return req; } -static void wrepl_pull_table_done(struct wrepl_request *subreq) +static void wrepl_pull_table_done(struct tevent_req *subreq) { - struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data, + struct tevent_req *req = tevent_req_callback_data(subreq, struct tevent_req); struct wrepl_pull_table_state *state = tevent_req_data(req, struct wrepl_pull_table_state); @@ -901,6 +979,7 @@ static void wrepl_pull_table_done(struct wrepl_request *subreq) struct wrepl_table *table; status = wrepl_request_recv(subreq, state, &packet); + TALLOC_FREE(subreq); if (!NT_STATUS_IS_OK(status)) { tevent_req_nterror(req, status); return; @@ -985,7 +1064,7 @@ struct wrepl_pull_names_state { struct wrepl_name *names; }; -static void wrepl_pull_names_done(struct wrepl_request *subreq); +static void wrepl_pull_names_done(struct tevent_req *subreq); struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, @@ -994,7 +1073,7 @@ struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx, { struct tevent_req *req; struct wrepl_pull_names_state *state; - struct wrepl_request *subreq; + struct tevent_req *subreq; if (wrepl_socket->event.ctx != ev) { /* TODO: remove wrepl_socket->event.ctx !!! */ @@ -1015,19 +1094,18 @@ struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx, state->packet.message.replication.command = WREPL_REPL_SEND_REQUEST; state->packet.message.replication.info.owner = io->in.partner; - subreq = wrepl_request_send(wrepl_socket, &state->packet, NULL); + subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } - subreq->async.fn = wrepl_pull_names_done; - subreq->async.private_data = req; + tevent_req_set_callback(subreq, wrepl_pull_names_done, req); return req; } -static void wrepl_pull_names_done(struct wrepl_request *subreq) +static void wrepl_pull_names_done(struct tevent_req *subreq) { - struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data, + struct tevent_req *req = tevent_req_callback_data(subreq, struct tevent_req); struct wrepl_pull_names_state *state = tevent_req_data(req, struct wrepl_pull_names_state); @@ -1036,6 +1114,7 @@ static void wrepl_pull_names_done(struct wrepl_request *subreq) uint32_t i; status = wrepl_request_recv(subreq, state, &packet); + TALLOC_FREE(subreq); if (!NT_STATUS_IS_OK(status)) { tevent_req_nterror(req, status); return; diff --git a/source4/torture/nbt/winsreplication.c b/source4/torture/nbt/winsreplication.c index 1655135a33..9a7be03199 100644 --- a/source4/torture/nbt/winsreplication.c +++ b/source4/torture/nbt/winsreplication.c @@ -83,7 +83,7 @@ static const char *wrepl_name_state_string(enum wrepl_name_state state) static bool test_assoc_ctx1(struct torture_context *tctx) { bool ret = true; - struct wrepl_request *req; + struct tevent_req *subreq; struct wrepl_socket *wrepl_socket1; struct wrepl_associate associate1; struct wrepl_socket *wrepl_socket2; @@ -95,6 +95,7 @@ static bool test_assoc_ctx1(struct torture_context *tctx) NTSTATUS status; struct nbt_name name; const char *address; + bool ok; if (!torture_nbt_get_name(tctx, &name, &address)) return false; @@ -131,8 +132,13 @@ static bool test_assoc_ctx1(struct torture_context *tctx) packet.message.replication.command = WREPL_REPL_TABLE_QUERY; ZERO_STRUCT(ctrl); ctrl.send_only = true; - req = wrepl_request_send(wrepl_socket2, &packet, &ctrl); - status = wrepl_request_recv(req, tctx, &rep_packet); + subreq = wrepl_request_send(tctx, tctx->ev, wrepl_socket2, &packet, &ctrl); + ok = tevent_req_poll(subreq, tctx->ev); + if (!ok) { + CHECK_STATUS(tctx, NT_STATUS_INTERNAL_ERROR, NT_STATUS_OK); + } + status = wrepl_request_recv(subreq, tctx, &rep_packet); + TALLOC_FREE(subreq); CHECK_STATUS(tctx, status, NT_STATUS_OK); torture_comment(tctx, "Send a association request (conn2), to make sure the last request was ignored\n"); diff --git a/source4/wrepl_server/wrepl_out_helpers.c b/source4/wrepl_server/wrepl_out_helpers.c index de3fb72318..19cabd1e12 100644 --- a/source4/wrepl_server/wrepl_out_helpers.c +++ b/source4/wrepl_server/wrepl_out_helpers.c @@ -848,15 +848,15 @@ struct wreplsrv_push_notify_state { enum wrepl_replication_cmd command; bool full_table; struct wrepl_send_ctrl ctrl; - struct wrepl_request *req; struct wrepl_packet req_packet; struct wrepl_packet *rep_packet; struct composite_context *creq; struct wreplsrv_out_connection *wreplconn; + struct tevent_req *subreq; }; static void wreplsrv_push_notify_handler_creq(struct composite_context *creq); -static void wreplsrv_push_notify_handler_req(struct wrepl_request *req); +static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq); static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state) { @@ -880,8 +880,10 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s NT_STATUS_NOT_OK_RETURN(status); /* queue the request */ - state->req = wrepl_request_send(state->wreplconn->sock, req, NULL); - NT_STATUS_HAVE_NO_MEMORY(state->req); + state->subreq = wrepl_request_send(state, + state->wreplconn->service->task->event_ctx, + state->wreplconn->sock, req, NULL); + NT_STATUS_HAVE_NO_MEMORY(state->subreq); /* * now we need to convert the wrepl_socket (client connection) @@ -951,11 +953,14 @@ static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *s /* we won't get a reply to a inform message */ state->ctrl.send_only = true; - state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl); - NT_STATUS_HAVE_NO_MEMORY(state->req); + state->subreq = wrepl_request_send(state, + state->wreplconn->service->task->event_ctx, + state->wreplconn->sock, req, &state->ctrl); + NT_STATUS_HAVE_NO_MEMORY(state->subreq); - state->req->async.fn = wreplsrv_push_notify_handler_req; - state->req->async.private_data = state; + tevent_req_set_callback(state->subreq, + wreplsrv_push_notify_handler_treq, + state); state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM; @@ -1009,7 +1014,8 @@ static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_sta { NTSTATUS status; - status = wrepl_request_recv(state->req, state, NULL); + status = wrepl_request_recv(state->subreq, state, NULL); + TALLOC_FREE(state->subreq); NT_STATUS_NOT_OK_RETURN(status); state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE; @@ -1052,9 +1058,9 @@ static void wreplsrv_push_notify_handler_creq(struct composite_context *creq) return; } -static void wreplsrv_push_notify_handler_req(struct wrepl_request *req) +static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq) { - struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private_data, + struct wreplsrv_push_notify_state *state = tevent_req_callback_data(subreq, struct wreplsrv_push_notify_state); wreplsrv_push_notify_handler(state); return; -- cgit