From e64e3985688e57a882e0e128b256ec5f84c6f67a Mon Sep 17 00:00:00 2001 From: Kamen Mazdrashki Date: Tue, 7 Sep 2010 17:00:20 +0300 Subject: s4-dreplsrv: Run NC replication synchronously if requested --- source4/dsdb/repl/drepl_service.c | 149 ++++++++++++++++++++++++++++++-------- 1 file changed, 119 insertions(+), 30 deletions(-) (limited to 'source4') diff --git a/source4/dsdb/repl/drepl_service.c b/source4/dsdb/repl/drepl_service.c index 5babbcbf6d..7d4a88ca80 100644 --- a/source4/dsdb/repl/drepl_service.c +++ b/source4/dsdb/repl/drepl_service.c @@ -34,6 +34,21 @@ #include "librpc/gen_ndr/ndr_irpc.h" #include "param/param.h" +/** + * Call-back data for _drepl_replica_sync_done_cb() + */ +struct drepl_replica_sync_cb_data { + struct irpc_message *msg; + struct drsuapi_DsReplicaSync *r; + + /* number of ops left to be completed */ + int ops_count; + + /* last failure error code */ + WERROR werr_last_failure; +}; + + static WERROR dreplsrv_init_creds(struct dreplsrv_service *service) { service->system_session_info = system_session(service->task->lp_ctx); @@ -104,6 +119,92 @@ static WERROR dreplsrv_connect_samdb(struct dreplsrv_service *service, struct lo } +/** + * Callback for dreplsrv_out_operation operation completion. + * + * We just need to complete a waiting IRPC message here. + * In case pull operation has failed, + * caller of this callback will dump + * failure information. + * + * NOTE: cb_data is allocated in IRPC msg's context + * and will be freed during irpc_send_reply() call. + */ +static void _drepl_replica_sync_done_cb(struct dreplsrv_service *service, + WERROR werr, + enum drsuapi_DsExtendedError ext_err, + void *cb_data) +{ + struct drepl_replica_sync_cb_data *data = talloc_get_type(cb_data, + struct drepl_replica_sync_cb_data); + struct irpc_message *msg = data->msg; + struct drsuapi_DsReplicaSync *r = data->r; + + /* store last bad result */ + if (W_ERROR_IS_OK(werr)) { + data->werr_last_failure = werr; + } + + /* decrement pending ops count */ + data->ops_count--; + + if (data->ops_count == 0) { + /* Return result to client */ + r->out.result = data->werr_last_failure; + + /* complete IRPC message */ + irpc_send_reply(msg, NT_STATUS_OK); + } +} + +/** + * Helper to schedule a replication operation with a source DSA. + * If 'data' is valid pointer, then a callback + * for the operation is passed and 'data->msg' is + * marked as 'deferred' - defer_reply = true + */ +static WERROR _drepl_schedule_replication(struct dreplsrv_service *service, + struct dreplsrv_partition_source_dsa *dsa, + struct drsuapi_DsReplicaObjectIdentifier *nc, + struct drepl_replica_sync_cb_data *data, + TALLOC_CTX *mem_ctx) +{ + WERROR werr; + dreplsrv_fsmo_callback_t fn_callback = NULL; + + if (data) { + fn_callback = _drepl_replica_sync_done_cb; + } + + /* schedule replication item */ + werr = dreplsrv_schedule_partition_pull_source(service, dsa, + DRSUAPI_EXOP_NONE, 0, + fn_callback, data); + if (!W_ERROR_IS_OK(werr)) { + DEBUG(0,("%s: failed setup of sync of partition (%s, %s, %s) - %s\n", + __FUNCTION__, + GUID_string(mem_ctx, &nc->guid), + nc->dn, + dsa->repsFrom1->other_info->dns_name, + win_errstr(werr))); + return werr; + } + /* log we've scheduled a replication item */ + DEBUG(3,("%s: forcing sync of partition (%s, %s, %s)\n", + __FUNCTION__, + GUID_string(mem_ctx, &nc->guid), + nc->dn, + dsa->repsFrom1->other_info->dns_name)); + + /* mark IRPC message as deferred if necessary */ + if (data) { + data->ops_count++; + data->msg->defer_reply = true; + } + + return WERR_OK; +} + /* DsReplicaSync messages from the DRSUAPI server are forwarded here */ @@ -112,6 +213,7 @@ static NTSTATUS drepl_replica_sync(struct irpc_message *msg, { WERROR werr; struct dreplsrv_partition *p; + struct drepl_replica_sync_cb_data *cb_data; struct dreplsrv_partition_source_dsa *dsa; struct drsuapi_DsReplicaSyncRequest1 *req1; struct drsuapi_DsReplicaObjectIdentifier *nc; @@ -147,28 +249,29 @@ static NTSTATUS drepl_replica_sync(struct irpc_message *msg, REPLICA_SYNC_FAIL(werr); } + /* should we process it asynchronously? */ + if (req1->options & DRSUAPI_DRS_ASYNC_OP) { + cb_data = NULL; + } else { + cb_data = talloc_zero(msg, struct drepl_replica_sync_cb_data); + if (!cb_data) { + DEBUG(0,(__location__ ": Not enought memory!")); + REPLICA_SYNC_FAIL(WERR_DS_DRA_INTERNAL_ERROR); + } + + cb_data->msg = msg; + cb_data->r = r; + cb_data->werr_last_failure = WERR_OK; + } + /* collect source DSAs to sync with */ if (req1->options & DRSUAPI_DRS_SYNC_ALL) { for (dsa = p->sources; dsa; dsa = dsa->next) { /* schedule replication item */ - werr = dreplsrv_schedule_partition_pull_source(service, dsa, - DRSUAPI_EXOP_NONE, 0, - NULL, NULL); + werr = _drepl_schedule_replication(service, dsa, nc, cb_data, msg); if (!W_ERROR_IS_OK(werr)) { - DEBUG(0,("%s: failed setup of sync of partition (%s, %s, %s) - %s\n", - __FUNCTION__, - GUID_string(msg, &nc->guid), - nc->dn, - dsa->repsFrom1->other_info->dns_name, - win_errstr(werr))); REPLICA_SYNC_FAIL(werr); } - /* log we've scheduled replication item */ - DEBUG(3,("%s: forcing sync of partition (%s, %s, %s)\n", - __FUNCTION__, - GUID_string(msg, &nc->guid), - nc->dn, - dsa->repsFrom1->other_info->dns_name)); } } else { if (req1->options & DRSUAPI_DRS_SYNC_BYNAME) { @@ -201,24 +304,10 @@ static NTSTATUS drepl_replica_sync(struct irpc_message *msg, } /* schedule replication item */ - werr = dreplsrv_schedule_partition_pull_source(service, dsa, - DRSUAPI_EXOP_NONE, 0, - NULL, NULL); + werr = _drepl_schedule_replication(service, dsa, nc, cb_data, msg); if (!W_ERROR_IS_OK(werr)) { - DEBUG(0,("%s: failed setup of sync of partition (%s, %s, %s) - %s\n", - __FUNCTION__, - GUID_string(msg, &nc->guid), - nc->dn, - dsa->repsFrom1->other_info->dns_name, - win_errstr(werr))); REPLICA_SYNC_FAIL(werr); } - /* log we've scheduled replication item */ - DEBUG(3,("%s: forcing sync of partition (%s, %s, %s)\n", - __FUNCTION__, - GUID_string(msg, &nc->guid), - nc->dn, - dsa->repsFrom1->other_info->dns_name)); } /* if we got here, everything is OK */ -- cgit