From ee34e2c94bccb90f6518c401192c5f04ae509d25 Mon Sep 17 00:00:00 2001 From: Kamen Mazdrashki Date: Sun, 7 Nov 2010 04:41:50 +0200 Subject: s4-test: Initial implementation for Schema replication black box test --- source4/torture/drs/python/repl_schema.py | 174 ++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 source4/torture/drs/python/repl_schema.py (limited to 'source4/torture') diff --git a/source4/torture/drs/python/repl_schema.py b/source4/torture/drs/python/repl_schema.py new file mode 100644 index 0000000000..b407dfbe5e --- /dev/null +++ b/source4/torture/drs/python/repl_schema.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Tests various schema replication scenarios +# +# Copyright (C) Kamen Mazdrashki 2010 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +# +# Usage: +# export DC1=dc1_dns_name +# export DC2=dc2_dns_name +# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun +# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_schema -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" +# + +import sys +import time +import random +import os + +sys.path.append("bin/python") + +from samba.auth import system_session +from ldb import SCOPE_BASE, SCOPE_SUBTREE +from samba.samdb import SamDB + +import samba.tests + + +class DrsReplSchemaTestCase(samba.tests.TestCase): + + # RootDSE msg for DC1 + info_dc1 = None + ldb_dc1 = None + # RootDSE msg for DC1 + info_dc2 = None + ldb_dc2 = None + # prefix for all objects created + obj_prefix = None + + def setUp(self): + super(DrsReplSchemaTestCase, self).setUp() + + # connect to DCs singleton + self._dc_connect("dc1", "DC1", ldap_only=True) + self._dc_connect("dc2", "DC2", ldap_only=True) + + # initialize objects prefix if not done yet + if self.obj_prefix is None: + t = time.strftime("%s", time.gmtime()) + DrsReplSchemaTestCase.obj_prefix = "DrsReplSchema-%s-" % t + + # cache some of RootDSE props + self.schema_dn = self.info_dc1["schemaNamingContext"][0] + self.domain_dn = self.info_dc1["defaultNamingContext"][0] + self.config_dn = self.info_dc1["configurationNamingContext"][0] + self.forest_level = int(self.info_dc1["forestFunctionality"][0]) + + # we will need DCs DNS names for 'samba-tool drs' command + self.dnsname_dc1 = self.info_dc1["dnsHostName"][0] + self.dnsname_dc2 = self.info_dc2["dnsHostName"][0] + + def tearDown(self): + super(DrsReplSchemaTestCase, self).tearDown() + + @classmethod + def _dc_connect(cls, attr_name, env_var, ldap_only=True): + ldb_dc = None + attr_name_ldb = "ldb_" + attr_name + if hasattr(cls, attr_name_ldb): + ldb_dc = getattr(cls, attr_name_ldb) + if ldb_dc is None: + url_dc = samba.tests.env_get_var_value(env_var) + ldb_dc = samba.tests.connect_samdb(url_dc, ldap_only=ldap_only) + res = ldb_dc.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"]) + info_dc = res[0] + setattr(cls, "ldb_" + attr_name, ldb_dc) + setattr(cls, "url_" + attr_name, url_dc) + setattr(cls, "info_" + attr_name, info_dc) + return ldb_dc + + def _net_drs_replicate(self, DC, fromDC, nc_dn): + """Triggers replication cycle on 'DC' to + replicate from 'fromDC'. Naming context to + be replicated is 'nc_dn' dn""" + # find out where is net command + samba_tool_cmd = os.path.abspath("./bin/samba-tool") + # make command line credentials string + creds = samba.tests.cmdline_credentials + cmd_line_auth = "-U%s/%s%%%s" % (creds.get_domain(), + creds.get_username(), creds.get_password()) + # bin/samba-tool drs replicate + cmd_line = "%s drs replicate %s %s %s %s" % (samba_tool_cmd, DC, fromDC, + nc_dn, cmd_line_auth) + ret = os.system(cmd_line) + self.assertEquals(ret, 0, "Replicating %s from %s has failed!" % (DC, fromDC)) + + def _GUID_string(self, guid): + return self.ldb_dc1.schema_format_value("objectGUID", guid) + + def _ldap_schemaUpdateNow(self, sam_db): + ldif = """ +dn: +changetype: modify +add: schemaUpdateNow +schemaUpdateNow: 1 +""" + sam_db.modify_ldif(ldif) + + def _make_obj_names(self, base_name): + '''Try to create a unique name for an object + that is to be added to schema''' + obj_name = self.obj_prefix + base_name + obj_dn = "CN=%s,%s" % (obj_name, self.schema_dn) + return (obj_name, obj_dn) + + def _make_class_ldif(self, class_name, class_dn, attrs=None): + ldif = """ +dn: """ + class_dn + """ +objectClass: top +objectClass: classSchema +cn: """ + class_name + """ +governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.13 +instanceType: 4 +objectClassCategory: 1 +subClassOf: organizationalPerson +systemOnly: FALSE +""" + return ldif + + def _check_object(self, obj_dn): + '''Check if object obj_dn exists on both DCs''' + res_dc1 = self.ldb_dc1.search(base=obj_dn, + scope=SCOPE_BASE, + attrs=["*"]) + self.assertEquals(len(res_dc1), 1, + "%s doesn't exists on %s" % (obj_dn, self.dnsname_dc1)) + try: + res_dc2 = self.ldb_dc2.search(base=obj_dn, + scope=SCOPE_BASE, + attrs=["*"]) + except LdbError, (ERR_NO_SUCH_OBJECT, _): + self.fail("%s doesn't exists on %s" % (obj_dn, self.dnsname_dc2)) + self.assertEquals(len(res_dc2), 1, + "%s doesn't exists on %s" % (obj_dn, self.dnsname_dc2)) + + def test_all(self): + """Basic plan is to create bunch of classSchema + and attributeSchema objects, replicate Schema NC + and then check all objects are replicated correctly""" + + # add new classSchema object + (class_name, class_dn) = self._make_obj_names("cls-A") + ldif = self._make_class_ldif(class_name, class_dn) + self.ldb_dc1.add_ldif(ldif) + self._ldap_schemaUpdateNow(self.ldb_dc1) + # force replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, nc_dn=self.schema_dn) + # check object is replicated + self._check_object(class_dn) -- cgit