From 55a9ea2b33cbcd39ae44972b71922c9c87748d75 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Sun, 13 Sep 2009 18:16:13 +1000 Subject: s4-repl: added a periodic notification check to the repl task The dreplsrv_notify code checks the partition uSN values every N seconds, and if one has changed then it sends a DsReplicaSync to all the replication partners listed in the repsTo attribute for the partition. --- source4/dsdb/repl/drepl_notify.c | 426 ++++++++++++++++++++++++++++++++++ source4/dsdb/repl/drepl_out_helpers.c | 4 +- source4/dsdb/repl/drepl_out_pull.c | 3 +- source4/dsdb/repl/drepl_periodic.c | 1 + source4/dsdb/repl/drepl_service.c | 10 + source4/dsdb/repl/drepl_service.h | 37 +++ 6 files changed, 478 insertions(+), 3 deletions(-) create mode 100644 source4/dsdb/repl/drepl_notify.c (limited to 'source4/dsdb/repl') diff --git a/source4/dsdb/repl/drepl_notify.c b/source4/dsdb/repl/drepl_notify.c new file mode 100644 index 0000000000..83c6df48dc --- /dev/null +++ b/source4/dsdb/repl/drepl_notify.c @@ -0,0 +1,426 @@ +/* + Unix SMB/CIFS implementation. + + DSDB replication service periodic notification handling + + Copyright (C) Andrew Tridgell 2009 + based on drepl_periodic + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>.
+ +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "dsdb/samdb/samdb.h" +#include "auth/auth.h" +#include "smbd/service.h" +#include "lib/messaging/irpc.h" +#include "dsdb/repl/drepl_service.h" +#include "lib/ldb/include/ldb_errors.h" +#include "../lib/util/dlinklist.h" +#include "librpc/gen_ndr/ndr_misc.h" +#include "librpc/gen_ndr/ndr_drsuapi.h" +#include "librpc/gen_ndr/ndr_drsblobs.h" +#include "libcli/composite/composite.h" + + +struct dreplsrv_op_notify_state { + struct composite_context *creq; + + struct dreplsrv_out_connection *conn; + + struct dreplsrv_drsuapi_connection *drsuapi; + + struct drsuapi_DsBindInfoCtr bind_info_ctr; + struct drsuapi_DsBind bind_r; + struct dreplsrv_notify_operation *op; +}; + +/* + receive a DsReplicaSync reply + */ +static void dreplsrv_op_notify_replica_sync_recv(struct rpc_request *req) +{ + struct dreplsrv_op_notify_state *st = talloc_get_type(req->async.private_data, + struct dreplsrv_op_notify_state); + struct composite_context *c = st->creq; + struct drsuapi_DsReplicaSync *r = talloc_get_type(req->ndr.struct_ptr, + struct drsuapi_DsReplicaSync); + + c->status = dcerpc_ndr_request_recv(req); + if (!composite_is_ok(c)) return; + + if (!W_ERROR_IS_OK(r->out.result)) { + composite_error(c, werror_to_ntstatus(r->out.result)); + return; + } + + composite_done(c); +} + +/* + send a DsReplicaSync +*/ +static void dreplsrv_op_notify_replica_sync_send(struct dreplsrv_op_notify_state *st) +{ + struct composite_context *c = st->creq; + struct dreplsrv_partition *partition = st->op->source_dsa->partition; + struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi; + struct rpc_request *req; + struct drsuapi_DsReplicaSync *r; + + r = talloc_zero(st, struct drsuapi_DsReplicaSync); + if (composite_nomem(r, c)) return; + + r->in.bind_handle = &drsuapi->bind_handle; + r->in.level = 1; + r->in.req.req1.naming_context = &partition->nc; + r->in.req.req1.source_dsa_guid = 
st->op->service->ntds_guid; + r->in.req.req1.options = + DRSUAPI_DS_REPLICA_SYNC_ASYNCHRONOUS_OPERATION | + DRSUAPI_DS_REPLICA_SYNC_WRITEABLE | + DRSUAPI_DS_REPLICA_SYNC_ALL_SOURCES; + + + req = dcerpc_drsuapi_DsReplicaSync_send(drsuapi->pipe, r, r); + composite_continue_rpc(c, req, dreplsrv_op_notify_replica_sync_recv, st); +} + +/* + called when we have an established connection + */ +static void dreplsrv_op_notify_connect_recv(struct composite_context *creq) +{ + struct dreplsrv_op_notify_state *st = talloc_get_type(creq->async.private_data, + struct dreplsrv_op_notify_state); + struct composite_context *c = st->creq; + + c->status = dreplsrv_out_drsuapi_recv(creq); + if (!composite_is_ok(c)) return; + + dreplsrv_op_notify_replica_sync_send(st); +} + +/* + start the ReplicaSync async call + */ +static struct composite_context *dreplsrv_op_notify_send(struct dreplsrv_notify_operation *op) +{ + struct composite_context *c; + struct composite_context *creq; + struct dreplsrv_op_notify_state *st; + + c = composite_create(op, op->service->task->event_ctx); + if (c == NULL) return NULL; + + st = talloc_zero(c, struct dreplsrv_op_notify_state); + if (composite_nomem(st, c)) return c; + + st->creq = c; + st->op = op; + + creq = dreplsrv_out_drsuapi_send(op->source_dsa->conn); + composite_continue(c, creq, dreplsrv_op_notify_connect_recv, st); + + return c; +} + +static void dreplsrv_notify_del_repsTo(struct dreplsrv_notify_operation *op) +{ + uint32_t count; + struct repsFromToBlob *reps; + WERROR werr; + struct dreplsrv_service *s = op->service; + int i; + + werr = dsdb_loadreps(s->samdb, op, op->source_dsa->partition->dn, "repsTo", &reps, &count); + if (!W_ERROR_IS_OK(werr)) { + DEBUG(0,(__location__ ": Failed to load repsTo for %s\n", + ldb_dn_get_linearized(op->source_dsa->partition->dn))); + return; + } + + for (i=0; i<count; i++) { + if (GUID_compare(&reps[i].ctr.ctr1.source_dsa_obj_guid, + &op->source_dsa->repsFrom1->source_dsa_obj_guid) == 0) { + memmove(&reps[i], &reps[i+1], + sizeof(reps[i])*(count-(i+1))); + count--; + } + } + + werr = 
dsdb_savereps(s->samdb, op, op->source_dsa->partition->dn, "repsTo", reps, count); + if (!W_ERROR_IS_OK(werr)) { + DEBUG(0,(__location__ ": Failed to save repsTo for %s\n", + ldb_dn_get_linearized(op->source_dsa->partition->dn))); + return; + } +} + +/* + called when a notify operation has completed + */ +static void dreplsrv_notify_op_callback(struct dreplsrv_notify_operation *op) +{ + NTSTATUS status; + struct dreplsrv_service *s = op->service; + + status = composite_wait(op->creq); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0,("dreplsrv_notify: Failed to send DsReplicaSync to %s - %s\n", + op->source_dsa->repsFrom1->other_info->dns_name, + nt_errstr(status))); + } else { + DEBUG(2,("dreplsrv_notify: DsReplicaSync OK for %s\n", + op->source_dsa->repsFrom1->other_info->dns_name)); + op->source_dsa->notify_uSN = op->uSN; + /* delete the repsTo for this replication partner in the + partition, as we have successfully told him to sync */ + dreplsrv_notify_del_repsTo(op); + } + talloc_free(op->creq); + + talloc_free(op); + s->ops.n_current = NULL; + dreplsrv_notify_run_ops(s); +} + + +static void dreplsrv_notify_op_callback_creq(struct composite_context *creq) +{ + struct dreplsrv_notify_operation *op = talloc_get_type(creq->async.private_data, + struct dreplsrv_notify_operation); + dreplsrv_notify_op_callback(op); +} + +/* + run any pending replica sync calls + */ +void dreplsrv_notify_run_ops(struct dreplsrv_service *s) +{ + struct dreplsrv_notify_operation *op; + + if (s->ops.n_current || s->ops.current) { + /* if there's still one running, we're done */ + return; + } + + if (!s->ops.notifies) { + /* if there're no pending operations, we're done */ + return; + } + + op = s->ops.notifies; + s->ops.n_current = op; + DLIST_REMOVE(s->ops.notifies, op); + + op->creq = dreplsrv_op_notify_send(op); + if (!op->creq) { + dreplsrv_notify_op_callback(op); + return; + } + + op->creq->async.fn = dreplsrv_notify_op_callback_creq; + op->creq->async.private_data = op; +} + + +/* + 
find a source_dsa for a given guid + */ +static struct dreplsrv_partition_source_dsa *dreplsrv_find_source_dsa(struct dreplsrv_partition *p, + struct GUID *guid) +{ + struct dreplsrv_partition_source_dsa *s; + + for (s=p->sources; s; s=s->next) { + if (GUID_compare(&s->repsFrom1->source_dsa_obj_guid, guid) == 0) { + return s; + } + } + return NULL; +} + + +/* + schedule a replicaSync message + */ +static WERROR dreplsrv_schedule_notify_sync(struct dreplsrv_service *service, + struct dreplsrv_partition *p, + struct repsFromToBlob *reps, + TALLOC_CTX *mem_ctx, + uint64_t uSN) +{ + struct dreplsrv_notify_operation *op; + struct dreplsrv_partition_source_dsa *s; + + s = dreplsrv_find_source_dsa(p, &reps->ctr.ctr1.source_dsa_obj_guid); + if (s == NULL) { + DEBUG(0,(__location__ ": Unable to find source_dsa for %s\n", + GUID_string(mem_ctx, &reps->ctr.ctr1.source_dsa_obj_guid))); + return WERR_DS_UNAVAILABLE; + } + + op = talloc_zero(mem_ctx, struct dreplsrv_notify_operation); + W_ERROR_HAVE_NO_MEMORY(op); + + op->service = service; + op->source_dsa = s; + op->uSN = uSN; + + DLIST_ADD_END(service->ops.notifies, op, struct dreplsrv_notify_operation *); + talloc_steal(service, op); + return WERR_OK; +} + +/* + see if a partition has a higher uSN than what is in the repsTo and + if so then send a DsReplicaSync + */ +static WERROR dreplsrv_notify_check(struct dreplsrv_service *s, + struct dreplsrv_partition *p, + TALLOC_CTX *mem_ctx) +{ + uint32_t count; + struct repsFromToBlob *reps; + WERROR werr; + uint64_t uSN; + int ret, i; + + werr = dsdb_loadreps(s->samdb, mem_ctx, p->dn, "repsTo", &reps, &count); + if (!W_ERROR_IS_OK(werr)) { + DEBUG(0,(__location__ ": Failed to load repsTo for %s\n", + ldb_dn_get_linearized(p->dn))); + return werr; + } + + /* loads the partition uSNHighest */ + ret = dsdb_load_partition_usn(s->samdb, p->dn, &uSN); + if (ret != LDB_SUCCESS || uSN == 0) { + /* nothing to do */ + return WERR_OK; + } + + /* see if any of our partners need some of our 
objects */ + for (i=0; i<count; i++) { + struct dreplsrv_partition_source_dsa *sdsa; + sdsa = dreplsrv_find_source_dsa(p, &reps[i].ctr.ctr1.source_dsa_obj_guid); + if (sdsa == NULL) continue; + if (sdsa->notify_uSN < uSN) { + /* we need to tell this partner to replicate + with us */ + werr = dreplsrv_schedule_notify_sync(s, p, &reps[i], mem_ctx, uSN); + if (!W_ERROR_IS_OK(werr)) { + DEBUG(0,(__location__ ": Failed to setup notify to %s for %s\n", + reps[i].ctr.ctr1.other_info->dns_name, + ldb_dn_get_linearized(p->dn))); + return werr; + } + } + } + + + return WERR_OK; +} + +/* + see if any of the partitions have changed, and if so then send a + DsReplicaSync to all the replica partners in the repsTo object + */ +static WERROR dreplsrv_notify_check_all(struct dreplsrv_service *s, TALLOC_CTX *mem_ctx) +{ + WERROR status; + struct dreplsrv_partition *p; + + for (p = s->partitions; p; p = p->next) { + status = dreplsrv_notify_check(s, p, mem_ctx); + W_ERROR_NOT_OK_RETURN(status); + } + + return WERR_OK; +} + +static void dreplsrv_notify_run(struct dreplsrv_service *service); + +static void dreplsrv_notify_handler_te(struct tevent_context *ev, struct tevent_timer *te, + struct timeval t, void *ptr) +{ + struct dreplsrv_service *service = talloc_get_type(ptr, struct dreplsrv_service); + WERROR status; + + service->notify.te = NULL; + + dreplsrv_notify_run(service); + + status = dreplsrv_notify_schedule(service, service->notify.interval); + if (!W_ERROR_IS_OK(status)) { + task_server_terminate(service->task, win_errstr(status)); + return; + } +} + +WERROR dreplsrv_notify_schedule(struct dreplsrv_service *service, uint32_t next_interval) +{ + TALLOC_CTX *tmp_mem; + struct tevent_timer *new_te; + struct timeval next_time; + + /* prevent looping */ + if (next_interval == 0) next_interval = 1; + + next_time = timeval_current_ofs(next_interval, 50); + + if (service->notify.te) { + /* + * if the timestamp of the new event is higher, + * as current next we don't need to reschedule + */ + if (timeval_compare(&next_time, &service->notify.next_event) > 0) { + return WERR_OK; + } + } + + /* reset the next scheduled timestamp */ + service->notify.next_event = 
next_time; + + new_te = event_add_timed(service->task->event_ctx, service, + service->notify.next_event, + dreplsrv_notify_handler_te, service); + W_ERROR_HAVE_NO_MEMORY(new_te); + + tmp_mem = talloc_new(service); + DEBUG(2,("dreplsrv_notify_schedule(%u) %sscheduled for: %s\n", + next_interval, + (service->notify.te?"re":""), + nt_time_string(tmp_mem, timeval_to_nttime(&next_time)))); + talloc_free(tmp_mem); + + talloc_free(service->notify.te); + service->notify.te = new_te; + + return WERR_OK; +} + +static void dreplsrv_notify_run(struct dreplsrv_service *service) +{ + TALLOC_CTX *mem_ctx; + + mem_ctx = talloc_new(service); + dreplsrv_notify_check_all(service, mem_ctx); + talloc_free(mem_ctx); + + dreplsrv_run_pending_ops(service); + dreplsrv_notify_run_ops(service); +} diff --git a/source4/dsdb/repl/drepl_out_helpers.c b/source4/dsdb/repl/drepl_out_helpers.c index e90f2783b1..168aacdde9 100644 --- a/source4/dsdb/repl/drepl_out_helpers.c +++ b/source4/dsdb/repl/drepl_out_helpers.c @@ -48,7 +48,7 @@ struct dreplsrv_out_drsuapi_state { static void dreplsrv_out_drsuapi_connect_recv(struct composite_context *creq); -static struct composite_context *dreplsrv_out_drsuapi_send(struct dreplsrv_out_connection *conn) +struct composite_context *dreplsrv_out_drsuapi_send(struct dreplsrv_out_connection *conn) { struct composite_context *c; struct composite_context *creq; @@ -165,7 +165,7 @@ static void dreplsrv_out_drsuapi_bind_recv(struct rpc_request *req) composite_done(c); } -static NTSTATUS dreplsrv_out_drsuapi_recv(struct composite_context *c) +NTSTATUS dreplsrv_out_drsuapi_recv(struct composite_context *c) { NTSTATUS status; struct dreplsrv_out_drsuapi_state *st = talloc_get_type(c->private_data, diff --git a/source4/dsdb/repl/drepl_out_pull.c b/source4/dsdb/repl/drepl_out_pull.c index 54dbd29730..b073d265c3 100644 --- a/source4/dsdb/repl/drepl_out_pull.c +++ b/source4/dsdb/repl/drepl_out_pull.c @@ -125,6 +125,7 @@ done: talloc_free(op); s->ops.current = NULL; 
dreplsrv_run_pending_ops(s); + dreplsrv_notify_run_ops(s); } static void dreplsrv_pending_op_callback_creq(struct composite_context *creq) @@ -140,7 +141,7 @@ void dreplsrv_run_pending_ops(struct dreplsrv_service *s) time_t t; NTTIME now; - if (s->ops.current) { + if (s->ops.current || s->ops.n_current) { /* if there's still one running, we're done */ return; } diff --git a/source4/dsdb/repl/drepl_periodic.c b/source4/dsdb/repl/drepl_periodic.c index 2cfcb310dc..377cecbe99 100644 --- a/source4/dsdb/repl/drepl_periodic.c +++ b/source4/dsdb/repl/drepl_periodic.c @@ -110,4 +110,5 @@ static void dreplsrv_periodic_run(struct dreplsrv_service *service) dreplsrv_refresh_partitions(service); dreplsrv_run_pending_ops(service); + dreplsrv_notify_run_ops(service); } diff --git a/source4/dsdb/repl/drepl_service.c b/source4/dsdb/repl/drepl_service.c index eb49da3d41..cb415b65e6 100644 --- a/source4/dsdb/repl/drepl_service.c +++ b/source4/dsdb/repl/drepl_service.c @@ -196,6 +196,16 @@ static void dreplsrv_task_init(struct task_server *task) return; } + service->notify.interval = lp_parm_int(task->lp_ctx, NULL, "dreplsrv", + "notify_interval", 5); /* in seconds */ + status = dreplsrv_notify_schedule(service, service->notify.interval); + if (!W_ERROR_IS_OK(status)) { + task_server_terminate(task, talloc_asprintf(task, + "dreplsrv: Failed to setup notify schedule: %s\n", + win_errstr(status))); + return; + } + irpc_add_name(task->msg_ctx, "dreplsrv"); IRPC_REGISTER(task->msg_ctx, drsuapi, DRSUAPI_DSREPLICASYNC, drepl_replica_sync, service); diff --git a/source4/dsdb/repl/drepl_service.h b/source4/dsdb/repl/drepl_service.h index a9eea30719..0f9684fa78 100644 --- a/source4/dsdb/repl/drepl_service.h +++ b/source4/dsdb/repl/drepl_service.h @@ -69,6 +69,9 @@ struct dreplsrv_partition_source_dsa { struct repsFromToBlob _repsFromBlob; struct repsFromTo1 *repsFrom1; + /* the last uSN when we sent a notify */ + uint64_t notify_uSN; + /* the reference to the source_dsa and its outgoing 
connection */ struct dreplsrv_out_connection *conn; }; @@ -107,6 +110,17 @@ struct dreplsrv_out_operation { struct composite_context *creq; }; +struct dreplsrv_notify_operation { + struct dreplsrv_notify_operation *prev, *next; + + struct dreplsrv_service *service; + uint64_t uSN; + + struct dreplsrv_partition_source_dsa *source_dsa; + + struct composite_context *creq; +}; + struct dreplsrv_service { /* the whole drepl service is in one task */ struct task_server *task; @@ -150,6 +164,23 @@ struct dreplsrv_service { struct tevent_timer *te; } periodic; + /* some stuff for notify processing */ + struct { + /* + * the interval between notify runs + */ + uint32_t interval; + + /* + * the timestamp for the next event, + * this is the timestamp passed to event_add_timed() + */ + struct timeval next_event; + + /* here we have a reference to the timed event that schedules the notifies */ + struct tevent_timer *te; + } notify; + /* * the list of partitions we need to replicate */ @@ -166,6 +197,12 @@ struct dreplsrv_service { /* the list of pending operations */ struct dreplsrv_out_operation *pending; + + /* the list of pending notify operations */ + struct dreplsrv_notify_operation *notifies; + + /* an active notify operation */ + struct dreplsrv_notify_operation *n_current; } ops; }; -- cgit From 3cf73dfdbdd33189e0f9f22e0e494962376f9b86 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Mon, 14 Sep 2009 03:43:26 -0700 Subject: s4-repl: fall back to repsFrom if repsTo not set Windows does not seem to be always setting up repsTo using DsUpdateRefs(). For now we will fall back to using repsFrom if repsTo is empty. This is almost certainly incorrect, but it does get notification based replication working with both w2k3 and w2k8. 
--- source4/dsdb/repl/drepl_notify.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'source4/dsdb/repl') diff --git a/source4/dsdb/repl/drepl_notify.c b/source4/dsdb/repl/drepl_notify.c index 83c6df48dc..73280917c5 100644 --- a/source4/dsdb/repl/drepl_notify.c +++ b/source4/dsdb/repl/drepl_notify.c @@ -293,13 +293,16 @@ static WERROR dreplsrv_notify_check(struct dreplsrv_service *s, struct dreplsrv_partition *p, TALLOC_CTX *mem_ctx) { - uint32_t count; + uint32_t count=0; struct repsFromToBlob *reps; WERROR werr; uint64_t uSN; int ret, i; werr = dsdb_loadreps(s->samdb, mem_ctx, p->dn, "repsTo", &reps, &count); + if (count == 0) { + werr = dsdb_loadreps(s->samdb, mem_ctx, p->dn, "repsFrom", &reps, &count); + } if (!W_ERROR_IS_OK(werr)) { DEBUG(0,(__location__ ": Failed to load repsTo for %s\n", ldb_dn_get_linearized(p->dn))); @@ -331,7 +334,6 @@ static WERROR dreplsrv_notify_check(struct dreplsrv_service *s, } } - return WERR_OK; } -- cgit