From 99df3f6cbbdfa1693d805c2696e34f353dfa28ac Mon Sep 17 00:00:00 2001 From: Kamen Mazdrashki Date: Thu, 12 May 2011 19:43:54 +0300 Subject: s4/test/getnc_exop: Tune the the test to work against windows It turns out that sometimes, w2k8-r2 returns objects even when FSMO extended request has failed. Also verify that target DC returns source_dsa_guid and source_dsa_invocation_id correctly Autobuild-User: Kamen Mazdrashki Autobuild-Date: Fri May 13 02:26:04 CEST 2011 on sn-devel-104 --- source4/torture/drs/python/getnc_exop.py | 44 ++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py index 3aeb7e025a..904c013333 100644 --- a/source4/torture/drs/python/getnc_exop.py +++ b/source4/torture/drs/python/getnc_exop.py @@ -84,53 +84,63 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): """Returns (owner, not_owner) pair where: owner: dns name for FSMO owner not_owner: dns name for DC not owning the FSMO""" + # collect info to return later + fsmo_info_1 = {"dns_name": self.dnsname_dc1, + "invocation_id": self.ldb_dc1.get_invocation_id(), + "ntds_guid": self.ldb_dc1.get_ntds_GUID()} + fsmo_info_2 = {"dns_name": self.dnsname_dc2, + "invocation_id": self.ldb_dc2.get_invocation_id(), + "ntds_guid": self.ldb_dc2.get_ntds_GUID()} + # determine the owner dc res = self.ldb_dc1.search(fsmo_obj_dn, scope=SCOPE_BASE, attrs=["fSMORoleOwner"]) assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn fsmo_owner = res[0]["fSMORoleOwner"][0] if fsmo_owner == self.info_dc1["dsServiceName"][0]: - return (self.dnsname_dc1, self.dnsname_dc2) - return (self.dnsname_dc2, self.dnsname_dc1) + return (fsmo_info_1, fsmo_info_2) + return (fsmo_info_2, fsmo_info_1) def _check_exop_failed(self, ctr6, expected_failure): - c = drsuapi.DsGetNCChangesCtr6() self.assertEqual(ctr6.extended_ret, expected_failure) - self.assertEqual(ctr6.object_count, 0) - self.assertEqual(ctr6.first_object, None) - self.aserrtEqual(ctr6.more_data, False) + #self.assertEqual(ctr6.object_count, 0) + #self.assertEqual(ctr6.first_object, None) + self.assertEqual(ctr6.more_data, False) self.assertEqual(ctr6.nc_object_count, 0) self.assertEqual(ctr6.nc_linked_attributes_count, 0) self.assertEqual(ctr6.linked_attributes_count, 0) self.assertEqual(ctr6.linked_attributes, None) - self.assertEqual(ctr6.drs_error, 0) + self.assertEqual(ctr6.drs_error[0], 0) def test_FSMONotOwner(self): """Test role transfer with against DC not owner of the role""" fsmo_dn = self.ldb_dc1.get_schema_basedn() - (fsmo_owner_dc, fsmo_not_owner_dc) = self._determine_fSMORoleOwner(fsmo_dn) - - req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef", - invocation_id=self.ldb_dc1.get_invocation_id(), + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"], + invocation_id=fsmo_not_owner["invocation_id"], nc_dn_str=fsmo_dn, exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE) - (drs, drs_handle) = self._ds_bind(fsmo_not_owner_dc) + (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"]) (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) self.assertEqual(level, 6, "Expected level 6 response!") self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER) + self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"])) + self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"])) def test_InvalidDestDSA(self): """Test role transfer with invalid destination DSA guid""" fsmo_dn = self.ldb_dc1.get_schema_basedn() - (fsmo_owner_dc, fsmo_not_owner_dc) = self._determine_fSMORoleOwner(fsmo_dn) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef", - invocation_id=self.ldb_dc1.get_invocation_id(), - nc_dn_str=self.ldb_dc1.get_schema_basedn(), + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str=fsmo_dn, exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE) - (drs, drs_handle) = self._ds_bind(fsmo_owner_dc) + (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"]) (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) self.assertEqual(level, 6, "Expected level 6 response!") - #ctr = drsuapi.DsGetNCChangesCtr6() self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER) + self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) + self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"])) -- cgit