summaryrefslogtreecommitdiff
path: root/source4/wrepl_server/wrepl_out_helpers.c
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2005-12-12 21:31:42 +0000
committerGerald (Jerry) Carter <jerry@samba.org>2007-10-10 13:47:17 -0500
commit36acd6e79c8cb881b9c333313402d993a6d0f511 (patch)
tree29738d21349a159c3cd28d4577779c6c7e8f209e /source4/wrepl_server/wrepl_out_helpers.c
parentb052ab8ad4ea31a45d61d5c6d8e34b6879848045 (diff)
downloadsamba-36acd6e79c8cb881b9c333313402d993a6d0f511.tar.gz
samba-36acd6e79c8cb881b9c333313402d993a6d0f511.tar.bz2
samba-36acd6e79c8cb881b9c333313402d993a6d0f511.zip
r12200: - move the the winsreplication client and server code to the packet_context
system - this needs to be in one big patch, because of the merging code, that changes client in server connections and the other way around - use socket_connect_send/_recv() in the client code metze (This used to be commit f0105b7fcdc3032d22444a1973927fff2dd9a06f)
Diffstat (limited to 'source4/wrepl_server/wrepl_out_helpers.c')
-rw-r--r--source4/wrepl_server/wrepl_out_helpers.c105
1 files changed, 74 insertions, 31 deletions
diff --git a/source4/wrepl_server/wrepl_out_helpers.c b/source4/wrepl_server/wrepl_out_helpers.c
index c7c6f55767..d19c38bfb8 100644
--- a/source4/wrepl_server/wrepl_out_helpers.c
+++ b/source4/wrepl_server/wrepl_out_helpers.c
@@ -45,24 +45,26 @@ struct wreplsrv_out_connect_state {
enum wreplsrv_out_connect_stage stage;
struct composite_context *c;
struct wrepl_request *req;
+ struct composite_context *c_req;
struct wrepl_associate assoc_io;
enum winsrepl_partner_type type;
struct wreplsrv_out_connection *wreplconn;
};
-static void wreplsrv_out_connect_handler(struct wrepl_request *req);
+static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
+static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
{
NTSTATUS status;
- status = wrepl_connect_recv(state->req);
+ status = wrepl_connect_recv(state->c_req);
NT_STATUS_NOT_OK_RETURN(status);
state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
NT_STATUS_HAVE_NO_MEMORY(state->req);
- state->req->async.fn = wreplsrv_out_connect_handler;
+ state->req->async.fn = wreplsrv_out_connect_handler_req;
state->req->async.private = state;
state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
@@ -92,10 +94,8 @@ static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_
return NT_STATUS_OK;
}
-static void wreplsrv_out_connect_handler(struct wrepl_request *req)
+static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
{
- struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
- struct wreplsrv_out_connect_state);
struct composite_context *c = state->c;
switch (state->stage) {
@@ -119,6 +119,22 @@ static void wreplsrv_out_connect_handler(struct wrepl_request *req)
}
}
+static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
+{
+ struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
+ struct wreplsrv_out_connect_state);
+ wreplsrv_out_connect_handler(state);
+ return;
+}
+
+static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
+{
+ struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
+ struct wreplsrv_out_connect_state);
+ wreplsrv_out_connect_handler(state);
+ return;
+}
+
static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
enum winsrepl_partner_type type,
struct wreplsrv_out_connection *wreplconn)
@@ -179,13 +195,13 @@ static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partn
state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
state->wreplconn= wreplconn;
- state->req = wrepl_connect_send(wreplconn->sock,
+ state->c_req = wrepl_connect_send(wreplconn->sock,
partner->our_address,
partner->address);
- if (!state->req) goto failed;
+ if (!state->c_req) goto failed;
- state->req->async.fn = wreplsrv_out_connect_handler;
- state->req->async.private = state;
+ state->c_req->async.fn = wreplsrv_out_connect_handler_creq;
+ state->c_req->async.private_data = state;
return c;
failed:
@@ -796,6 +812,7 @@ struct wreplsrv_push_notify_state {
struct wreplsrv_push_notify_io *io;
enum wrepl_replication_cmd command;
BOOL full_table;
+ struct wrepl_send_ctrl ctrl;
struct wrepl_request *req;
struct wrepl_packet req_packet;
struct wrepl_packet *rep_packet;
@@ -815,10 +832,11 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s
struct wreplsrv_in_connection *wrepl_in;
NTSTATUS status;
struct socket_context *sock;
- struct data_blob_list_item *update_rep;
+ struct packet_context *packet;
+ uint16_t fde_flags;
const char *our_ip;
- DATA_BLOB update_blob;
+ /* prepare the outgoing request */
req->opcode = WREPL_OPCODE_BITS;
req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
req->mess_type = WREPL_REPLICATION;
@@ -832,40 +850,64 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s
our_ip, our_ip, state->full_table);
NT_STATUS_NOT_OK_RETURN(status);
- state->req = wrepl_request_send(state->wreplconn->sock, req);
+ /* queue the request */
+ state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
NT_STATUS_HAVE_NO_MEMORY(state->req);
+ /*
+ * now we need to convert the wrepl_socket (client connection)
+ * into a wreplsrv_in_connection (server connection), because
+ * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
+ * message is received by the peer.
+ */
+
+ /* steal the socket_context */
sock = state->wreplconn->sock->sock;
- talloc_steal(state, state->wreplconn->sock->sock);
state->wreplconn->sock->sock = NULL;
+ talloc_steal(state, sock);
- update_blob = state->req->buffer;
- talloc_steal(state, state->req->buffer.data);
+ /*
+ * steal the packet_context
+ * note the request DATA_BLOB we just send on the
+ * wrepl_socket (client connection) is still unter the
+ * packet context and will be send to the wire
+ */
+ packet = state->wreplconn->sock->packet;
+ state->wreplconn->sock->packet = NULL;
+ talloc_steal(state, packet);
+
+ /*
+ * get the fde_flags of the old fde event,
+ * so that we can later set the same flags to the new one
+ */
+ fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
+ /*
+ * free the wrepl_socket (client connection)
+ */
talloc_free(state->wreplconn->sock);
state->wreplconn->sock = NULL;
+ /*
+ * now create a wreplsrv_in_connection,
+ * on which we act as server
+ *
+ * NOTE: sock and packet will be stolen by
+ * wreplsrv_in_connection_merge()
+ */
status = wreplsrv_in_connection_merge(state->io->in.partner,
- sock, &wrepl_in);
+ sock, packet, &wrepl_in);
NT_STATUS_NOT_OK_RETURN(status);
+ event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
+
wrepl_in->assoc_ctx.peer_ctx = state->wreplconn->assoc_ctx.peer_ctx;
wrepl_in->assoc_ctx.our_ctx = 0;
- update_rep = talloc(wrepl_in, struct data_blob_list_item);
- NT_STATUS_HAVE_NO_MEMORY(update_rep);
-
- update_rep->blob = update_blob;
- talloc_steal(update_rep, update_blob.data);
-
+ /* now we can free the wreplsrv_out_connection */
talloc_free(state->wreplconn);
state->wreplconn = NULL;
- if (!wrepl_in->send_queue) {
- EVENT_FD_WRITEABLE(wrepl_in->conn->event.fde);
- }
- DLIST_ADD_END(wrepl_in->send_queue, update_rep, struct data_blob_list_item *);
-
state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
return NT_STATUS_OK;
@@ -893,11 +935,12 @@ static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *s
our_ip, our_ip, state->full_table);
NT_STATUS_NOT_OK_RETURN(status);
- state->req = wrepl_request_send(state->wreplconn->sock, req);
+ /* we won't get a reply to a inform message */
+ state->ctrl.send_only = True;
+
+ state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
NT_STATUS_HAVE_NO_MEMORY(state->req);
- /* we won't get a reply to a inform message */
- state->req->send_only = True;
state->req->async.fn = wreplsrv_push_notify_handler_req;
state->req->async.private = state;