summaryrefslogtreecommitdiff
path: root/source4
diff options
context:
space:
mode:
Diffstat (limited to 'source4')
-rw-r--r--source4/torture/drs/python/getnc_exop.py44
1 files changed, 27 insertions, 17 deletions
diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py
index 3aeb7e025a..904c013333 100644
--- a/source4/torture/drs/python/getnc_exop.py
+++ b/source4/torture/drs/python/getnc_exop.py
@@ -84,53 +84,63 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
"""Returns (owner, not_owner) pair where:
owner: dns name for FSMO owner
not_owner: dns name for DC not owning the FSMO"""
+ # collect info to return later
+ fsmo_info_1 = {"dns_name": self.dnsname_dc1,
+ "invocation_id": self.ldb_dc1.get_invocation_id(),
+ "ntds_guid": self.ldb_dc1.get_ntds_GUID()}
+ fsmo_info_2 = {"dns_name": self.dnsname_dc2,
+ "invocation_id": self.ldb_dc2.get_invocation_id(),
+ "ntds_guid": self.ldb_dc2.get_ntds_GUID()}
+ # determine the owner dc
res = self.ldb_dc1.search(fsmo_obj_dn,
scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn
fsmo_owner = res[0]["fSMORoleOwner"][0]
if fsmo_owner == self.info_dc1["dsServiceName"][0]:
- return (self.dnsname_dc1, self.dnsname_dc2)
- return (self.dnsname_dc2, self.dnsname_dc1)
+ return (fsmo_info_1, fsmo_info_2)
+ return (fsmo_info_2, fsmo_info_1)
def _check_exop_failed(self, ctr6, expected_failure):
- c = drsuapi.DsGetNCChangesCtr6()
self.assertEqual(ctr6.extended_ret, expected_failure)
- self.assertEqual(ctr6.object_count, 0)
- self.assertEqual(ctr6.first_object, None)
- self.aserrtEqual(ctr6.more_data, False)
+ #self.assertEqual(ctr6.object_count, 0)
+ #self.assertEqual(ctr6.first_object, None)
+ self.assertEqual(ctr6.more_data, False)
self.assertEqual(ctr6.nc_object_count, 0)
self.assertEqual(ctr6.nc_linked_attributes_count, 0)
self.assertEqual(ctr6.linked_attributes_count, 0)
self.assertEqual(ctr6.linked_attributes, None)
- self.assertEqual(ctr6.drs_error, 0)
+ self.assertEqual(ctr6.drs_error[0], 0)
def test_FSMONotOwner(self):
"""Test role transfer with against DC not owner of the role"""
fsmo_dn = self.ldb_dc1.get_schema_basedn()
- (fsmo_owner_dc, fsmo_not_owner_dc) = self._determine_fSMORoleOwner(fsmo_dn)
-
- req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
- invocation_id=self.ldb_dc1.get_invocation_id(),
+ (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+ req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
+ invocation_id=fsmo_not_owner["invocation_id"],
nc_dn_str=fsmo_dn,
exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
- (drs, drs_handle) = self._ds_bind(fsmo_not_owner_dc)
+ (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
self.assertEqual(level, 6, "Expected level 6 response!")
self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER)
+ self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
+ self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
def test_InvalidDestDSA(self):
"""Test role transfer with invalid destination DSA guid"""
fsmo_dn = self.ldb_dc1.get_schema_basedn()
- (fsmo_owner_dc, fsmo_not_owner_dc) = self._determine_fSMORoleOwner(fsmo_dn)
+ (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
- invocation_id=self.ldb_dc1.get_invocation_id(),
- nc_dn_str=self.ldb_dc1.get_schema_basedn(),
+ invocation_id=fsmo_owner["invocation_id"],
+ nc_dn_str=fsmo_dn,
exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
- (drs, drs_handle) = self._ds_bind(fsmo_owner_dc)
+ (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
self.assertEqual(level, 6, "Expected level 6 response!")
- #ctr = drsuapi.DsGetNCChangesCtr6()
self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
+ self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
+ self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))