diff options
-rw-r--r-- | source4/include/structs.h | 1 | ||||
-rw-r--r-- | source4/libcli/wrepl/winsrepl.c | 546 | ||||
-rw-r--r-- | source4/libcli/wrepl/winsrepl.h | 27 | ||||
-rw-r--r-- | source4/torture/nbt/winsreplication.c | 15 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_in_call.c | 9 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_in_connection.c | 271 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_out_helpers.c | 105 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_server.h | 21 |
8 files changed, 442 insertions, 553 deletions
diff --git a/source4/include/structs.h b/source4/include/structs.h index ca52ad5336..89b8bf9dcf 100644 --- a/source4/include/structs.h +++ b/source4/include/structs.h @@ -300,6 +300,7 @@ struct wreplsrv_push_notify_io; struct winsdb_record; struct wrepl_packet; +struct wrepl_send_ctrl; struct wrepl_associate; struct wrepl_associate_stop; struct wrepl_pull_table; diff --git a/source4/libcli/wrepl/winsrepl.c b/source4/libcli/wrepl/winsrepl.c index 5658a2cc03..109910be1f 100644 --- a/source4/libcli/wrepl/winsrepl.c +++ b/source4/libcli/wrepl/winsrepl.c @@ -25,6 +25,10 @@ #include "dlinklist.h" #include "lib/socket/socket.h" #include "libcli/wrepl/winsrepl.h" +#include "lib/stream/packet.h" +#include "libcli/composite/composite.h" + +static struct wrepl_request *wrepl_request_finished(struct wrepl_request *req, NTSTATUS status); /* mark all pending requests as dead - called when a socket error happens @@ -34,12 +38,15 @@ static void wrepl_socket_dead(struct wrepl_socket *wrepl_socket, NTSTATUS status talloc_set_destructor(wrepl_socket, NULL); wrepl_socket->dead = True; - if (wrepl_socket->fde) { - talloc_free(wrepl_socket->fde); - wrepl_socket->fde = NULL; + if (wrepl_socket->event.fde) { + packet_recv_disable(wrepl_socket->packet); + packet_set_fde(wrepl_socket->packet, NULL); + talloc_free(wrepl_socket->event.fde); + wrepl_socket->event.fde = NULL; } if (wrepl_socket->sock) { + packet_set_socket(wrepl_socket->packet, NULL); talloc_free(wrepl_socket->sock); wrepl_socket->sock = NULL; } @@ -47,23 +54,10 @@ static void wrepl_socket_dead(struct wrepl_socket *wrepl_socket, NTSTATUS status if (NT_STATUS_EQUAL(NT_STATUS_UNSUCCESSFUL, status)) { status = NT_STATUS_UNEXPECTED_NETWORK_ERROR; } - while (wrepl_socket->send_queue) { - struct wrepl_request *req = wrepl_socket->send_queue; - DLIST_REMOVE(wrepl_socket->send_queue, req); - req->state = WREPL_REQUEST_ERROR; - req->status = status; - if (req->async.fn) { - req->async.fn(req); - } - } while (wrepl_socket->recv_queue) { struct wrepl_request *req = wrepl_socket->recv_queue; DLIST_REMOVE(wrepl_socket->recv_queue, req); - req->state = WREPL_REQUEST_ERROR; - req->status = status; - if (req->async.fn) { - req->async.fn(req); - } + wrepl_request_finished(req, status); } } @@ -75,179 +69,38 @@ static void wrepl_request_timeout_handler(struct event_context *ev, struct timed } /* - handle send events -*/ -static void wrepl_handler_send(struct wrepl_socket *wrepl_socket) -{ - while (wrepl_socket->send_queue) { - struct wrepl_request *req = wrepl_socket->send_queue; - size_t nsent; - NTSTATUS status; - - status = socket_send(wrepl_socket->sock, &req->buffer, &nsent, 0); - if (NT_STATUS_IS_ERR(status)) { - wrepl_socket_dead(wrepl_socket, status); - return; - } - if (!NT_STATUS_IS_OK(status) || nsent == 0) return; - - req->buffer.data += nsent; - req->buffer.length -= nsent; - if (req->buffer.length != 0) { - return; - } - - if (req->disconnect_after_send) { - DLIST_REMOVE(wrepl_socket->send_queue, req); - req->status = NT_STATUS_OK; - req->state = WREPL_REQUEST_DONE; - wrepl_socket_dead(wrepl_socket, NT_STATUS_LOCAL_DISCONNECT); - if (req->async.fn) { - req->async.fn(req); - } - return; - } - - if (req->send_only) { - DLIST_REMOVE(wrepl_socket->send_queue, req); - req->status = NT_STATUS_OK; - req->state = WREPL_REQUEST_DONE; - if (req->async.fn) { - EVENT_FD_READABLE(wrepl_socket->fde); - req->async.fn(req); - return; - } - } else { - DLIST_REMOVE(wrepl_socket->send_queue, req); - DLIST_ADD_END(wrepl_socket->recv_queue, req, struct wrepl_request *); - req->state = WREPL_REQUEST_RECV; - } - - EVENT_FD_READABLE(wrepl_socket->fde); - } - - EVENT_FD_NOT_WRITEABLE(wrepl_socket->fde); -} - - -/* handle recv events */ -static void wrepl_handler_recv(struct wrepl_socket *wrepl_socket) +static NTSTATUS wrepl_finish_recv(void *private, DATA_BLOB packet_blob_in) { - size_t nread; + struct wrepl_socket *wrepl_socket = talloc_get_type(private, struct wrepl_socket); struct wrepl_request *req = wrepl_socket->recv_queue; DATA_BLOB blob; - if (req == NULL) { - NTSTATUS status; - - EVENT_FD_NOT_READABLE(wrepl_socket->fde); - - status = socket_recv(wrepl_socket->sock, NULL, 0, &nread, 0); - if (NT_STATUS_EQUAL(NT_STATUS_END_OF_FILE,status)) return; - if (NT_STATUS_IS_ERR(status)) { - wrepl_socket_dead(wrepl_socket, status); - return; - } - return; - } - - if (req->buffer.length == 0) { - req->buffer = data_blob_talloc(req, NULL, 4); - if (req->buffer.data == NULL) { - req->status = NT_STATUS_NO_MEMORY; - goto failed; - } - req->num_read = 0; - } - - /* read in the packet length */ - if (req->num_read < 4) { - uint32_t req_length; - - req->status = socket_recv(wrepl_socket->sock, - req->buffer.data + req->num_read, - 4 - req->num_read, - &nread, 0); - if (NT_STATUS_IS_ERR(req->status)) { - wrepl_socket_dead(wrepl_socket, req->status); - return; - } - if (!NT_STATUS_IS_OK(req->status)) return; - - req->num_read += nread; - if (req->num_read != 4) return; - - req_length = RIVAL(req->buffer.data, 0) + 4; - - req->buffer.data = talloc_realloc(req, req->buffer.data, - uint8_t, req_length); - if (req->buffer.data == NULL) { - req->status = NT_STATUS_NO_MEMORY; - goto failed; - } - req->buffer.length = req_length; - } - - /* read in the body */ - req->status = socket_recv(wrepl_socket->sock, - req->buffer.data + req->num_read, - req->buffer.length - req->num_read, - &nread, 0); - if (NT_STATUS_IS_ERR(req->status)) { - wrepl_socket_dead(wrepl_socket, req->status); - return; - } - if (!NT_STATUS_IS_OK(req->status)) return; - - req->num_read += nread; - if (req->num_read != req->buffer.length) return; - req->packet = talloc(req, struct wrepl_packet); - if (req->packet == NULL) { - req->status = NT_STATUS_NO_MEMORY; - goto failed; - } + NT_STATUS_HAVE_NO_MEMORY(req->packet); - blob.data = req->buffer.data + 4; - blob.length = req->buffer.length - 4; + blob.data = packet_blob_in.data + 4; + blob.length = packet_blob_in.length - 4; /* we have a full request - parse it */ req->status = ndr_pull_struct_blob(&blob, req->packet, req->packet, (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet); if (!NT_STATUS_IS_OK(req->status)) { - DEBUG(2,("Failed to parse incoming WINS packet - %s\n", - nt_errstr(req->status))); - DEBUG(10,("packet length %d\n", (int)req->buffer.length)); - NDR_PRINT_DEBUG(wrepl_packet, req->packet); - goto failed; + wrepl_request_finished(req, req->status); + return NT_STATUS_OK; } if (DEBUGLVL(10)) { - DEBUG(10,("Received WINS packet of length %d\n", (int)req->buffer.length)); + DEBUG(10,("Received WINS packet of length %u\n", packet_blob_in.length)); NDR_PRINT_DEBUG(wrepl_packet, req->packet); } - DLIST_REMOVE(wrepl_socket->recv_queue, req); - req->state = WREPL_REQUEST_DONE; - if (req->async.fn) { - req->async.fn(req); - } - return; - -failed: - if (req->state == WREPL_REQUEST_RECV) { - DLIST_REMOVE(wrepl_socket->recv_queue, req); - } - req->state = WREPL_REQUEST_ERROR; - if (req->async.fn) { - req->async.fn(req); - } + wrepl_request_finished(req, req->status); + return NT_STATUS_OK; } - /* handler for winrepl events */ @@ -256,56 +109,23 @@ static void wrepl_handler(struct event_context *ev, struct fd_event *fde, { struct wrepl_socket *wrepl_socket = talloc_get_type(private, struct wrepl_socket); - if (flags & EVENT_FD_WRITE) { - wrepl_handler_send(wrepl_socket); - } if (flags & EVENT_FD_READ) { - wrepl_handler_recv(wrepl_socket); + packet_recv(wrepl_socket->packet); + return; + } + if (flags & EVENT_FD_WRITE) { + packet_queue_run(wrepl_socket->packet); } } - -/* - handler for winrepl connection completion -*/ -static void wrepl_connect_handler(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) +static void wrepl_error(void *private, NTSTATUS status) { struct wrepl_socket *wrepl_socket = talloc_get_type(private, struct wrepl_socket); - struct wrepl_request *req = wrepl_socket->recv_queue; - - talloc_free(wrepl_socket->fde); - wrepl_socket->fde = NULL; - - if (req == NULL) return; - - req->status = socket_connect_complete(wrepl_socket->sock, 0); - if (NT_STATUS_IS_ERR(req->status)) goto failed; - - if (!NT_STATUS_IS_OK(req->status)) return; - - wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket, - socket_get_fd(wrepl_socket->sock), - EVENT_FD_WRITE, - wrepl_handler, wrepl_socket); - if (wrepl_socket->fde == NULL) { - req->status = NT_STATUS_NO_MEMORY; - } - - -failed: - DLIST_REMOVE(wrepl_socket->recv_queue, req); - if (!NT_STATUS_IS_OK(req->status)) { - req->state = WREPL_REQUEST_ERROR; - } else { - req->state = WREPL_REQUEST_DONE; - } - if (req->async.fn) { - req->async.fn(req); - } + wrepl_socket_dead(wrepl_socket, status); } + /* destroy a wrepl_socket destructor */ @@ -326,35 +146,22 @@ struct wrepl_socket *wrepl_socket_init(TALLOC_CTX *mem_ctx, struct wrepl_socket *wrepl_socket; NTSTATUS status; - wrepl_socket = talloc(mem_ctx, struct wrepl_socket); - if (wrepl_socket == NULL) goto failed; + wrepl_socket = talloc_zero(mem_ctx, struct wrepl_socket); + if (!wrepl_socket) return NULL; if (event_ctx == NULL) { - wrepl_socket->event_ctx = event_context_init(wrepl_socket); + wrepl_socket->event.ctx = event_context_init(wrepl_socket); } else { - wrepl_socket->event_ctx = talloc_reference(wrepl_socket, event_ctx); + wrepl_socket->event.ctx = talloc_reference(wrepl_socket, event_ctx); } - if (wrepl_socket->event_ctx == NULL) goto failed; + if (!wrepl_socket->event.ctx) goto failed; status = socket_create("ip", SOCKET_TYPE_STREAM, &wrepl_socket->sock, 0); if (!NT_STATUS_IS_OK(status)) goto failed; talloc_steal(wrepl_socket, wrepl_socket->sock); - wrepl_socket->send_queue = NULL; - wrepl_socket->recv_queue = NULL; wrepl_socket->request_timeout = WREPL_SOCKET_REQUEST_TIMEOUT; - wrepl_socket->dead = False; - - wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket, - socket_get_fd(wrepl_socket->sock), - EVENT_FD_WRITE, - wrepl_connect_handler, wrepl_socket); - if (wrepl_socket->fde == NULL) { - goto failed; - } - - set_blocking(socket_get_fd(wrepl_socket->sock), False); talloc_set_destructor(wrepl_socket, wrepl_socket_destructor); @@ -370,32 +177,42 @@ failed: */ struct wrepl_socket *wrepl_socket_merge(TALLOC_CTX *mem_ctx, struct event_context *event_ctx, - struct socket_context *socket) + struct socket_context *socket, + struct packet_context *packet) { struct wrepl_socket *wrepl_socket; - wrepl_socket = talloc(mem_ctx, struct wrepl_socket); + wrepl_socket = talloc_zero(mem_ctx, struct wrepl_socket); if (wrepl_socket == NULL) goto failed; - wrepl_socket->event_ctx = talloc_reference(wrepl_socket, event_ctx); - if (wrepl_socket->event_ctx == NULL) goto failed; + wrepl_socket->event.ctx = talloc_reference(wrepl_socket, event_ctx); + if (wrepl_socket->event.ctx == NULL) goto failed; wrepl_socket->sock = socket; talloc_steal(wrepl_socket, wrepl_socket->sock); - wrepl_socket->send_queue = NULL; - wrepl_socket->recv_queue = NULL; + wrepl_socket->request_timeout = WREPL_SOCKET_REQUEST_TIMEOUT; - wrepl_socket->dead = False; - wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket, - socket_get_fd(wrepl_socket->sock), - 0, - wrepl_handler, wrepl_socket); - if (wrepl_socket->fde == NULL) { + wrepl_socket->event.fde = event_add_fd(wrepl_socket->event.ctx, wrepl_socket, + socket_get_fd(wrepl_socket->sock), + EVENT_FD_READ, + wrepl_handler, wrepl_socket); + if (wrepl_socket->event.fde == NULL) { goto failed; } + wrepl_socket->packet = packet; + talloc_steal(wrepl_socket, wrepl_socket->packet); + packet_set_private(wrepl_socket->packet, wrepl_socket); + packet_set_socket(wrepl_socket->packet, wrepl_socket->sock); + packet_set_callback(wrepl_socket->packet, wrepl_finish_recv); + packet_set_full_request(wrepl_socket->packet, packet_full_request_u32); + packet_set_error_handler(wrepl_socket->packet, wrepl_error); + packet_set_event_context(wrepl_socket->packet, wrepl_socket->event.ctx); + packet_set_fde(wrepl_socket->packet, wrepl_socket->event.fde); + packet_set_serialise(wrepl_socket->packet); + talloc_set_destructor(wrepl_socket, wrepl_socket_destructor); return wrepl_socket; @@ -411,9 +228,6 @@ failed: static int wrepl_request_destructor(void *ptr) { struct wrepl_request *req = talloc_get_type(ptr, struct wrepl_request); - if (req->state == WREPL_REQUEST_SEND) { - DLIST_REMOVE(req->wrepl_socket->send_queue, req); - } if (req->state == WREPL_REQUEST_RECV) { DLIST_REMOVE(req->wrepl_socket->recv_queue, req); } @@ -428,79 +242,108 @@ static NTSTATUS wrepl_request_wait(struct wrepl_request *req) { NT_STATUS_HAVE_NO_MEMORY(req); while (req->state < WREPL_REQUEST_DONE) { - event_loop_once(req->wrepl_socket->event_ctx); + event_loop_once(req->wrepl_socket->event.ctx); } return req->status; } -static void wrepl_request_trigger(struct wrepl_request *req, NTSTATUS status); +struct wrepl_connect_state { + struct composite_context *result; + struct wrepl_socket *wrepl_socket; + struct composite_context *creq; +}; + +/* + handler for winrepl connection completion +*/ +static void wrepl_connect_handler(struct composite_context *creq) +{ + struct wrepl_connect_state *state = talloc_get_type(creq->async.private_data, + struct wrepl_connect_state); + struct wrepl_socket *wrepl_socket = state->wrepl_socket; + struct composite_context *result = state->result; + + result->status = socket_connect_recv(state->creq); + if (!composite_is_ok(result)) return; + + wrepl_socket->event.fde = event_add_fd(wrepl_socket->event.ctx, wrepl_socket, + socket_get_fd(wrepl_socket->sock), + EVENT_FD_READ, + wrepl_handler, wrepl_socket); + if (composite_nomem(wrepl_socket->event.fde, result)) return; + + /* setup the stream -> packet parser */ + wrepl_socket->packet = packet_init(wrepl_socket); + if (composite_nomem(wrepl_socket->packet, result)) return; + packet_set_private(wrepl_socket->packet, wrepl_socket); + packet_set_socket(wrepl_socket->packet, wrepl_socket->sock); + packet_set_callback(wrepl_socket->packet, wrepl_finish_recv); + packet_set_full_request(wrepl_socket->packet, packet_full_request_u32); + packet_set_error_handler(wrepl_socket->packet, wrepl_error); + packet_set_event_context(wrepl_socket->packet, wrepl_socket->event.ctx); + packet_set_fde(wrepl_socket->packet, wrepl_socket->event.fde); + packet_set_serialise(wrepl_socket->packet); + + composite_done(result); +} /* connect a wrepl_socket to a WINS server */ -struct wrepl_request *wrepl_connect_send(struct wrepl_socket *wrepl_socket, - const char *our_ip, const char *peer_ip) +struct composite_context *wrepl_connect_send(struct wrepl_socket *wrepl_socket, + const char *our_ip, const char *peer_ip) { - struct wrepl_request *req; - NTSTATUS status; + struct composite_context *result; + struct wrepl_connect_state *state; - req = talloc_zero(wrepl_socket, struct wrepl_request); - if (req == NULL) goto failed; + result = talloc_zero(wrepl_socket, struct composite_context); + if (!result) return NULL; - req->wrepl_socket = wrepl_socket; - req->state = WREPL_REQUEST_RECV; - - DLIST_ADD_END(wrepl_socket->recv_queue, req, struct wrepl_request *); + result->state = COMPOSITE_STATE_IN_PROGRESS; + result->event_ctx = wrepl_socket->event.ctx; - talloc_set_destructor(req, wrepl_request_destructor); + state = talloc_zero(result, struct wrepl_connect_state); + if (composite_nomem(state, result)) return result; + result->private_data = state; + state->result = result; + state->wrepl_socket = wrepl_socket; if (!our_ip) { our_ip = iface_best_ip(peer_ip); } - status = socket_connect(wrepl_socket->sock, our_ip, 0, peer_ip, - WINS_REPLICATION_PORT, 0); - if (NT_STATUS_IS_OK(status)) { - talloc_free(wrepl_socket->fde); - - wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket, - socket_get_fd(wrepl_socket->sock), - EVENT_FD_WRITE, - wrepl_handler, wrepl_socket); - if (wrepl_socket->fde == NULL) { - status = NT_STATUS_NO_MEMORY; - } - } - - if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) { - req->wrepl_socket = wrepl_socket; - wrepl_request_trigger(req, status); - return req; - } - - return req; - -failed: - talloc_free(req); - return NULL; + state->creq = socket_connect_send(wrepl_socket->sock, our_ip, 0, + peer_ip, WINS_REPLICATION_PORT, + 0, wrepl_socket->event.ctx); + composite_continue(result, state->creq, wrepl_connect_handler, state); + return result; } /* connect a wrepl_socket to a WINS server - recv side */ -NTSTATUS wrepl_connect_recv(struct wrepl_request *req) +NTSTATUS wrepl_connect_recv(struct composite_context *result) { - return wrepl_request_wait(req); -} + struct wrepl_connect_state *state = talloc_get_type(result->private_data, + struct wrepl_connect_state); + struct wrepl_socket *wrepl_socket = state->wrepl_socket; + NTSTATUS status = composite_wait(result); + if (!NT_STATUS_IS_OK(status)) { + wrepl_socket_dead(wrepl_socket, status); + } + + talloc_free(result); + return status; +} /* connect a wrepl_socket to a WINS server - sync API */ NTSTATUS wrepl_connect(struct wrepl_socket *wrepl_socket, const char *our_ip, const char *peer_ip) { - struct wrepl_request *req = wrepl_connect_send(wrepl_socket, our_ip, peer_ip); - return wrepl_connect_recv(req); + struct composite_context *c_req = wrepl_connect_send(wrepl_socket, our_ip, peer_ip); + return wrepl_connect_recv(c_req); } /* @@ -517,12 +360,13 @@ static void wrepl_request_trigger_handler(struct event_context *ev, struct timed /* trigger an immediate event on a wrepl_request + the return value should only be used in wrepl_request_send() + this is the only place where req->trigger is True */ -static void wrepl_request_trigger(struct wrepl_request *req, NTSTATUS status) +static struct wrepl_request *wrepl_request_finished(struct wrepl_request *req, NTSTATUS status) { - if (req->state == WREPL_REQUEST_SEND) { - DLIST_REMOVE(req->wrepl_socket->send_queue, req); - } + struct timed_event *te; + if (req->state == WREPL_REQUEST_RECV) { DLIST_REMOVE(req->wrepl_socket->recv_queue, req); } @@ -535,61 +379,119 @@ static void wrepl_request_trigger(struct wrepl_request *req, NTSTATUS status) req->status = status; - /* a zero timeout means immediate */ - event_add_timed(req->wrepl_socket->event_ctx, - req, timeval_zero(), - wrepl_request_trigger_handler, req); + if (req->trigger) { + req->trigger = False; + /* a zero timeout means immediate */ + te = event_add_timed(req->wrepl_socket->event.ctx, + req, timeval_zero(), + wrepl_request_trigger_handler, req); + if (!te) { + talloc_free(req); + return NULL; + } + return req; + } + + if (req->async.fn) { + req->async.fn(req); + } + return NULL; } +struct wrepl_send_ctrl_state { + struct wrepl_send_ctrl ctrl; + struct wrepl_request *req; + struct wrepl_socket *wrepl_sock; +}; + +static int wrepl_send_ctrl_destructor(void *ptr) +{ + struct wrepl_send_ctrl_state *s = talloc_get_type(ptr, struct wrepl_send_ctrl_state); + struct wrepl_request *req = s->wrepl_sock->recv_queue; + + /* check if the request is still in WREPL_STATE_RECV, + * we need this here because the caller has may called + * talloc_free(req) and wrepl_send_ctrl_state isn't + * a talloc child of the request, so our s->req pointer + * is maybe invalid! + */ + for (; req; req = req->next) { + if (req == s->req) break; + } + if (!req) return 0; + + /* here, we need to make sure the async request handler is called + * later in the next event_loop and now now + */ + req->trigger = True; + wrepl_request_finished(req, NT_STATUS_OK); + + if (s->ctrl.disconnect_after_send) { + wrepl_socket_dead(s->wrepl_sock, NT_STATUS_LOCAL_DISCONNECT); + } + + return 0; +} /* send a generic wins replication request */ struct wrepl_request *wrepl_request_send(struct wrepl_socket *wrepl_socket, - struct wrepl_packet *packet) + struct wrepl_packet *packet, + struct wrepl_send_ctrl *ctrl) { struct wrepl_request *req; struct wrepl_wrap wrap; + DATA_BLOB blob; req = talloc_zero(wrepl_socket, struct wrepl_request); - if (req == NULL) goto failed; - + if (!req) return NULL; req->wrepl_socket = wrepl_socket; - req->state = WREPL_REQUEST_SEND; - - DLIST_ADD_END(wrepl_socket->send_queue, req, struct wrepl_request *); + req->state = WREPL_REQUEST_RECV; + req->trigger = True; + DLIST_ADD_END(wrepl_socket->recv_queue, req, struct wrepl_request *); talloc_set_destructor(req, wrepl_request_destructor); if (wrepl_socket->dead) { - req->wrepl_socket = wrepl_socket; - wrepl_request_trigger(req, NT_STATUS_INVALID_CONNECTION); - return req; + return wrepl_request_finished(req, NT_STATUS_INVALID_CONNECTION); } wrap.packet = *packet; - req->status = ndr_push_struct_blob(&req->buffer, req, &wrap, + req->status = ndr_push_struct_blob(&blob, req, &wrap, (ndr_push_flags_fn_t)ndr_push_wrepl_wrap); - if (!NT_STATUS_IS_OK(req->status)) goto failed; + if (!NT_STATUS_IS_OK(req->status)) { + return wrepl_request_finished(req, req->status); + } if (DEBUGLVL(10)) { - DEBUG(10,("Sending WINS packet of length %d\n", (int)req->buffer.length)); + DEBUG(10,("Sending WINS packet of length %u\n", blob.length)); NDR_PRINT_DEBUG(wrepl_packet, &wrap.packet); } if (wrepl_socket->request_timeout > 0) { - req->te = event_add_timed(wrepl_socket->event_ctx, req, + req->te = event_add_timed(wrepl_socket->event.ctx, req, timeval_current_ofs(wrepl_socket->request_timeout, 0), wrepl_request_timeout_handler, req); + if (!req->te) return wrepl_request_finished(req, NT_STATUS_NO_MEMORY); } - EVENT_FD_WRITEABLE(wrepl_socket->fde); - - return req; + if (ctrl && (ctrl->send_only || ctrl->disconnect_after_send)) { + struct wrepl_send_ctrl_state *s = talloc(blob.data, struct wrepl_send_ctrl_state); + if (!s) return wrepl_request_finished(req, NT_STATUS_NO_MEMORY); + s->ctrl = *ctrl; + s->req = req; + s->wrepl_sock = wrepl_socket; + talloc_set_destructor(s, wrepl_send_ctrl_destructor); + } -failed: - talloc_free(req); - return NULL; + req->status = packet_send(wrepl_socket->packet, blob); + if (!NT_STATUS_IS_OK(req->status)) { + return wrepl_request_finished(req, req->status); + } + + req->trigger = False; + return req; } /* @@ -615,7 +517,7 @@ NTSTATUS wrepl_request(struct wrepl_socket *wrepl_socket, struct wrepl_packet *req_packet, struct wrepl_packet **reply_packet) { - struct wrepl_request *req = wrepl_request_send(wrepl_socket, req_packet); + struct wrepl_request *req = wrepl_request_send(wrepl_socket, req_packet, NULL); return wrepl_request_recv(req, mem_ctx, reply_packet); } @@ -637,7 +539,7 @@ struct wrepl_request *wrepl_associate_send(struct wrepl_socket *wrepl_socket, packet->message.start.minor_version = 2; packet->message.start.major_version = 5; - req = wrepl_request_send(wrepl_socket, packet); + req = wrepl_request_send(wrepl_socket, packet, NULL); talloc_free(packet); @@ -683,6 +585,7 @@ struct wrepl_request *wrepl_associate_stop_send(struct wrepl_socket *wrepl_socke { struct wrepl_packet *packet; struct wrepl_request *req; + struct wrepl_send_ctrl ctrl; packet = talloc_zero(wrepl_socket, struct wrepl_packet); if (packet == NULL) return NULL; @@ -692,13 +595,14 @@ struct wrepl_request *wrepl_associate_stop_send(struct wrepl_socket *wrepl_socke packet->mess_type = WREPL_STOP_ASSOCIATION; packet->message.stop.reason = io->in.reason; - req = wrepl_request_send(wrepl_socket, packet); - - if (req && io->in.reason == 0) { - req->send_only = True; - req->disconnect_after_send = True; + ZERO_STRUCT(ctrl); + if (io->in.reason == 0) { + ctrl.send_only = True; + ctrl.disconnect_after_send = True; } + req = wrepl_request_send(wrepl_socket, packet, &ctrl); + talloc_free(packet); return req; @@ -745,7 +649,7 @@ struct wrepl_request *wrepl_pull_table_send(struct wrepl_socket *wrepl_socket, packet->mess_type = WREPL_REPLICATION; packet->message.replication.command = WREPL_REPL_TABLE_QUERY; - req = wrepl_request_send(wrepl_socket, packet); + req = wrepl_request_send(wrepl_socket, packet, NULL); talloc_free(packet); @@ -817,7 +721,7 @@ struct wrepl_request *wrepl_pull_names_send(struct wrepl_socket *wrepl_socket, packet->message.replication.command = WREPL_REPL_SEND_REQUEST; packet->message.replication.info.owner = io->in.partner; - req = wrepl_request_send(wrepl_socket, packet); + req = wrepl_request_send(wrepl_socket, packet, NULL); talloc_free(packet); diff --git a/source4/libcli/wrepl/winsrepl.h b/source4/libcli/wrepl/winsrepl.h index 89a4c642b2..9bedfe7548 100644 --- a/source4/libcli/wrepl/winsrepl.h +++ b/source4/libcli/wrepl/winsrepl.h @@ -28,17 +28,16 @@ */ struct wrepl_socket { struct socket_context *sock; - struct event_context *event_ctx; + struct packet_context *packet; - /* a queue of requests pending to be sent */ - struct wrepl_request *send_queue; + struct { + struct event_context *ctx; + struct fd_event *fde; + } event; /* a queue of replies waiting to be received */ struct wrepl_request *recv_queue; - /* the fd event */ - struct fd_event *fde; - /* the default timeout for requests, 0 means no timeout */ #define WREPL_SOCKET_REQUEST_TIMEOUT (60) uint32_t request_timeout; @@ -50,8 +49,13 @@ struct wrepl_socket { BOOL dead; }; +struct wrepl_send_ctrl { + BOOL send_only; + BOOL disconnect_after_send; +}; + enum wrepl_request_state { - WREPL_REQUEST_SEND = 0, + WREPL_REQUEST_INIT = 0, WREPL_REQUEST_RECV = 1, WREPL_REQUEST_DONE = 2, WREPL_REQUEST_ERROR = 3 @@ -65,16 +69,9 @@ struct wrepl_request { struct wrepl_socket *wrepl_socket; enum wrepl_request_state state; + BOOL trigger; NTSTATUS status; - DATA_BLOB buffer; - - BOOL disconnect_after_send; - - BOOL send_only; - - size_t num_read; - struct timed_event *te; struct wrepl_packet *packet; diff --git a/source4/torture/nbt/winsreplication.c b/source4/torture/nbt/winsreplication.c index 3767722e35..578fff1c5a 100644 --- a/source4/torture/nbt/winsreplication.c +++ b/source4/torture/nbt/winsreplication.c @@ -103,6 +103,8 @@ static BOOL test_assoc_ctx1(TALLOC_CTX *mem_ctx, const char *address) struct wrepl_socket *wrepl_socket2; struct wrepl_associate associate2; struct wrepl_pull_table pull_table; + struct wrepl_packet packet; + struct wrepl_send_ctrl ctrl; struct wrepl_packet *rep_packet; struct wrepl_associate_stop assoc_stop; NTSTATUS status; @@ -137,9 +139,14 @@ static BOOL test_assoc_ctx1(TALLOC_CTX *mem_ctx, const char *address) printf("association context (conn2): 0x%x\n", associate2.out.assoc_ctx); printf("Send a replication table query, with assoc 1 (conn2), the anwser should be on conn1\n"); - pull_table.in.assoc_ctx = associate1.out.assoc_ctx; - req = wrepl_pull_table_send(wrepl_socket2, &pull_table); - req->send_only = True; + ZERO_STRUCT(packet); + packet.opcode = WREPL_OPCODE_BITS; + packet.assoc_ctx = associate1.out.assoc_ctx; + packet.mess_type = WREPL_REPLICATION; + packet.message.replication.command = WREPL_REPL_TABLE_QUERY; + ZERO_STRUCT(ctrl); + ctrl.send_only = True; + req = wrepl_request_send(wrepl_socket2, &packet, &ctrl); status = wrepl_request_recv(req, mem_ctx, &rep_packet); CHECK_STATUS(status, NT_STATUS_OK); @@ -281,7 +288,7 @@ static BOOL test_wins_replication(TALLOC_CTX *mem_ctx, const char *address) packet.mess_type = WREPL_STOP_ASSOCIATION; packet.message.stop.reason = 0; - req = wrepl_request_send(wrepl_socket, &packet); + req = wrepl_request_send(wrepl_socket, &packet, NULL); talloc_free(req); printf("failed - We are not a valid pull partner for the server\n"); diff --git a/source4/wrepl_server/wrepl_in_call.c b/source4/wrepl_server/wrepl_in_call.c index 718442a288..d186152848 100644 --- a/source4/wrepl_server/wrepl_in_call.c +++ b/source4/wrepl_server/wrepl_in_call.c @@ -106,7 +106,7 @@ static NTSTATUS wreplsrv_in_stop_association(struct wreplsrv_in_call *call) } /* this will cause to not receive packets anymore and terminate the connection if the reply is send */ - call->wreplconn->terminate = True; + call->terminate_after_send = True; return wreplsrv_in_stop_assoc_ctx(call); } @@ -315,6 +315,7 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call) struct wreplsrv_out_connection *wrepl_out; struct wrepl_table *update_in = &call->req_packet.message.replication.info.table; struct wreplsrv_in_update_state *update_state; + uint16_t fde_flags; DEBUG(2,("WREPL_REPL_UPDATE: partner[%s] initiator[%s] num_owners[%u]\n", call->wreplconn->partner->address, @@ -325,6 +326,7 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call) * and do a WREPL_REPL_SEND_REQUEST's on the that connection * and then stop this connection */ + fde_flags = event_get_fd_flags(wrepl_in->conn->event.fde); talloc_free(wrepl_in->conn->event.fde); wrepl_in->conn->event.fde = NULL; @@ -339,9 +341,12 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call) wrepl_out->assoc_ctx.peer_ctx = wrepl_in->assoc_ctx.peer_ctx; wrepl_out->sock = wrepl_socket_merge(wrepl_out, wrepl_in->conn->event.ctx, - wrepl_in->conn->socket); + wrepl_in->conn->socket, + wrepl_in->packet); NT_STATUS_HAVE_NO_MEMORY(wrepl_out->sock); + event_set_fd_flags(wrepl_out->sock->event.fde, fde_flags); + update_state->wrepl_in = wrepl_in; update_state->wrepl_out = wrepl_out; update_state->cycle_io.in.partner = wrepl_out->partner; diff --git a/source4/wrepl_server/wrepl_in_connection.c b/source4/wrepl_server/wrepl_in_connection.c index e06e69103e..a6abf99e41 100644 --- a/source4/wrepl_server/wrepl_in_connection.c +++ b/source4/wrepl_server/wrepl_in_connection.c @@ -24,6 +24,7 @@ #include "dlinklist.h" #include "lib/events/events.h" #include "lib/socket/socket.h" +#include "lib/stream/packet.h" #include "smbd/service_task.h" #include "smbd/service_stream.h" #include "lib/messaging/irpc.h" @@ -37,226 +38,156 @@ void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn, stream_terminate_connection(wreplconn->conn, reason); } -/* - called when we get a new connection -*/ -static void wreplsrv_accept(struct stream_connection *conn) +static int terminate_after_send_destructor(void *ptr) { - struct wreplsrv_service *service = talloc_get_type(conn->private, struct wreplsrv_service); - struct wreplsrv_in_connection *wreplconn; - const char *peer_ip; - - wreplconn = talloc_zero(conn, struct wreplsrv_in_connection); - if (!wreplconn) { - stream_terminate_connection(conn, "wreplsrv_accept: out of memory"); - return; - } - - wreplconn->conn = conn; - wreplconn->service = service; - wreplconn->our_ip = socket_get_my_addr(conn->socket, wreplconn); - if (!wreplconn->our_ip) { - wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory"); - return; - } - - peer_ip = socket_get_peer_addr(conn->socket, wreplconn); - if (!peer_ip) { - wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory"); - return; - } - - wreplconn->partner = wreplsrv_find_partner(service, peer_ip); - - conn->private = wreplconn; - - irpc_add_name(conn->msg_ctx, "wreplsrv_connection"); + struct wreplsrv_in_connection **tas = talloc_get_type(ptr, struct wreplsrv_in_connection *); + wreplsrv_terminate_in_connection(*tas, "wreplsrv_in_connection: terminate_after_send"); + return 0; } /* receive some data on a WREPL connection */ -static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags) +static NTSTATUS wreplsrv_recv_request(void *private, DATA_BLOB blob) { - struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection); + struct wreplsrv_in_connection *wreplconn = talloc_get_type(private, struct wreplsrv_in_connection); struct wreplsrv_in_call *call; DATA_BLOB packet_in_blob; DATA_BLOB packet_out_blob; struct wrepl_wrap packet_out_wrap; - struct data_blob_list_item *rep; - NTSTATUS status = NT_STATUS_UNSUCCESSFUL; - size_t nread; - - /* avoid recursion, because of half async code */ - if (wreplconn->processing) { - EVENT_FD_NOT_READABLE(conn->event.fde); - return; - } - - if (wreplconn->partial.length == 0) { - wreplconn->partial = data_blob_talloc(wreplconn, NULL, 4); - if (wreplconn->partial.data == NULL) { - status = NT_STATUS_NO_MEMORY; - goto failed; - } - wreplconn->partial_read = 0; - } - - /* read in the packet length */ - if (wreplconn->partial_read < 4) { - uint32_t packet_length; - - status = socket_recv(conn->socket, - wreplconn->partial.data + wreplconn->partial_read, - 4 - wreplconn->partial_read, - &nread, 0); - if (NT_STATUS_IS_ERR(status)) goto failed; - if (!NT_STATUS_IS_OK(status)) return; - - wreplconn->partial_read += nread; - if (wreplconn->partial_read != 4) return; - - packet_length = RIVAL(wreplconn->partial.data, 0) + 4; - - wreplconn->partial.data = talloc_realloc(wreplconn, wreplconn->partial.data, - uint8_t, packet_length); - if (wreplconn->partial.data == NULL) { - status = NT_STATUS_NO_MEMORY; - goto failed; - } - wreplconn->partial.length = packet_length; - } - - /* read in the body */ - status = socket_recv(conn->socket, - wreplconn->partial.data + wreplconn->partial_read, - wreplconn->partial.length - wreplconn->partial_read, - &nread, 0); - if (NT_STATUS_IS_ERR(status)) goto failed; - if (!NT_STATUS_IS_OK(status)) return; - - wreplconn->partial_read += nread; - if (wreplconn->partial_read != wreplconn->partial.length) return; - - packet_in_blob.data = wreplconn->partial.data + 4; - packet_in_blob.length = wreplconn->partial.length - 4; + NTSTATUS status; call = talloc_zero(wreplconn, struct wreplsrv_in_call); - if (!call) { - status = NT_STATUS_NO_MEMORY; - goto failed; - } + NT_STATUS_HAVE_NO_MEMORY(call); call->wreplconn = wreplconn; + talloc_steal(call, blob.data); - /* we have a full request - parse it */ - status = ndr_pull_struct_blob(&packet_in_blob, - call, &call->req_packet, - (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet); - if (!NT_STATUS_IS_OK(status)) { - DEBUG(2,("Failed to parse incoming WINS-Replication packet - %s\n", - nt_errstr(status))); - DEBUG(10,("packet length %lu\n", (long)wreplconn->partial.length)); - NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet); - goto failed; - } + packet_in_blob.data = blob.data + 4; + packet_in_blob.length = blob.length - 4; - /* - * we have parsed the request, so we can reset the wreplconn->partial_read, - * maybe we could also free wreplconn->partial, but for now we keep it, - * and overwrite it the next time - */ - wreplconn->partial_read = 0; + status = ndr_pull_struct_blob(&packet_in_blob, call, &call->req_packet, + (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet); + NT_STATUS_NOT_OK_RETURN(status); if (DEBUGLVL(10)) { - DEBUG(10,("Received WINS-Replication packet of length %lu\n", (long)wreplconn->partial.length)); + DEBUG(10,("Received WINS-Replication packet of length %u\n", packet_in_blob.length + 4)); NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet); } - /* actually process the request */ - wreplconn->processing = True; status = wreplsrv_in_call(call); - wreplconn->processing = False; - if (NT_STATUS_IS_ERR(status)) goto failed; + NT_STATUS_IS_ERR_RETURN(status); if (!NT_STATUS_IS_OK(status)) { /* w2k just ignores invalid packets, so we do */ DEBUG(10,("Received WINS-Replication packet was invalid, we just ignore it\n")); talloc_free(call); - return; + return NT_STATUS_OK; } /* and now encode the reply */ packet_out_wrap.packet = call->rep_packet; status = ndr_push_struct_blob(&packet_out_blob, call, &packet_out_wrap, (ndr_push_flags_fn_t)ndr_push_wrepl_wrap); - if (!NT_STATUS_IS_OK(status)) goto failed; + NT_STATUS_NOT_OK_RETURN(status); if (DEBUGLVL(10)) { DEBUG(10,("Sending WINS-Replication packet of length %d\n", (int)packet_out_blob.length)); NDR_PRINT_DEBUG(wrepl_packet, &call->rep_packet); } - rep = talloc(wreplconn, struct data_blob_list_item); - if (!rep) { - status = NT_STATUS_NO_MEMORY; - goto failed; + if (call->terminate_after_send) { + struct wreplsrv_in_connection **tas; + tas = talloc(packet_out_blob.data, struct wreplsrv_in_connection *); + NT_STATUS_HAVE_NO_MEMORY(tas); + *tas = wreplconn; + talloc_set_destructor(tas, terminate_after_send_destructor); } - rep->blob = packet_out_blob; - talloc_steal(rep, packet_out_blob.data); - /* we don't need the call anymore */ - talloc_free(call); + status = packet_send(wreplconn->packet, packet_out_blob); + NT_STATUS_NOT_OK_RETURN(status); - if (!wreplconn->send_queue) { - EVENT_FD_WRITEABLE(conn->event.fde); - } - DLIST_ADD_END(wreplconn->send_queue, rep, struct data_blob_list_item *); + talloc_free(call); + return NT_STATUS_OK; +} - if (wreplconn->terminate) { - EVENT_FD_NOT_READABLE(conn->event.fde); - } else { - EVENT_FD_READABLE(conn->event.fde); - } - return; +/* + called when the socket becomes readable +*/ +static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags) +{ + struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, + struct wreplsrv_in_connection); -failed: - wreplsrv_terminate_in_connection(wreplconn, nt_errstr(status)); + packet_recv(wreplconn->packet); } /* - called when we can write to a connection + called when the socket becomes writable */ static void wreplsrv_send(struct stream_connection *conn, uint16_t flags) { - struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection); - NTSTATUS status; + struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, + struct wreplsrv_in_connection); + packet_queue_run(wreplconn->packet); +} - while (wreplconn->send_queue) { - struct data_blob_list_item *rep = wreplconn->send_queue; - size_t sendlen; +/* + handle socket recv errors +*/ +static void wreplsrv_recv_error(void *private, NTSTATUS status) +{ + struct wreplsrv_in_connection *wreplconn = talloc_get_type(private, + struct wreplsrv_in_connection); + wreplsrv_terminate_in_connection(wreplconn, nt_errstr(status)); +} - status = socket_send(conn->socket, &rep->blob, &sendlen, 0); - if (NT_STATUS_IS_ERR(status)) goto failed; - if (!NT_STATUS_IS_OK(status)) return; +/* + called when we get a new connection +*/ +static void wreplsrv_accept(struct stream_connection *conn) +{ + struct wreplsrv_service *service = talloc_get_type(conn->private, struct wreplsrv_service); + struct wreplsrv_in_connection *wreplconn; + const char *peer_ip; - rep->blob.length -= sendlen; - rep->blob.data += sendlen; + wreplconn = talloc_zero(conn, struct wreplsrv_in_connection); + if (!wreplconn) { + stream_terminate_connection(conn, "wreplsrv_accept: out of memory"); + return; + } - if (rep->blob.length == 0) { - DLIST_REMOVE(wreplconn->send_queue, rep); - talloc_free(rep); - } + wreplconn->packet = packet_init(wreplconn); + if (!wreplconn->packet) { + wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory"); + return; } + packet_set_private(wreplconn->packet, wreplconn); + packet_set_socket(wreplconn->packet, conn->socket); + packet_set_callback(wreplconn->packet, wreplsrv_recv_request); + packet_set_full_request(wreplconn->packet, packet_full_request_u32); + packet_set_error_handler(wreplconn->packet, wreplsrv_recv_error); + packet_set_event_context(wreplconn->packet, conn->event.ctx); + packet_set_fde(wreplconn->packet, conn->event.fde); + packet_set_serialise(wreplconn->packet); - if (wreplconn->terminate) { - wreplsrv_terminate_in_connection(wreplconn, "connection terminated after all pending packets are send"); + wreplconn->conn = conn; + wreplconn->service = service; + wreplconn->our_ip = socket_get_my_addr(conn->socket, wreplconn); + if (!wreplconn->our_ip) { + wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory"); return; } - EVENT_FD_NOT_WRITEABLE(conn->event.fde); - return; + peer_ip = socket_get_peer_addr(conn->socket, wreplconn); + if (!peer_ip) { + wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory"); + return; + } -failed: - wreplsrv_terminate_in_connection(wreplconn, nt_errstr(status)); + wreplconn->partner = wreplsrv_find_partner(service, peer_ip); + + conn->private = wreplconn; + + irpc_add_name(conn->msg_ctx, "wreplsrv_connection"); } static const struct stream_server_ops wreplsrv_stream_ops = { @@ -271,6 +202,7 @@ static const struct stream_server_ops wreplsrv_stream_ops = { */ NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner, struct socket_context *sock, + struct packet_context *packet, struct wreplsrv_in_connection **_wrepl_in) { struct wreplsrv_service *service = partner->service; @@ -301,9 +233,26 @@ NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner, wrepl_in, &conn); NT_STATUS_NOT_OK_RETURN(status); + /* + * make the wreplsrv_in_connection structure a child of the + * stream_connection, to match the hierachie of wreplsrv_accept + */ wrepl_in->conn = conn; talloc_steal(conn, wrepl_in); + /* + * now update the packet handling callback,... + */ + wrepl_in->packet = talloc_steal(wrepl_in, packet); + packet_set_private(wrepl_in->packet, wrepl_in); + packet_set_socket(wrepl_in->packet, conn->socket); + packet_set_callback(wrepl_in->packet, wreplsrv_recv_request); + packet_set_full_request(wrepl_in->packet, packet_full_request_u32); + packet_set_error_handler(wrepl_in->packet, wreplsrv_recv_error); + packet_set_event_context(wrepl_in->packet, conn->event.ctx); + packet_set_fde(wrepl_in->packet, conn->event.fde); + packet_set_serialise(wrepl_in->packet); + *_wrepl_in = wrepl_in; return NT_STATUS_OK; } diff --git a/source4/wrepl_server/wrepl_out_helpers.c b/source4/wrepl_server/wrepl_out_helpers.c index c7c6f55767..d19c38bfb8 100644 --- a/source4/wrepl_server/wrepl_out_helpers.c +++ b/source4/wrepl_server/wrepl_out_helpers.c @@ -45,24 +45,26 @@ struct wreplsrv_out_connect_state { enum wreplsrv_out_connect_stage stage; struct composite_context *c; struct wrepl_request *req; + struct composite_context *c_req; struct wrepl_associate assoc_io; enum winsrepl_partner_type type; struct wreplsrv_out_connection *wreplconn; }; -static void wreplsrv_out_connect_handler(struct wrepl_request *req); +static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req); +static void wreplsrv_out_connect_handler_req(struct wrepl_request *req); static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state) { NTSTATUS status; - status = wrepl_connect_recv(state->req); + status = wrepl_connect_recv(state->c_req); NT_STATUS_NOT_OK_RETURN(status); state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io); NT_STATUS_HAVE_NO_MEMORY(state->req); - state->req->async.fn = wreplsrv_out_connect_handler; + state->req->async.fn = wreplsrv_out_connect_handler_req; state->req->async.private = state; state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX; @@ -92,10 +94,8 @@ static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_ return NT_STATUS_OK; } -static void wreplsrv_out_connect_handler(struct wrepl_request *req) +static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state) { - struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private, - struct wreplsrv_out_connect_state); struct composite_context *c = state->c; switch (state->stage) { @@ -119,6 +119,22 @@ static void wreplsrv_out_connect_handler(struct wrepl_request *req) } } +static void wreplsrv_out_connect_handler_creq(struct composite_context *creq) +{ + struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data, + struct wreplsrv_out_connect_state); + wreplsrv_out_connect_handler(state); + return; +} + +static void wreplsrv_out_connect_handler_req(struct wrepl_request *req) +{ + struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private, + struct wreplsrv_out_connect_state); + wreplsrv_out_connect_handler(state); + return; +} + static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner, enum winsrepl_partner_type type, struct wreplsrv_out_connection *wreplconn) @@ -179,13 +195,13 @@ static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partn state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET; state->wreplconn= wreplconn; - state->req = wrepl_connect_send(wreplconn->sock, + state->c_req = wrepl_connect_send(wreplconn->sock, partner->our_address, partner->address); - if (!state->req) goto failed; + if (!state->c_req) goto failed; - state->req->async.fn = wreplsrv_out_connect_handler; - state->req->async.private = state; + state->c_req->async.fn = wreplsrv_out_connect_handler_creq; + state->c_req->async.private_data = state; return c; failed: @@ -796,6 +812,7 @@ struct wreplsrv_push_notify_state { struct wreplsrv_push_notify_io *io; enum wrepl_replication_cmd command; BOOL full_table; + struct wrepl_send_ctrl ctrl; struct wrepl_request *req; struct wrepl_packet req_packet; struct wrepl_packet *rep_packet; @@ -815,10 +832,11 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s struct wreplsrv_in_connection *wrepl_in; NTSTATUS status; struct socket_context *sock; - struct data_blob_list_item *update_rep; + struct packet_context *packet; + uint16_t fde_flags; const char *our_ip; - DATA_BLOB update_blob; + /* prepare the outgoing request */ req->opcode = WREPL_OPCODE_BITS; req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx; req->mess_type = WREPL_REPLICATION; @@ -832,40 +850,64 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s our_ip, our_ip, state->full_table); NT_STATUS_NOT_OK_RETURN(status); - state->req = wrepl_request_send(state->wreplconn->sock, req); + /* queue the request */ + state->req = wrepl_request_send(state->wreplconn->sock, req, NULL); NT_STATUS_HAVE_NO_MEMORY(state->req); + /* + * now we need to convert the wrepl_socket (client connection) + * into a wreplsrv_in_connection (server connection), because + * we'll act as a server on this connection after the WREPL_REPL_UPDATE* + * message is received by the peer. + */ + + /* steal the socket_context */ sock = state->wreplconn->sock->sock; - talloc_steal(state, state->wreplconn->sock->sock); state->wreplconn->sock->sock = NULL; + talloc_steal(state, sock); - update_blob = state->req->buffer; - talloc_steal(state, state->req->buffer.data); + /* + * steal the packet_context + * note the request DATA_BLOB we just send on the + * wrepl_socket (client connection) is still unter the + * packet context and will be send to the wire + */ + packet = state->wreplconn->sock->packet; + state->wreplconn->sock->packet = NULL; + talloc_steal(state, packet); + + /* + * get the fde_flags of the old fde event, + * so that we can later set the same flags to the new one + */ + fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde); + /* + * free the wrepl_socket (client connection) + */ talloc_free(state->wreplconn->sock); state->wreplconn->sock = NULL; + /* + * now create a wreplsrv_in_connection, + * on which we act as server + * + * NOTE: sock and packet will be stolen by + * wreplsrv_in_connection_merge() + */ status = wreplsrv_in_connection_merge(state->io->in.partner, - sock, &wrepl_in); + sock, packet, &wrepl_in); NT_STATUS_NOT_OK_RETURN(status); + event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags); + wrepl_in->assoc_ctx.peer_ctx = state->wreplconn->assoc_ctx.peer_ctx; wrepl_in->assoc_ctx.our_ctx = 0; - update_rep = talloc(wrepl_in, struct data_blob_list_item); - NT_STATUS_HAVE_NO_MEMORY(update_rep); - - update_rep->blob = update_blob; - talloc_steal(update_rep, update_blob.data); - + /* now we can free the wreplsrv_out_connection */ talloc_free(state->wreplconn); state->wreplconn = NULL; - if (!wrepl_in->send_queue) { - EVENT_FD_WRITEABLE(wrepl_in->conn->event.fde); - } - DLIST_ADD_END(wrepl_in->send_queue, update_rep, struct data_blob_list_item *); - state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE; return NT_STATUS_OK; @@ -893,11 +935,12 @@ static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *s our_ip, our_ip, state->full_table); NT_STATUS_NOT_OK_RETURN(status); - state->req = wrepl_request_send(state->wreplconn->sock, req); + /* we won't get a reply to a inform message */ + state->ctrl.send_only = True; + + state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl); NT_STATUS_HAVE_NO_MEMORY(state->req); - /* we won't get a reply to a inform message */ - state->req->send_only = True; state->req->async.fn = wreplsrv_push_notify_handler_req; state->req->async.private = state; diff --git a/source4/wrepl_server/wrepl_server.h b/source4/wrepl_server/wrepl_server.h index 7025bff496..118686622e 100644 --- a/source4/wrepl_server/wrepl_server.h +++ b/source4/wrepl_server/wrepl_server.h @@ -35,6 +35,7 @@ struct wreplsrv_in_call { struct wreplsrv_in_connection *wreplconn; struct wrepl_packet req_packet; struct wrepl_packet rep_packet; + BOOL terminate_after_send; }; /* @@ -43,6 +44,7 @@ struct wreplsrv_in_call { struct wreplsrv_in_connection { struct wreplsrv_in_connection *prev,*next; struct stream_connection *conn; + struct packet_context *packet; /* our global service context */ struct wreplsrv_service *service; @@ -67,25 +69,6 @@ struct wreplsrv_in_connection { uint32_t our_ctx; uint32_t peer_ctx; } assoc_ctx; - - /* the partial input on the connection */ - DATA_BLOB partial; - size_t partial_read; - - /* - * are we currently processing a request? - * this prevents loops, with half async code - */ - BOOL processing; - - /* - * if this is set we no longer accept incoming packets - * and terminate the connection after we have send all packets - */ - BOOL terminate; - - /* the list of outgoing DATA_BLOB's that needs to be send */ - struct data_blob_list_item *send_queue; }; /* |