diff options
Diffstat (limited to 'source4/dsdb/samdb/ldb_modules')
-rw-r--r-- | source4/dsdb/samdb/ldb_modules/repl_meta_data.c | 337 |
1 files changed, 335 insertions, 2 deletions
diff --git a/source4/dsdb/samdb/ldb_modules/repl_meta_data.c b/source4/dsdb/samdb/ldb_modules/repl_meta_data.c index 487a6146af..7998d5466c 100644 --- a/source4/dsdb/samdb/ldb_modules/repl_meta_data.c +++ b/source4/dsdb/samdb/ldb_modules/repl_meta_data.c @@ -45,6 +45,7 @@ #include "lib/ldb/include/ldb_private.h" #include "dsdb/samdb/samdb.h" #include "librpc/gen_ndr/ndr_misc.h" +#include "librpc/gen_ndr/ndr_drsuapi.h" #include "librpc/gen_ndr/ndr_drsblobs.h" struct replmd_replicated_request { @@ -606,9 +607,11 @@ static int replmd_replicated_apply_search(struct replmd_replicated_request *ar) static int replmd_replicated_apply_next(struct replmd_replicated_request *ar) { +#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ if (ar->index_current >= ar->objs->num_objects) { - return replmd_replicated_request_done(ar); + return replmd_replicated_uptodate_vector(ar); } +#endif ar->sub.mem_ctx = talloc_new(ar); if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM); @@ -616,6 +619,331 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar) return replmd_replicated_apply_search(ar); } +static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb, + void *private_data, + struct ldb_reply *ares) +{ +#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ + struct replmd_replicated_request *ar = talloc_get_type(private_data, + struct replmd_replicated_request); + + ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL); + if (ar->sub.change_ret != LDB_SUCCESS) { + return replmd_replicated_request_error(ar, ar->sub.change_ret); + } + + talloc_free(ar->sub.mem_ctx); + ZERO_STRUCT(ar->sub); + + return replmd_replicated_request_done(ar); +#else + return LDB_SUCCESS; +#endif +} + +static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar) +{ + NTSTATUS nt_status; + struct ldb_message *msg; + struct replUpToDateVectorBlob ouv; + const struct ldb_val *ouv_value; + const struct drsuapi_DsReplicaCursor2CtrEx *ruv; + struct replUpToDateVectorBlob nuv; + struct ldb_val nuv_value; + struct ldb_message_element *nuv_el = NULL; + struct GUID *our_invocation_id; + uint32_t i,j,ni=0; + uint64_t seq_num; + bool found = false; + time_t t = time(NULL); + NTTIME now; + int ret; + + ruv = ar->objs->uptodateness_vector; + ZERO_STRUCT(ouv); + ouv.version = 2; + ZERO_STRUCT(nuv); + nuv.version = 2; + + unix_to_nt_time(&now, t); + + /* + * we use the next sequence number for our own highest_usn + * because we will do a modify request and this will increment + * our highest_usn + */ + ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num); + if (ret != LDB_SUCCESS) { + return replmd_replicated_request_error(ar, ret); + } + + ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector"); + if (ouv_value) { + nt_status = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, &ouv, + (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob); + if (!NT_STATUS_IS_OK(nt_status)) { + return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status)); + } + + if (ouv.version != 2) { + return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR); + } + } + + /* + * the new uptodateness vector will at least + * contain 2 entries, one for the source_dsa and one the local server + * + * plus optional values from our old vector and the one from the source_dsa + */ + nuv.ctr.ctr2.count = 2 + ouv.ctr.ctr2.count; + if (ruv) nuv.ctr.ctr2.count += ruv->count; + nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx, + struct drsuapi_DsReplicaCursor2, + nuv.ctr.ctr2.count); + if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM); + + /* first copy the old vector */ + for (i=0; i < ouv.ctr.ctr2.count; i++) { + nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i]; + ni++; + } + + /* merge in the source_dsa vector is available */ + for (i=0; (ruv && i < ruv->count); i++) { + found = false; + + for (j=0; j < ni; j++) { + if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id, + &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) { + continue; + } + + found = true; + + /* + * we update only the highest_usn and not the latest_sync_success time, + * because the last success stands for direct replication + */ + if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) { + nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn; + } + break; + } + + if (found) continue; + + /* if it's not there yet, add it */ + nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i]; + ni++; + } + + /* + * merge in the current highwatermark for the source_dsa + */ + found = false; + for (j=0; j < ni; j++) { + if (!GUID_equal(ar->objs->source_dsa_invocation_id, + &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) { + continue; + } + + found = true; + + /* + * here we update the highest_usn and last_sync_success time + * because we're directly replicating from the source_dsa + * + * and use the tmp_highest_usn because this is what we have just applied + * to our ldb + */ + nuv.ctr.ctr2.cursors[j].highest_usn = ar->objs->new_highwatermark->tmp_highest_usn; + nuv.ctr.ctr2.cursors[j].last_sync_success = now; + break; + } + if (!found) { + /* + * here we update the highest_usn and last_sync_success time + * because we're directly replicating from the source_dsa + * + * and use the tmp_highest_usn because this is what we have just applied + * to our ldb + */ + nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *ar->objs->source_dsa_invocation_id; + nuv.ctr.ctr2.cursors[ni].highest_usn = ar->objs->new_highwatermark->tmp_highest_usn; + nuv.ctr.ctr2.cursors[ni].last_sync_success = now; + ni++; + } + + /* + * merge our own current values if we have a invocation_id already + * attached to the ldb + */ + our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb); + if (our_invocation_id) { + found = false; + for (j=0; j < ni; j++) { + if (!GUID_equal(our_invocation_id, + &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) { + continue; + } + + found = true; + + /* + * here we update the highest_usn and last_sync_success time + * because it's our own entry + */ + nuv.ctr.ctr2.cursors[j].highest_usn = seq_num; + nuv.ctr.ctr2.cursors[j].last_sync_success = now; + break; + } + if (!found) { + /* + * here we update the highest_usn and last_sync_success time + * because it's our own entry + */ + nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *our_invocation_id; + nuv.ctr.ctr2.cursors[ni].highest_usn = seq_num; + nuv.ctr.ctr2.cursors[ni].last_sync_success = now; + ni++; + } + } + + /* + * finally correct the size of the cursors array + */ + nuv.ctr.ctr2.count = ni; + + /* + * create the change ldb_message + */ + msg = ldb_msg_new(ar->sub.mem_ctx); + if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM); + msg->dn = ar->sub.search_msg->dn; + + nt_status = ndr_push_struct_blob(&nuv_value, msg, &nuv, + (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob); + if (!NT_STATUS_IS_OK(nt_status)) { + return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status)); + } + ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el); + if (ret != LDB_SUCCESS) { + return replmd_replicated_request_error(ar, ret); + } + nuv_el->flags = LDB_FLAG_MOD_REPLACE; + + ret = ldb_build_mod_req(&ar->sub.change_req, + ar->module->ldb, + ar->sub.mem_ctx, + msg, + NULL, + ar, + replmd_replicated_uptodate_modify_callback); + if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret); + +#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ + return ldb_next_request(ar->module, ar->sub.change_req); +#else + ret = ldb_next_request(ar->module, ar->sub.change_req); + if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret); + + ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL); + if (ar->sub.change_ret != LDB_SUCCESS) { + return replmd_replicated_request_error(ar, ar->sub.change_ret); + } + + talloc_free(ar->sub.mem_ctx); + ZERO_STRUCT(ar->sub); + + return replmd_replicated_request_done(ar); +#endif +} + +static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb, + void *private_data, + struct ldb_reply *ares) +{ + struct replmd_replicated_request *ar = talloc_get_type(private_data, + struct replmd_replicated_request); + bool is_done = false; + + switch (ares->type) { + case LDB_REPLY_ENTRY: + ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message); + break; + case LDB_REPLY_REFERRAL: + /* we ignore referrals */ + break; + case LDB_REPLY_EXTENDED: + case LDB_REPLY_DONE: + is_done = true; + } + + talloc_free(ares); + +#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ + if (is_done) { + ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL); + if (ar->sub.search_ret != LDB_SUCCESS) { + return replmd_replicated_request_error(ar, ar->sub.search_ret); + } + if (!ar->sub.search_msg) { + return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR); + } + + return replmd_replicated_uptodate_modify(ar); + } +#endif + return LDB_SUCCESS; +} + +static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar) +{ + int ret; + static const char *attrs[] = { + "replUpToDateVector", + NULL + }; + + ret = ldb_build_search_req(&ar->sub.search_req, + ar->module->ldb, + ar->sub.mem_ctx, + ar->objs->partition_dn, + LDB_SCOPE_BASE, + "(objectClass=*)", + attrs, + NULL, + ar, + replmd_replicated_uptodate_search_callback); + if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret); + +#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ + return ldb_next_request(ar->module, ar->sub.search_req); +#else + ret = ldb_next_request(ar->module, ar->sub.search_req); + if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret); + + ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL); + if (ar->sub.search_ret != LDB_SUCCESS) { + return replmd_replicated_request_error(ar, ar->sub.search_ret); + } + if (!ar->sub.search_msg) { + return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR); + } + + return replmd_replicated_uptodate_modify(ar); +#endif +} + +static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar) +{ + ar->sub.mem_ctx = talloc_new(ar); + if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM); + + return replmd_replicated_uptodate_search(ar); +} + static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req) { struct dsdb_extended_replicated_objects *objs; @@ -636,10 +964,15 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ return replmd_replicated_apply_next(ar); #else - while (req->handle->state != LDB_ASYNC_DONE) { + while (ar->index_current < ar->objs->num_objects && + req->handle->state != LDB_ASYNC_DONE) { replmd_replicated_apply_next(ar); } + if (req->handle->state != LDB_ASYNC_DONE) { + replmd_replicated_uptodate_vector(ar); + } + return LDB_SUCCESS; #endif } |