From cffd522b5c806508dfacfb10234e4c0a115c0a98 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Fri, 14 Oct 2005 14:02:47 +0000 Subject: r11052: bring samba4 uptodate with the samba4-winsrepl branch, before the bad merge metze (This used to be commit 471c0ca4abb17fb5f73c0efed195c67628c1c06e) --- source4/wrepl_server/wrepl_out_connection.c | 212 +++++++++++++++++++++++++++- 1 file changed, 209 insertions(+), 3 deletions(-) (limited to 'source4/wrepl_server/wrepl_out_connection.c') diff --git a/source4/wrepl_server/wrepl_out_connection.c b/source4/wrepl_server/wrepl_out_connection.c index 39406c7e2a..0d5bfda185 100644 --- a/source4/wrepl_server/wrepl_out_connection.c +++ b/source4/wrepl_server/wrepl_out_connection.c @@ -31,15 +31,221 @@ #include "wrepl_server/wrepl_server.h" #include "nbt_server/wins/winsdb.h" #include "ldb/include/ldb.h" +#include "libcli/composite/composite.h" +#include "libcli/wrepl/winsrepl.h" +#include "wrepl_server/wrepl_out_helpers.h" + +static void wreplsrv_pull_handler_te(struct event_context *ev, struct timed_event *te, + struct timeval t, void *ptr); + +static void wreplsrv_pull_handler_creq(struct composite_context *creq) +{ + struct wreplsrv_partner *partner = talloc_get_type(creq->async.private_data, struct wreplsrv_partner); + uint32_t interval; + + partner->pull.last_status = wreplsrv_pull_cycle_recv(partner->pull.creq); + partner->pull.creq = NULL; + talloc_free(partner->pull.cycle_io); + partner->pull.cycle_io = NULL; + + if (!NT_STATUS_IS_OK(partner->pull.last_status)) { + interval = partner->pull.error_count * partner->pull.retry_interval; + interval = MIN(interval, partner->pull.interval); + partner->pull.error_count++; + + DEBUG(1,("wreplsrv_pull_cycle(%s): %s: next: %us\n", + partner->address, nt_errstr(partner->pull.last_status), + interval)); + } else { + interval = partner->pull.interval; + partner->pull.error_count = 0; + + DEBUG(2,("wreplsrv_pull_cycle(%s): %s: next: %us\n", + partner->address, nt_errstr(partner->pull.last_status), + interval)); + } + + partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner, + timeval_current_ofs(interval, 0), + wreplsrv_pull_handler_te, partner); + if (!partner->pull.te) { + DEBUG(0,("wreplsrv_pull_handler_creq: event_add_timed() failed! no memory!\n")); + } +} + +static void wreplsrv_pull_handler_te(struct event_context *ev, struct timed_event *te, + struct timeval t, void *ptr) +{ + struct wreplsrv_partner *partner = talloc_get_type(ptr, struct wreplsrv_partner); + + partner->pull.te = NULL; + + partner->pull.cycle_io = talloc(partner, struct wreplsrv_pull_cycle_io); + if (!partner->pull.cycle_io) { + goto requeue; + } + + partner->pull.cycle_io->in.partner = partner; + partner->pull.cycle_io->in.num_owners = 0; + partner->pull.cycle_io->in.owners = NULL; + partner->pull.cycle_io->in.wreplconn = NULL; + partner->pull.creq = wreplsrv_pull_cycle_send(partner->pull.cycle_io, partner->pull.cycle_io); + if (!partner->pull.creq) { + DEBUG(1,("wreplsrv_pull_cycle_send(%s) failed\n", + partner->address)); + goto requeue; + } + + partner->pull.creq->async.fn = wreplsrv_pull_handler_creq; + partner->pull.creq->async.private_data = partner; + + return; +requeue: + talloc_free(partner->pull.cycle_io); + partner->pull.cycle_io = NULL; + /* retry later */ + partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner, + timeval_add(&t, partner->pull.retry_interval, 0), + wreplsrv_pull_handler_te, partner); + if (!partner->pull.te) { + DEBUG(0,("wreplsrv_pull_handler_te: event_add_timed() failed! no memory!\n")); + } +} + +NTSTATUS wreplsrv_sched_inform_action(struct wreplsrv_partner *partner, struct wrepl_table *inform_in) +{ + if (partner->pull.creq) { + /* there's already a pull in progress, so we're done */ + return NT_STATUS_OK; + } + + /* remove the scheduled pull */ + talloc_free(partner->pull.te); + partner->pull.te = NULL; + + partner->pull.cycle_io = talloc(partner, struct wreplsrv_pull_cycle_io); + if (!partner->pull.cycle_io) { + goto requeue; + } + + partner->pull.cycle_io->in.partner = partner; + partner->pull.cycle_io->in.num_owners = inform_in->partner_count; + partner->pull.cycle_io->in.owners = inform_in->partners; + talloc_steal(partner->pull.cycle_io, inform_in->partners); + partner->pull.cycle_io->in.wreplconn = NULL; + partner->pull.creq = wreplsrv_pull_cycle_send(partner->pull.cycle_io, partner->pull.cycle_io); + if (!partner->pull.creq) { + DEBUG(1,("wreplsrv_pull_cycle_send(%s) failed\n", + partner->address)); + goto requeue; + } + + partner->pull.creq->async.fn = wreplsrv_pull_handler_creq; + partner->pull.creq->async.private_data = partner; + + return NT_STATUS_OK; +requeue: + talloc_free(partner->pull.cycle_io); + partner->pull.cycle_io = NULL; + /* retry later */ + partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner, + timeval_current_ofs(partner->pull.retry_interval, 0), + wreplsrv_pull_handler_te, partner); + if (!partner->pull.te) { + DEBUG(0,("wreplsrv_pull_handler_te: event_add_timed() failed! no memory!\n")); + } + + return NT_STATUS_OK; +} + +static void wreplsrv_push_handler_te(struct event_context *ev, struct timed_event *te, + struct timeval t, void *ptr); + +static void wreplsrv_push_handler_creq(struct composite_context *creq) +{ + struct wreplsrv_partner *partner = talloc_get_type(creq->async.private_data, struct wreplsrv_partner); + uint32_t interval; + + partner->push.last_status = wreplsrv_push_notify_recv(partner->push.creq); + partner->push.creq = NULL; + talloc_free(partner->push.notify_io); + partner->push.notify_io = NULL; + + if (!NT_STATUS_IS_OK(partner->push.last_status)) { + interval = 15; + + DEBUG(1,("wreplsrv_push_notify(%s): %s: next: %us\n", + partner->address, nt_errstr(partner->push.last_status), + interval)); + } else { + interval = 100; + + DEBUG(2,("wreplsrv_push_notify(%s): %s: next: %us\n", + partner->address, nt_errstr(partner->push.last_status), + interval)); + } + + partner->push.te = event_add_timed(partner->service->task->event_ctx, partner, + timeval_current_ofs(interval, 0), + wreplsrv_push_handler_te, partner); + if (!partner->push.te) { + DEBUG(0,("wreplsrv_push_handler_creq: event_add_timed() failed! no memory!\n")); + } +} + +static void wreplsrv_push_handler_te(struct event_context *ev, struct timed_event *te, + struct timeval t, void *ptr) +{ + struct wreplsrv_partner *partner = talloc_get_type(ptr, struct wreplsrv_partner); + + partner->push.te = NULL; + + partner->push.notify_io = talloc(partner, struct wreplsrv_push_notify_io); + if (!partner->push.notify_io) { + goto requeue; + } + + partner->push.notify_io->in.partner = partner; + partner->push.notify_io->in.inform = False; + partner->push.notify_io->in.propagate = False; + partner->push.creq = wreplsrv_push_notify_send(partner->push.notify_io, partner->push.notify_io); + if (!partner->push.creq) { + DEBUG(1,("wreplsrv_push_notify_send(%s) failed\n", + partner->address)); + goto requeue; + } + + partner->push.creq->async.fn = wreplsrv_push_handler_creq; + partner->push.creq->async.private_data = partner; + + return; +requeue: + talloc_free(partner->push.notify_io); + partner->push.notify_io = NULL; + /* retry later */ + partner->push.te = event_add_timed(partner->service->task->event_ctx, partner, + timeval_add(&t, 5, 0), + wreplsrv_push_handler_te, partner); + if (!partner->push.te) { + DEBUG(0,("wreplsrv_push_handler_te: event_add_timed() failed! no memory!\n")); + } +} NTSTATUS wreplsrv_setup_out_connections(struct wreplsrv_service *service) { struct wreplsrv_partner *cur; for (cur = service->partners; cur; cur = cur->next) { - if (!(cur->type & WINSREPL_PARTNER_PULL)) continue; - - DEBUG(0,("TODO: pull from: %s\n", cur->address)); + if (cur->type & WINSREPL_PARTNER_PULL) { + cur->pull.te = event_add_timed(service->task->event_ctx, cur, + timeval_zero(), wreplsrv_pull_handler_te, cur); + NT_STATUS_HAVE_NO_MEMORY(cur->pull.te); + } + if (cur->type & WINSREPL_PARTNER_PUSH) { + cur->push.te = event_add_timed(service->task->event_ctx, cur, + timeval_zero(), wreplsrv_push_handler_te, cur); + NT_STATUS_HAVE_NO_MEMORY(cur->push.te); + } } return NT_STATUS_OK; -- cgit