From ae049c49df88fcbcce490cd81361912e67775b12 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Fri, 5 Mar 2010 18:30:10 +0100 Subject: s4:libcli/wrepl: rewrite the low level request handling to use tevent_queue and tstream_context metze --- source4/wrepl_server/wrepl_in_call.c | 37 +++++++++++------------------- source4/wrepl_server/wrepl_in_connection.c | 18 ++++----------- source4/wrepl_server/wrepl_out_helpers.c | 30 ++++++------------------ 3 files changed, 24 insertions(+), 61 deletions(-) (limited to 'source4/wrepl_server') diff --git a/source4/wrepl_server/wrepl_in_call.c b/source4/wrepl_server/wrepl_in_call.c index f26a93b595..dfe255567e 100644 --- a/source4/wrepl_server/wrepl_in_call.c +++ b/source4/wrepl_server/wrepl_in_call.c @@ -22,6 +22,7 @@ #include "includes.h" #include "lib/events/events.h" #include "lib/socket/socket.h" +#include "smbd/service_task.h" #include "smbd/service_stream.h" #include "libcli/wrepl/winsrepl.h" #include "wrepl_server/wrepl_server.h" @@ -31,7 +32,7 @@ #include "lib/ldb/include/ldb_errors.h" #include "system/time.h" #include "lib/util/tsort.h" -#include "lib/stream/packet.h" /* FIXME */ +#include "param/param.h" static NTSTATUS wreplsrv_in_start_association(struct wreplsrv_in_call *call) { @@ -342,7 +343,7 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call) struct wreplsrv_out_connection *wrepl_out; struct wrepl_table *update_in = &call->req_packet.message.replication.info.table; struct wreplsrv_in_update_state *update_state; - struct packet_context *packet; + NTSTATUS status; DEBUG(2,("WREPL_REPL_UPDATE: partner[%s] initiator[%s] num_owners[%u]\n", call->wreplconn->partner->address, @@ -351,34 +352,21 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call) update_state = talloc(wrepl_in, struct wreplsrv_in_update_state); NT_STATUS_HAVE_NO_MEMORY(update_state); - /* - * We need to flip the connection into a client connection - * and do a WREPL_REPL_SEND_REQUEST's on the that connection - * and then stop this connection. - */ - packet = packet_init(wrepl_in); - if (packet == NULL) { - return NT_STATUS_NO_MEMORY; - } - - /* - * TODO We can free the tstream here as we don't use it in the client - * yet. - */ - TALLOC_FREE(wrepl_in->send_queue); - TALLOC_FREE(wrepl_in->tstream); - wrepl_out = talloc(update_state, struct wreplsrv_out_connection); NT_STATUS_HAVE_NO_MEMORY(wrepl_out); wrepl_out->service = wrepl_in->service; wrepl_out->partner = wrepl_in->partner; wrepl_out->assoc_ctx.our_ctx = wrepl_in->assoc_ctx.our_ctx; wrepl_out->assoc_ctx.peer_ctx = wrepl_in->assoc_ctx.peer_ctx; - wrepl_out->sock = wrepl_socket_merge(wrepl_out, - wrepl_in->conn->event.ctx, - wrepl_in->conn->socket, - packet); - NT_STATUS_HAVE_NO_MEMORY(wrepl_out->sock); + wrepl_out->sock = wrepl_socket_init(wrepl_out, + wrepl_in->conn->event.ctx, + lp_iconv_convenience(wrepl_in->service->task->lp_ctx)); + NT_STATUS_HAVE_NO_MEMORY_AND_FREE(wrepl_out->sock, update_state); + + TALLOC_FREE(wrepl_in->send_queue); + + status = wrepl_socket_donate_stream(wrepl_out->sock, &wrepl_in->tstream); + NT_STATUS_NOT_OK_RETURN_AND_FREE(status, update_state); update_state->wrepl_in = wrepl_in; update_state->wrepl_out = wrepl_out; @@ -389,6 +377,7 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call) update_state->cycle_io.in.wreplconn = wrepl_out; update_state->creq = wreplsrv_pull_cycle_send(update_state, &update_state->cycle_io); if (!update_state->creq) { + talloc_free(update_state); return NT_STATUS_INTERNAL_ERROR; } diff --git a/source4/wrepl_server/wrepl_in_connection.c b/source4/wrepl_server/wrepl_in_connection.c index 6b60dd178f..ff0bb6c53d 100644 --- a/source4/wrepl_server/wrepl_in_connection.c +++ b/source4/wrepl_server/wrepl_in_connection.c @@ -349,7 +349,8 @@ static const struct stream_server_ops wreplsrv_stream_ops = { called when we get a new connection */ NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner, - struct socket_context *sock, + uint32_t peer_assoc_ctx, + struct tstream_context **stream, struct wreplsrv_in_connection **_wrepl_in) { struct wreplsrv_service *service = partner->service; @@ -358,7 +359,6 @@ NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner, struct stream_connection *conn; struct tevent_req *subreq; NTSTATUS status; - int rc; /* within the wrepl task we want to be a single process, so ask for the single process model ops and pass these to the @@ -374,11 +374,12 @@ NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner, wrepl_in->service = service; wrepl_in->partner = partner; + wrepl_in->tstream = talloc_move(wrepl_in, stream); + wrepl_in->assoc_ctx.peer_ctx = peer_assoc_ctx; status = stream_new_connection_merge(service->task->event_ctx, service->task->lp_ctx, model_ops, - sock, &wreplsrv_stream_ops, service->task->msg_ctx, wrepl_in, @@ -399,17 +400,6 @@ NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner, return NT_STATUS_NO_MEMORY; } - TALLOC_FREE(conn->event.fde); - - rc = tstream_bsd_existing_socket(wrepl_in, - socket_get_fd(sock), - &wrepl_in->tstream); - if (rc < 0) { - stream_terminate_connection(conn, - "wreplsrv_in_connection_merge: out of memory"); - return NT_STATUS_NO_MEMORY; - } - /* * The wrepl pdu's has the length as 4 byte (initial_read_size), * packet_full_request_u32 provides the pdu length then. diff --git a/source4/wrepl_server/wrepl_out_helpers.c b/source4/wrepl_server/wrepl_out_helpers.c index b99d78b3d5..5c15ac8d87 100644 --- a/source4/wrepl_server/wrepl_out_helpers.c +++ b/source4/wrepl_server/wrepl_out_helpers.c @@ -974,7 +974,7 @@ static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_st static NTSTATUS wreplsrv_push_notify_wait_update(struct wreplsrv_push_notify_state *state) { struct wreplsrv_in_connection *wrepl_in; - struct socket_context *sock; + struct tstream_context *stream; NTSTATUS status; status = wrepl_request_recv(state->subreq, state, NULL); @@ -988,38 +988,22 @@ static NTSTATUS wreplsrv_push_notify_wait_update(struct wreplsrv_push_notify_sta * message is received by the peer. */ - /* steal the socket_context */ - sock = state->wreplconn->sock->sock; - state->wreplconn->sock->sock = NULL; - talloc_steal(state, sock); - - /* - * TODO: steal the tstream if we switch the client to tsocket. - * This is just to get a compiler error as soon as we remove - * packet_context. - */ - state->wreplconn->sock->packet = NULL; - - /* - * free the wrepl_socket (client connection) - */ - talloc_free(state->wreplconn->sock); - state->wreplconn->sock = NULL; + status = wrepl_socket_split_stream(state->wreplconn->sock, state, &stream); + NT_STATUS_NOT_OK_RETURN(status); /* * now create a wreplsrv_in_connection, * on which we act as server * - * NOTE: sock and packet will be stolen by + * NOTE: stream will be stolen by * wreplsrv_in_connection_merge() */ status = wreplsrv_in_connection_merge(state->io->in.partner, - sock, &wrepl_in); + state->wreplconn->assoc_ctx.peer_ctx, + &stream, + &wrepl_in); NT_STATUS_NOT_OK_RETURN(status); - wrepl_in->assoc_ctx.peer_ctx = state->wreplconn->assoc_ctx.peer_ctx; - wrepl_in->assoc_ctx.our_ctx = 0; - /* now we can free the wreplsrv_out_connection */ TALLOC_FREE(state->wreplconn); -- cgit