summaryrefslogtreecommitdiff
path: root/source4/wrepl_server
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2005-12-12 21:31:42 +0000
committerGerald (Jerry) Carter <jerry@samba.org>2007-10-10 13:47:17 -0500
commit36acd6e79c8cb881b9c333313402d993a6d0f511 (patch)
tree29738d21349a159c3cd28d4577779c6c7e8f209e /source4/wrepl_server
parentb052ab8ad4ea31a45d61d5c6d8e34b6879848045 (diff)
downloadsamba-36acd6e79c8cb881b9c333313402d993a6d0f511.tar.gz
samba-36acd6e79c8cb881b9c333313402d993a6d0f511.tar.bz2
samba-36acd6e79c8cb881b9c333313402d993a6d0f511.zip
r12200: - move the the winsreplication client and server code to the packet_context
system - this needs to be in one big patch, because of the merging code, that changes client in server connections and the other way around - use socket_connect_send/_recv() in the client code metze (This used to be commit f0105b7fcdc3032d22444a1973927fff2dd9a06f)
Diffstat (limited to 'source4/wrepl_server')
-rw-r--r--source4/wrepl_server/wrepl_in_call.c9
-rw-r--r--source4/wrepl_server/wrepl_in_connection.c271
-rw-r--r--source4/wrepl_server/wrepl_out_helpers.c105
-rw-r--r--source4/wrepl_server/wrepl_server.h21
4 files changed, 193 insertions, 213 deletions
diff --git a/source4/wrepl_server/wrepl_in_call.c b/source4/wrepl_server/wrepl_in_call.c
index 718442a288..d186152848 100644
--- a/source4/wrepl_server/wrepl_in_call.c
+++ b/source4/wrepl_server/wrepl_in_call.c
@@ -106,7 +106,7 @@ static NTSTATUS wreplsrv_in_stop_association(struct wreplsrv_in_call *call)
}
/* this will cause to not receive packets anymore and terminate the connection if the reply is send */
- call->wreplconn->terminate = True;
+ call->terminate_after_send = True;
return wreplsrv_in_stop_assoc_ctx(call);
}
@@ -315,6 +315,7 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call)
struct wreplsrv_out_connection *wrepl_out;
struct wrepl_table *update_in = &call->req_packet.message.replication.info.table;
struct wreplsrv_in_update_state *update_state;
+ uint16_t fde_flags;
DEBUG(2,("WREPL_REPL_UPDATE: partner[%s] initiator[%s] num_owners[%u]\n",
call->wreplconn->partner->address,
@@ -325,6 +326,7 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call)
* and do a WREPL_REPL_SEND_REQUEST's on the that connection
* and then stop this connection
*/
+ fde_flags = event_get_fd_flags(wrepl_in->conn->event.fde);
talloc_free(wrepl_in->conn->event.fde);
wrepl_in->conn->event.fde = NULL;
@@ -339,9 +341,12 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call)
wrepl_out->assoc_ctx.peer_ctx = wrepl_in->assoc_ctx.peer_ctx;
wrepl_out->sock = wrepl_socket_merge(wrepl_out,
wrepl_in->conn->event.ctx,
- wrepl_in->conn->socket);
+ wrepl_in->conn->socket,
+ wrepl_in->packet);
NT_STATUS_HAVE_NO_MEMORY(wrepl_out->sock);
+ event_set_fd_flags(wrepl_out->sock->event.fde, fde_flags);
+
update_state->wrepl_in = wrepl_in;
update_state->wrepl_out = wrepl_out;
update_state->cycle_io.in.partner = wrepl_out->partner;
diff --git a/source4/wrepl_server/wrepl_in_connection.c b/source4/wrepl_server/wrepl_in_connection.c
index e06e69103e..a6abf99e41 100644
--- a/source4/wrepl_server/wrepl_in_connection.c
+++ b/source4/wrepl_server/wrepl_in_connection.c
@@ -24,6 +24,7 @@
#include "dlinklist.h"
#include "lib/events/events.h"
#include "lib/socket/socket.h"
+#include "lib/stream/packet.h"
#include "smbd/service_task.h"
#include "smbd/service_stream.h"
#include "lib/messaging/irpc.h"
@@ -37,226 +38,156 @@ void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn,
stream_terminate_connection(wreplconn->conn, reason);
}
-/*
- called when we get a new connection
-*/
-static void wreplsrv_accept(struct stream_connection *conn)
+static int terminate_after_send_destructor(void *ptr)
{
- struct wreplsrv_service *service = talloc_get_type(conn->private, struct wreplsrv_service);
- struct wreplsrv_in_connection *wreplconn;
- const char *peer_ip;
-
- wreplconn = talloc_zero(conn, struct wreplsrv_in_connection);
- if (!wreplconn) {
- stream_terminate_connection(conn, "wreplsrv_accept: out of memory");
- return;
- }
-
- wreplconn->conn = conn;
- wreplconn->service = service;
- wreplconn->our_ip = socket_get_my_addr(conn->socket, wreplconn);
- if (!wreplconn->our_ip) {
- wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory");
- return;
- }
-
- peer_ip = socket_get_peer_addr(conn->socket, wreplconn);
- if (!peer_ip) {
- wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory");
- return;
- }
-
- wreplconn->partner = wreplsrv_find_partner(service, peer_ip);
-
- conn->private = wreplconn;
-
- irpc_add_name(conn->msg_ctx, "wreplsrv_connection");
+ struct wreplsrv_in_connection **tas = talloc_get_type(ptr, struct wreplsrv_in_connection *);
+ wreplsrv_terminate_in_connection(*tas, "wreplsrv_in_connection: terminate_after_send");
+ return 0;
}
/*
receive some data on a WREPL connection
*/
-static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags)
+static NTSTATUS wreplsrv_recv_request(void *private, DATA_BLOB blob)
{
- struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection);
+ struct wreplsrv_in_connection *wreplconn = talloc_get_type(private, struct wreplsrv_in_connection);
struct wreplsrv_in_call *call;
DATA_BLOB packet_in_blob;
DATA_BLOB packet_out_blob;
struct wrepl_wrap packet_out_wrap;
- struct data_blob_list_item *rep;
- NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
- size_t nread;
-
- /* avoid recursion, because of half async code */
- if (wreplconn->processing) {
- EVENT_FD_NOT_READABLE(conn->event.fde);
- return;
- }
-
- if (wreplconn->partial.length == 0) {
- wreplconn->partial = data_blob_talloc(wreplconn, NULL, 4);
- if (wreplconn->partial.data == NULL) {
- status = NT_STATUS_NO_MEMORY;
- goto failed;
- }
- wreplconn->partial_read = 0;
- }
-
- /* read in the packet length */
- if (wreplconn->partial_read < 4) {
- uint32_t packet_length;
-
- status = socket_recv(conn->socket,
- wreplconn->partial.data + wreplconn->partial_read,
- 4 - wreplconn->partial_read,
- &nread, 0);
- if (NT_STATUS_IS_ERR(status)) goto failed;
- if (!NT_STATUS_IS_OK(status)) return;
-
- wreplconn->partial_read += nread;
- if (wreplconn->partial_read != 4) return;
-
- packet_length = RIVAL(wreplconn->partial.data, 0) + 4;
-
- wreplconn->partial.data = talloc_realloc(wreplconn, wreplconn->partial.data,
- uint8_t, packet_length);
- if (wreplconn->partial.data == NULL) {
- status = NT_STATUS_NO_MEMORY;
- goto failed;
- }
- wreplconn->partial.length = packet_length;
- }
-
- /* read in the body */
- status = socket_recv(conn->socket,
- wreplconn->partial.data + wreplconn->partial_read,
- wreplconn->partial.length - wreplconn->partial_read,
- &nread, 0);
- if (NT_STATUS_IS_ERR(status)) goto failed;
- if (!NT_STATUS_IS_OK(status)) return;
-
- wreplconn->partial_read += nread;
- if (wreplconn->partial_read != wreplconn->partial.length) return;
-
- packet_in_blob.data = wreplconn->partial.data + 4;
- packet_in_blob.length = wreplconn->partial.length - 4;
+ NTSTATUS status;
call = talloc_zero(wreplconn, struct wreplsrv_in_call);
- if (!call) {
- status = NT_STATUS_NO_MEMORY;
- goto failed;
- }
+ NT_STATUS_HAVE_NO_MEMORY(call);
call->wreplconn = wreplconn;
+ talloc_steal(call, blob.data);
- /* we have a full request - parse it */
- status = ndr_pull_struct_blob(&packet_in_blob,
- call, &call->req_packet,
- (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
- if (!NT_STATUS_IS_OK(status)) {
- DEBUG(2,("Failed to parse incoming WINS-Replication packet - %s\n",
- nt_errstr(status)));
- DEBUG(10,("packet length %lu\n", (long)wreplconn->partial.length));
- NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
- goto failed;
- }
+ packet_in_blob.data = blob.data + 4;
+ packet_in_blob.length = blob.length - 4;
- /*
- * we have parsed the request, so we can reset the wreplconn->partial_read,
- * maybe we could also free wreplconn->partial, but for now we keep it,
- * and overwrite it the next time
- */
- wreplconn->partial_read = 0;
+ status = ndr_pull_struct_blob(&packet_in_blob, call, &call->req_packet,
+ (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
+ NT_STATUS_NOT_OK_RETURN(status);
if (DEBUGLVL(10)) {
- DEBUG(10,("Received WINS-Replication packet of length %lu\n", (long)wreplconn->partial.length));
+ DEBUG(10,("Received WINS-Replication packet of length %u\n", packet_in_blob.length + 4));
NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
}
- /* actually process the request */
- wreplconn->processing = True;
status = wreplsrv_in_call(call);
- wreplconn->processing = False;
- if (NT_STATUS_IS_ERR(status)) goto failed;
+ NT_STATUS_IS_ERR_RETURN(status);
if (!NT_STATUS_IS_OK(status)) {
/* w2k just ignores invalid packets, so we do */
DEBUG(10,("Received WINS-Replication packet was invalid, we just ignore it\n"));
talloc_free(call);
- return;
+ return NT_STATUS_OK;
}
/* and now encode the reply */
packet_out_wrap.packet = call->rep_packet;
status = ndr_push_struct_blob(&packet_out_blob, call, &packet_out_wrap,
(ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
- if (!NT_STATUS_IS_OK(status)) goto failed;
+ NT_STATUS_NOT_OK_RETURN(status);
if (DEBUGLVL(10)) {
DEBUG(10,("Sending WINS-Replication packet of length %d\n", (int)packet_out_blob.length));
NDR_PRINT_DEBUG(wrepl_packet, &call->rep_packet);
}
- rep = talloc(wreplconn, struct data_blob_list_item);
- if (!rep) {
- status = NT_STATUS_NO_MEMORY;
- goto failed;
+ if (call->terminate_after_send) {
+ struct wreplsrv_in_connection **tas;
+ tas = talloc(packet_out_blob.data, struct wreplsrv_in_connection *);
+ NT_STATUS_HAVE_NO_MEMORY(tas);
+ *tas = wreplconn;
+ talloc_set_destructor(tas, terminate_after_send_destructor);
}
- rep->blob = packet_out_blob;
- talloc_steal(rep, packet_out_blob.data);
- /* we don't need the call anymore */
- talloc_free(call);
+ status = packet_send(wreplconn->packet, packet_out_blob);
+ NT_STATUS_NOT_OK_RETURN(status);
- if (!wreplconn->send_queue) {
- EVENT_FD_WRITEABLE(conn->event.fde);
- }
- DLIST_ADD_END(wreplconn->send_queue, rep, struct data_blob_list_item *);
+ talloc_free(call);
+ return NT_STATUS_OK;
+}
- if (wreplconn->terminate) {
- EVENT_FD_NOT_READABLE(conn->event.fde);
- } else {
- EVENT_FD_READABLE(conn->event.fde);
- }
- return;
+/*
+ called when the socket becomes readable
+*/
+static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags)
+{
+ struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private,
+ struct wreplsrv_in_connection);
-failed:
- wreplsrv_terminate_in_connection(wreplconn, nt_errstr(status));
+ packet_recv(wreplconn->packet);
}
/*
- called when we can write to a connection
+ called when the socket becomes writable
*/
static void wreplsrv_send(struct stream_connection *conn, uint16_t flags)
{
- struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection);
- NTSTATUS status;
+ struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private,
+ struct wreplsrv_in_connection);
+ packet_queue_run(wreplconn->packet);
+}
- while (wreplconn->send_queue) {
- struct data_blob_list_item *rep = wreplconn->send_queue;
- size_t sendlen;
+/*
+ handle socket recv errors
+*/
+static void wreplsrv_recv_error(void *private, NTSTATUS status)
+{
+ struct wreplsrv_in_connection *wreplconn = talloc_get_type(private,
+ struct wreplsrv_in_connection);
+ wreplsrv_terminate_in_connection(wreplconn, nt_errstr(status));
+}
- status = socket_send(conn->socket, &rep->blob, &sendlen, 0);
- if (NT_STATUS_IS_ERR(status)) goto failed;
- if (!NT_STATUS_IS_OK(status)) return;
+/*
+ called when we get a new connection
+*/
+static void wreplsrv_accept(struct stream_connection *conn)
+{
+ struct wreplsrv_service *service = talloc_get_type(conn->private, struct wreplsrv_service);
+ struct wreplsrv_in_connection *wreplconn;
+ const char *peer_ip;
- rep->blob.length -= sendlen;
- rep->blob.data += sendlen;
+ wreplconn = talloc_zero(conn, struct wreplsrv_in_connection);
+ if (!wreplconn) {
+ stream_terminate_connection(conn, "wreplsrv_accept: out of memory");
+ return;
+ }
- if (rep->blob.length == 0) {
- DLIST_REMOVE(wreplconn->send_queue, rep);
- talloc_free(rep);
- }
+ wreplconn->packet = packet_init(wreplconn);
+ if (!wreplconn->packet) {
+ wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory");
+ return;
}
+ packet_set_private(wreplconn->packet, wreplconn);
+ packet_set_socket(wreplconn->packet, conn->socket);
+ packet_set_callback(wreplconn->packet, wreplsrv_recv_request);
+ packet_set_full_request(wreplconn->packet, packet_full_request_u32);
+ packet_set_error_handler(wreplconn->packet, wreplsrv_recv_error);
+ packet_set_event_context(wreplconn->packet, conn->event.ctx);
+ packet_set_fde(wreplconn->packet, conn->event.fde);
+ packet_set_serialise(wreplconn->packet);
- if (wreplconn->terminate) {
- wreplsrv_terminate_in_connection(wreplconn, "connection terminated after all pending packets are send");
+ wreplconn->conn = conn;
+ wreplconn->service = service;
+ wreplconn->our_ip = socket_get_my_addr(conn->socket, wreplconn);
+ if (!wreplconn->our_ip) {
+ wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory");
return;
}
- EVENT_FD_NOT_WRITEABLE(conn->event.fde);
- return;
+ peer_ip = socket_get_peer_addr(conn->socket, wreplconn);
+ if (!peer_ip) {
+ wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory");
+ return;
+ }
-failed:
- wreplsrv_terminate_in_connection(wreplconn, nt_errstr(status));
+ wreplconn->partner = wreplsrv_find_partner(service, peer_ip);
+
+ conn->private = wreplconn;
+
+ irpc_add_name(conn->msg_ctx, "wreplsrv_connection");
}
static const struct stream_server_ops wreplsrv_stream_ops = {
@@ -271,6 +202,7 @@ static const struct stream_server_ops wreplsrv_stream_ops = {
*/
NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner,
struct socket_context *sock,
+ struct packet_context *packet,
struct wreplsrv_in_connection **_wrepl_in)
{
struct wreplsrv_service *service = partner->service;
@@ -301,9 +233,26 @@ NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner,
wrepl_in, &conn);
NT_STATUS_NOT_OK_RETURN(status);
+ /*
+ * make the wreplsrv_in_connection structure a child of the
+ * stream_connection, to match the hierachie of wreplsrv_accept
+ */
wrepl_in->conn = conn;
talloc_steal(conn, wrepl_in);
+ /*
+ * now update the packet handling callback,...
+ */
+ wrepl_in->packet = talloc_steal(wrepl_in, packet);
+ packet_set_private(wrepl_in->packet, wrepl_in);
+ packet_set_socket(wrepl_in->packet, conn->socket);
+ packet_set_callback(wrepl_in->packet, wreplsrv_recv_request);
+ packet_set_full_request(wrepl_in->packet, packet_full_request_u32);
+ packet_set_error_handler(wrepl_in->packet, wreplsrv_recv_error);
+ packet_set_event_context(wrepl_in->packet, conn->event.ctx);
+ packet_set_fde(wrepl_in->packet, conn->event.fde);
+ packet_set_serialise(wrepl_in->packet);
+
*_wrepl_in = wrepl_in;
return NT_STATUS_OK;
}
diff --git a/source4/wrepl_server/wrepl_out_helpers.c b/source4/wrepl_server/wrepl_out_helpers.c
index c7c6f55767..d19c38bfb8 100644
--- a/source4/wrepl_server/wrepl_out_helpers.c
+++ b/source4/wrepl_server/wrepl_out_helpers.c
@@ -45,24 +45,26 @@ struct wreplsrv_out_connect_state {
enum wreplsrv_out_connect_stage stage;
struct composite_context *c;
struct wrepl_request *req;
+ struct composite_context *c_req;
struct wrepl_associate assoc_io;
enum winsrepl_partner_type type;
struct wreplsrv_out_connection *wreplconn;
};
-static void wreplsrv_out_connect_handler(struct wrepl_request *req);
+static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
+static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
{
NTSTATUS status;
- status = wrepl_connect_recv(state->req);
+ status = wrepl_connect_recv(state->c_req);
NT_STATUS_NOT_OK_RETURN(status);
state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
NT_STATUS_HAVE_NO_MEMORY(state->req);
- state->req->async.fn = wreplsrv_out_connect_handler;
+ state->req->async.fn = wreplsrv_out_connect_handler_req;
state->req->async.private = state;
state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
@@ -92,10 +94,8 @@ static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_
return NT_STATUS_OK;
}
-static void wreplsrv_out_connect_handler(struct wrepl_request *req)
+static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
{
- struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
- struct wreplsrv_out_connect_state);
struct composite_context *c = state->c;
switch (state->stage) {
@@ -119,6 +119,22 @@ static void wreplsrv_out_connect_handler(struct wrepl_request *req)
}
}
+static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
+{
+ struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
+ struct wreplsrv_out_connect_state);
+ wreplsrv_out_connect_handler(state);
+ return;
+}
+
+static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
+{
+ struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
+ struct wreplsrv_out_connect_state);
+ wreplsrv_out_connect_handler(state);
+ return;
+}
+
static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
enum winsrepl_partner_type type,
struct wreplsrv_out_connection *wreplconn)
@@ -179,13 +195,13 @@ static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partn
state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
state->wreplconn= wreplconn;
- state->req = wrepl_connect_send(wreplconn->sock,
+ state->c_req = wrepl_connect_send(wreplconn->sock,
partner->our_address,
partner->address);
- if (!state->req) goto failed;
+ if (!state->c_req) goto failed;
- state->req->async.fn = wreplsrv_out_connect_handler;
- state->req->async.private = state;
+ state->c_req->async.fn = wreplsrv_out_connect_handler_creq;
+ state->c_req->async.private_data = state;
return c;
failed:
@@ -796,6 +812,7 @@ struct wreplsrv_push_notify_state {
struct wreplsrv_push_notify_io *io;
enum wrepl_replication_cmd command;
BOOL full_table;
+ struct wrepl_send_ctrl ctrl;
struct wrepl_request *req;
struct wrepl_packet req_packet;
struct wrepl_packet *rep_packet;
@@ -815,10 +832,11 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s
struct wreplsrv_in_connection *wrepl_in;
NTSTATUS status;
struct socket_context *sock;
- struct data_blob_list_item *update_rep;
+ struct packet_context *packet;
+ uint16_t fde_flags;
const char *our_ip;
- DATA_BLOB update_blob;
+ /* prepare the outgoing request */
req->opcode = WREPL_OPCODE_BITS;
req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
req->mess_type = WREPL_REPLICATION;
@@ -832,40 +850,64 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s
our_ip, our_ip, state->full_table);
NT_STATUS_NOT_OK_RETURN(status);
- state->req = wrepl_request_send(state->wreplconn->sock, req);
+ /* queue the request */
+ state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
NT_STATUS_HAVE_NO_MEMORY(state->req);
+ /*
+ * now we need to convert the wrepl_socket (client connection)
+ * into a wreplsrv_in_connection (server connection), because
+ * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
+ * message is received by the peer.
+ */
+
+ /* steal the socket_context */
sock = state->wreplconn->sock->sock;
- talloc_steal(state, state->wreplconn->sock->sock);
state->wreplconn->sock->sock = NULL;
+ talloc_steal(state, sock);
- update_blob = state->req->buffer;
- talloc_steal(state, state->req->buffer.data);
+ /*
+ * steal the packet_context
+ * note the request DATA_BLOB we just send on the
+ * wrepl_socket (client connection) is still unter the
+ * packet context and will be send to the wire
+ */
+ packet = state->wreplconn->sock->packet;
+ state->wreplconn->sock->packet = NULL;
+ talloc_steal(state, packet);
+
+ /*
+ * get the fde_flags of the old fde event,
+ * so that we can later set the same flags to the new one
+ */
+ fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
+ /*
+ * free the wrepl_socket (client connection)
+ */
talloc_free(state->wreplconn->sock);
state->wreplconn->sock = NULL;
+ /*
+ * now create a wreplsrv_in_connection,
+ * on which we act as server
+ *
+ * NOTE: sock and packet will be stolen by
+ * wreplsrv_in_connection_merge()
+ */
status = wreplsrv_in_connection_merge(state->io->in.partner,
- sock, &wrepl_in);
+ sock, packet, &wrepl_in);
NT_STATUS_NOT_OK_RETURN(status);
+ event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
+
wrepl_in->assoc_ctx.peer_ctx = state->wreplconn->assoc_ctx.peer_ctx;
wrepl_in->assoc_ctx.our_ctx = 0;
- update_rep = talloc(wrepl_in, struct data_blob_list_item);
- NT_STATUS_HAVE_NO_MEMORY(update_rep);
-
- update_rep->blob = update_blob;
- talloc_steal(update_rep, update_blob.data);
-
+ /* now we can free the wreplsrv_out_connection */
talloc_free(state->wreplconn);
state->wreplconn = NULL;
- if (!wrepl_in->send_queue) {
- EVENT_FD_WRITEABLE(wrepl_in->conn->event.fde);
- }
- DLIST_ADD_END(wrepl_in->send_queue, update_rep, struct data_blob_list_item *);
-
state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
return NT_STATUS_OK;
@@ -893,11 +935,12 @@ static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *s
our_ip, our_ip, state->full_table);
NT_STATUS_NOT_OK_RETURN(status);
- state->req = wrepl_request_send(state->wreplconn->sock, req);
+ /* we won't get a reply to a inform message */
+ state->ctrl.send_only = True;
+
+ state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
NT_STATUS_HAVE_NO_MEMORY(state->req);
- /* we won't get a reply to a inform message */
- state->req->send_only = True;
state->req->async.fn = wreplsrv_push_notify_handler_req;
state->req->async.private = state;
diff --git a/source4/wrepl_server/wrepl_server.h b/source4/wrepl_server/wrepl_server.h
index 7025bff496..118686622e 100644
--- a/source4/wrepl_server/wrepl_server.h
+++ b/source4/wrepl_server/wrepl_server.h
@@ -35,6 +35,7 @@ struct wreplsrv_in_call {
struct wreplsrv_in_connection *wreplconn;
struct wrepl_packet req_packet;
struct wrepl_packet rep_packet;
+ BOOL terminate_after_send;
};
/*
@@ -43,6 +44,7 @@ struct wreplsrv_in_call {
struct wreplsrv_in_connection {
struct wreplsrv_in_connection *prev,*next;
struct stream_connection *conn;
+ struct packet_context *packet;
/* our global service context */
struct wreplsrv_service *service;
@@ -67,25 +69,6 @@ struct wreplsrv_in_connection {
uint32_t our_ctx;
uint32_t peer_ctx;
} assoc_ctx;
-
- /* the partial input on the connection */
- DATA_BLOB partial;
- size_t partial_read;
-
- /*
- * are we currently processing a request?
- * this prevents loops, with half async code
- */
- BOOL processing;
-
- /*
- * if this is set we no longer accept incoming packets
- * and terminate the connection after we have send all packets
- */
- BOOL terminate;
-
- /* the list of outgoing DATA_BLOB's that needs to be send */
- struct data_blob_list_item *send_queue;
};
/*