diff options
Diffstat (limited to 'source4/torture/drs/python')
-rw-r--r-- | source4/torture/drs/python/repl_schema.py | 90 |
1 files changed, 59 insertions, 31 deletions
diff --git a/source4/torture/drs/python/repl_schema.py b/source4/torture/drs/python/repl_schema.py index 4016ed2105..16f00acdc5 100644 --- a/source4/torture/drs/python/repl_schema.py +++ b/source4/torture/drs/python/repl_schema.py @@ -33,11 +33,14 @@ import random import os sys.path.append("bin/python") +import samba +samba.ensure_external_module("testtools", "testtools") +samba.ensure_external_module("subunit", "subunit/python") -from samba.auth import system_session from ldb import LdbError, ERR_NO_SUCH_OBJECT -from ldb import SCOPE_BASE, SCOPE_SUBTREE -from samba.samdb import SamDB +from ldb import SCOPE_BASE +from ldb import Message +from ldb import FLAG_MOD_REPLACE import samba.tests @@ -96,34 +99,57 @@ class DrsReplSchemaTestCase(samba.tests.TestCase): return self.ldb_dc1.schema_format_value("objectGUID", guid) def _ldap_schemaUpdateNow(self, sam_db): - ldif = """ -dn: -changetype: modify -add: schemaUpdateNow -schemaUpdateNow: 1 -""" - sam_db.modify_ldif(ldif) + rec = {"dn": "", + "schemaUpdateNow": "1"} + m = Message.from_dict(sam_db, rec, FLAG_MOD_REPLACE) + sam_db.modify(m) def _make_obj_names(self, base_name): '''Try to create a unique name for an object that is to be added to schema''' obj_name = self.obj_prefix + base_name + obj_ldn = obj_name.replace("-", "") obj_dn = "CN=%s,%s" % (obj_name, self.schema_dn) - return (obj_name, obj_dn) - - def _make_class_ldif(self, class_name, class_dn, attrs=None): - ldif = """ -dn: """ + class_dn + """ -objectClass: top -objectClass: classSchema -cn: """ + class_name + """ -governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.13 -instanceType: 4 -objectClassCategory: 1 -subClassOf: organizationalPerson -systemOnly: FALSE -""" - return ldif + return (obj_dn, obj_name, obj_ldn) + + def _schema_new_class(self, ldb_ctx, base_name, attrs=None): + (class_dn, class_name, class_ldn) = self._make_obj_names(base_name) + rec = {"dn": class_dn, + "objectClass": ["top", "classSchema"], + "cn": class_name, + "lDAPDisplayName": class_ldn, + "governsId": "1.2.840." + str(random.randint(1,100000)) + ".1.5.13", + "instanceType": "4", + "objectClassCategory": "1", + "subClassOf": "organizationalPerson", + "systemOnly": "FALSE"} + # allow overriding/adding attributes + if not attrs is None: + rec.update(attrs) + # add it to the Schema + ldb_ctx.add(rec) + self._ldap_schemaUpdateNow(ldb_ctx) + return (rec["lDAPDisplayName"], rec["dn"]) + + def _schema_new_attr(self, ldb_ctx, base_name, attrs=None): + (attr_dn, attr_name, attr_ldn) = self._make_obj_names(base_name) + rec = {"dn": attr_dn, + "objectClass": ["top", "attributeSchema"], + "cn": attr_name, + "lDAPDisplayName": attr_ldn, + "attributeId": "1.2.841." + str(random.randint(1,100000)) + ".1.5.13", + "attributeSyntax": "2.5.5.12", + "omSyntax": "64", + "instanceType": "4", + "isSingleValued": "TRUE", + "systemOnly": "FALSE"} + # allow overriding/adding attributes + if not attrs is None: + rec.update(attrs) + # add it to the Schema + ldb_ctx.add(rec) + self._ldap_schemaUpdateNow(ldb_ctx) + return (rec["lDAPDisplayName"], rec["dn"]) def _check_object(self, obj_dn): '''Check if object obj_dn exists on both DCs''' @@ -146,12 +172,14 @@ systemOnly: FALSE and attributeSchema objects, replicate Schema NC and then check all objects are replicated correctly""" + # add new attributeSchema object + (a_ldn, a_dn) = self._schema_new_attr(self.ldb_dc1, "attr-A") # add new classSchema object - (class_name, class_dn) = self._make_obj_names("cls-A") - ldif = self._make_class_ldif(class_name, class_dn) - self.ldb_dc1.add_ldif(ldif) - self._ldap_schemaUpdateNow(self.ldb_dc1) + (c_ldn, c_dn) = self._schema_new_class(self.ldb_dc1, "cls-A") + # force replication from DC1 to DC2 self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, nc_dn=self.schema_dn) - # check object is replicated - self._check_object(class_dn) + + # check objects are replicated + self._check_object(c_dn) + self._check_object(a_dn) |