summaryrefslogtreecommitdiff
path: root/source4/dsdb/samdb/ldb_modules
diff options
context:
space:
mode:
Diffstat (limited to 'source4/dsdb/samdb/ldb_modules')
-rw-r--r--source4/dsdb/samdb/ldb_modules/repl_meta_data.c337
1 files changed, 335 insertions, 2 deletions
diff --git a/source4/dsdb/samdb/ldb_modules/repl_meta_data.c b/source4/dsdb/samdb/ldb_modules/repl_meta_data.c
index 487a6146af..7998d5466c 100644
--- a/source4/dsdb/samdb/ldb_modules/repl_meta_data.c
+++ b/source4/dsdb/samdb/ldb_modules/repl_meta_data.c
@@ -45,6 +45,7 @@
#include "lib/ldb/include/ldb_private.h"
#include "dsdb/samdb/samdb.h"
#include "librpc/gen_ndr/ndr_misc.h"
+#include "librpc/gen_ndr/ndr_drsuapi.h"
#include "librpc/gen_ndr/ndr_drsblobs.h"
struct replmd_replicated_request {
@@ -606,9 +607,11 @@ static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
{
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
if (ar->index_current >= ar->objs->num_objects) {
- return replmd_replicated_request_done(ar);
+ return replmd_replicated_uptodate_vector(ar);
}
+#endif
ar->sub.mem_ctx = talloc_new(ar);
if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
@@ -616,6 +619,331 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
return replmd_replicated_apply_search(ar);
}
+static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb,
+ void *private_data,
+ struct ldb_reply *ares)
+{
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
+ struct replmd_replicated_request *ar = talloc_get_type(private_data,
+ struct replmd_replicated_request);
+
+ ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+ if (ar->sub.change_ret != LDB_SUCCESS) {
+ return replmd_replicated_request_error(ar, ar->sub.change_ret);
+ }
+
+ talloc_free(ar->sub.mem_ctx);
+ ZERO_STRUCT(ar->sub);
+
+ return replmd_replicated_request_done(ar);
+#else
+ return LDB_SUCCESS;
+#endif
+}
+
+static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
+{
+ NTSTATUS nt_status;
+ struct ldb_message *msg;
+ struct replUpToDateVectorBlob ouv;
+ const struct ldb_val *ouv_value;
+ const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
+ struct replUpToDateVectorBlob nuv;
+ struct ldb_val nuv_value;
+ struct ldb_message_element *nuv_el = NULL;
+ struct GUID *our_invocation_id;
+ uint32_t i,j,ni=0;
+ uint64_t seq_num;
+ bool found = false;
+ time_t t = time(NULL);
+ NTTIME now;
+ int ret;
+
+ ruv = ar->objs->uptodateness_vector;
+ ZERO_STRUCT(ouv);
+ ouv.version = 2;
+ ZERO_STRUCT(nuv);
+ nuv.version = 2;
+
+ unix_to_nt_time(&now, t);
+
+ /*
+ * we use the next sequence number for our own highest_usn
+ * because we will do a modify request and this will increment
+ * our highest_usn
+ */
+ ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
+ if (ret != LDB_SUCCESS) {
+ return replmd_replicated_request_error(ar, ret);
+ }
+
+ ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector");
+ if (ouv_value) {
+ nt_status = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, &ouv,
+ (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
+ if (!NT_STATUS_IS_OK(nt_status)) {
+ return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
+ }
+
+ if (ouv.version != 2) {
+ return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
+ }
+ }
+
+ /*
+ * the new uptodateness vector will at least
+ * contain 2 entries, one for the source_dsa and one the local server
+ *
+ * plus optional values from our old vector and the one from the source_dsa
+ */
+ nuv.ctr.ctr2.count = 2 + ouv.ctr.ctr2.count;
+ if (ruv) nuv.ctr.ctr2.count += ruv->count;
+ nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx,
+ struct drsuapi_DsReplicaCursor2,
+ nuv.ctr.ctr2.count);
+ if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
+
+ /* first copy the old vector */
+ for (i=0; i < ouv.ctr.ctr2.count; i++) {
+ nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
+ ni++;
+ }
+
+ /* merge in the source_dsa vector is available */
+ for (i=0; (ruv && i < ruv->count); i++) {
+ found = false;
+
+ for (j=0; j < ni; j++) {
+ if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
+ &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
+ continue;
+ }
+
+ found = true;
+
+ /*
+ * we update only the highest_usn and not the latest_sync_success time,
+ * because the last success stands for direct replication
+ */
+ if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
+ nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
+ }
+ break;
+ }
+
+ if (found) continue;
+
+ /* if it's not there yet, add it */
+ nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
+ ni++;
+ }
+
+ /*
+ * merge in the current highwatermark for the source_dsa
+ */
+ found = false;
+ for (j=0; j < ni; j++) {
+ if (!GUID_equal(ar->objs->source_dsa_invocation_id,
+ &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
+ continue;
+ }
+
+ found = true;
+
+ /*
+ * here we update the highest_usn and last_sync_success time
+ * because we're directly replicating from the source_dsa
+ *
+ * and use the tmp_highest_usn because this is what we have just applied
+ * to our ldb
+ */
+ nuv.ctr.ctr2.cursors[j].highest_usn = ar->objs->new_highwatermark->tmp_highest_usn;
+ nuv.ctr.ctr2.cursors[j].last_sync_success = now;
+ break;
+ }
+ if (!found) {
+ /*
+ * here we update the highest_usn and last_sync_success time
+ * because we're directly replicating from the source_dsa
+ *
+ * and use the tmp_highest_usn because this is what we have just applied
+ * to our ldb
+ */
+ nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *ar->objs->source_dsa_invocation_id;
+ nuv.ctr.ctr2.cursors[ni].highest_usn = ar->objs->new_highwatermark->tmp_highest_usn;
+ nuv.ctr.ctr2.cursors[ni].last_sync_success = now;
+ ni++;
+ }
+
+ /*
+ * merge our own current values if we have a invocation_id already
+ * attached to the ldb
+ */
+ our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
+ if (our_invocation_id) {
+ found = false;
+ for (j=0; j < ni; j++) {
+ if (!GUID_equal(our_invocation_id,
+ &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
+ continue;
+ }
+
+ found = true;
+
+ /*
+ * here we update the highest_usn and last_sync_success time
+ * because it's our own entry
+ */
+ nuv.ctr.ctr2.cursors[j].highest_usn = seq_num;
+ nuv.ctr.ctr2.cursors[j].last_sync_success = now;
+ break;
+ }
+ if (!found) {
+ /*
+ * here we update the highest_usn and last_sync_success time
+ * because it's our own entry
+ */
+ nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *our_invocation_id;
+ nuv.ctr.ctr2.cursors[ni].highest_usn = seq_num;
+ nuv.ctr.ctr2.cursors[ni].last_sync_success = now;
+ ni++;
+ }
+ }
+
+ /*
+ * finally correct the size of the cursors array
+ */
+ nuv.ctr.ctr2.count = ni;
+
+ /*
+ * create the change ldb_message
+ */
+ msg = ldb_msg_new(ar->sub.mem_ctx);
+ if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
+ msg->dn = ar->sub.search_msg->dn;
+
+ nt_status = ndr_push_struct_blob(&nuv_value, msg, &nuv,
+ (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
+ if (!NT_STATUS_IS_OK(nt_status)) {
+ return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
+ }
+ ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
+ if (ret != LDB_SUCCESS) {
+ return replmd_replicated_request_error(ar, ret);
+ }
+ nuv_el->flags = LDB_FLAG_MOD_REPLACE;
+
+ ret = ldb_build_mod_req(&ar->sub.change_req,
+ ar->module->ldb,
+ ar->sub.mem_ctx,
+ msg,
+ NULL,
+ ar,
+ replmd_replicated_uptodate_modify_callback);
+ if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
+ return ldb_next_request(ar->module, ar->sub.change_req);
+#else
+ ret = ldb_next_request(ar->module, ar->sub.change_req);
+ if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+ ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+ if (ar->sub.change_ret != LDB_SUCCESS) {
+ return replmd_replicated_request_error(ar, ar->sub.change_ret);
+ }
+
+ talloc_free(ar->sub.mem_ctx);
+ ZERO_STRUCT(ar->sub);
+
+ return replmd_replicated_request_done(ar);
+#endif
+}
+
+static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb,
+ void *private_data,
+ struct ldb_reply *ares)
+{
+ struct replmd_replicated_request *ar = talloc_get_type(private_data,
+ struct replmd_replicated_request);
+ bool is_done = false;
+
+ switch (ares->type) {
+ case LDB_REPLY_ENTRY:
+ ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
+ break;
+ case LDB_REPLY_REFERRAL:
+ /* we ignore referrals */
+ break;
+ case LDB_REPLY_EXTENDED:
+ case LDB_REPLY_DONE:
+ is_done = true;
+ }
+
+ talloc_free(ares);
+
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
+ if (is_done) {
+ ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+ if (ar->sub.search_ret != LDB_SUCCESS) {
+ return replmd_replicated_request_error(ar, ar->sub.search_ret);
+ }
+ if (!ar->sub.search_msg) {
+ return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
+ }
+
+ return replmd_replicated_uptodate_modify(ar);
+ }
+#endif
+ return LDB_SUCCESS;
+}
+
+static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar)
+{
+ int ret;
+ static const char *attrs[] = {
+ "replUpToDateVector",
+ NULL
+ };
+
+ ret = ldb_build_search_req(&ar->sub.search_req,
+ ar->module->ldb,
+ ar->sub.mem_ctx,
+ ar->objs->partition_dn,
+ LDB_SCOPE_BASE,
+ "(objectClass=*)",
+ attrs,
+ NULL,
+ ar,
+ replmd_replicated_uptodate_search_callback);
+ if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
+ return ldb_next_request(ar->module, ar->sub.search_req);
+#else
+ ret = ldb_next_request(ar->module, ar->sub.search_req);
+ if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+ ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+ if (ar->sub.search_ret != LDB_SUCCESS) {
+ return replmd_replicated_request_error(ar, ar->sub.search_ret);
+ }
+ if (!ar->sub.search_msg) {
+ return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
+ }
+
+ return replmd_replicated_uptodate_modify(ar);
+#endif
+}
+
+static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
+{
+ ar->sub.mem_ctx = talloc_new(ar);
+ if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
+
+ return replmd_replicated_uptodate_search(ar);
+}
+
static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
{
struct dsdb_extended_replicated_objects *objs;
@@ -636,10 +964,15 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
return replmd_replicated_apply_next(ar);
#else
- while (req->handle->state != LDB_ASYNC_DONE) {
+ while (ar->index_current < ar->objs->num_objects &&
+ req->handle->state != LDB_ASYNC_DONE) {
replmd_replicated_apply_next(ar);
}
+ if (req->handle->state != LDB_ASYNC_DONE) {
+ replmd_replicated_uptodate_vector(ar);
+ }
+
return LDB_SUCCESS;
#endif
}