From d4481be95c0c22ab8dd66edfdd82d1e9312b137d Mon Sep 17 00:00:00 2001 From: Kamen Mazdrashki Date: Wed, 11 May 2011 21:04:54 +0300 Subject: s4/getnc_exop: Initial implementation of a testsuite for GetNCChanges extended opeartion handling --- source4/torture/drs/python/getnc_exop.py | 136 +++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 source4/torture/drs/python/getnc_exop.py (limited to 'source4') diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py new file mode 100644 index 0000000000..3aeb7e025a --- /dev/null +++ b/source4/torture/drs/python/getnc_exop.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Tests various schema replication scenarios +# +# Copyright (C) Kamen Mazdrashki 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +# +# Usage: +# export DC1=dc1_dns_name +# export DC2=dc2_dns_name +# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun +# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" +# + +import drs_base +import samba.tests + +from ldb import SCOPE_BASE + +from samba.dcerpc import drsuapi, misc, drsblobs +from samba.drs_utils import drs_DsBind + + +class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): + """Intended as a semi-black box test case for DsGetNCChanges + implementation for extended operations. It should be testing + how DsGetNCChanges handles different input params (mostly invalid). + Final goal is to make DsGetNCChanges as binary compatible to + Windows implementation as possible""" + + def setUp(self): + super(DrsReplicaSyncTestCase, self).setUp() + + def tearDown(self): + super(DrsReplicaSyncTestCase, self).tearDown() + + def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop): + req8 = drsuapi.DsGetNCChangesRequest8() + + req8.destination_dsa_guid = misc.GUID(dest_dsa) + req8.source_dsa_invocation_id = misc.GUID(invocation_id) + req8.naming_context = drsuapi.DsReplicaObjectIdentifier() + req8.naming_context.dn = unicode(nc_dn_str) + req8.highwatermark = drsuapi.DsReplicaHighWaterMark() + req8.highwatermark.tmp_highest_usn = 0 + req8.highwatermark.reserved_usn = 0 + req8.highwatermark.highest_usn = 0 + req8.uptodateness_vector = None + req8.replica_flags = 0 + req8.max_object_count = 0 + req8.max_ndr_size = 402116 + req8.extended_op = exop + req8.fsmo_info = 0 + req8.partial_attribute_set = None + req8.partial_attribute_set_ex = None + req8.mapping_ctr.num_mappings = 0 + req8.mapping_ctr.mappings = None + + return req8 + + def _ds_bind(self, server_name): + binding_str = "ncacn_ip_tcp:%s[print,seal]" % server_name + + drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials()) + (drs_handle, supported_extensions) = drs_DsBind(drs) + return (drs, drs_handle) + + def _determine_fSMORoleOwner(self, fsmo_obj_dn): + """Returns (owner, not_owner) pair where: + owner: dns name for FSMO owner + not_owner: dns name for DC not owning the FSMO""" + res = self.ldb_dc1.search(fsmo_obj_dn, + scope=SCOPE_BASE, attrs=["fSMORoleOwner"]) + assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn + fsmo_owner = res[0]["fSMORoleOwner"][0] + if fsmo_owner == self.info_dc1["dsServiceName"][0]: + return (self.dnsname_dc1, self.dnsname_dc2) + return (self.dnsname_dc2, self.dnsname_dc1) + + def _check_exop_failed(self, ctr6, expected_failure): + c = drsuapi.DsGetNCChangesCtr6() + self.assertEqual(ctr6.extended_ret, expected_failure) + self.assertEqual(ctr6.object_count, 0) + self.assertEqual(ctr6.first_object, None) + self.aserrtEqual(ctr6.more_data, False) + self.assertEqual(ctr6.nc_object_count, 0) + self.assertEqual(ctr6.nc_linked_attributes_count, 0) + self.assertEqual(ctr6.linked_attributes_count, 0) + self.assertEqual(ctr6.linked_attributes, None) + self.assertEqual(ctr6.drs_error, 0) + + def test_FSMONotOwner(self): + """Test role transfer with against DC not owner of the role""" + fsmo_dn = self.ldb_dc1.get_schema_basedn() + (fsmo_owner_dc, fsmo_not_owner_dc) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef", + invocation_id=self.ldb_dc1.get_invocation_id(), + nc_dn_str=fsmo_dn, + exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE) + + (drs, drs_handle) = self._ds_bind(fsmo_not_owner_dc) + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + self.assertEqual(level, 6, "Expected level 6 response!") + self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER) + + def test_InvalidDestDSA(self): + """Test role transfer with invalid destination DSA guid""" + fsmo_dn = self.ldb_dc1.get_schema_basedn() + (fsmo_owner_dc, fsmo_not_owner_dc) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef", + invocation_id=self.ldb_dc1.get_invocation_id(), + nc_dn_str=self.ldb_dc1.get_schema_basedn(), + exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE) + + (drs, drs_handle) = self._ds_bind(fsmo_owner_dc) + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + self.assertEqual(level, 6, "Expected level 6 response!") + #ctr = drsuapi.DsGetNCChangesCtr6() + self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER) -- cgit