summaryrefslogtreecommitdiff
path: root/source4/torture
diff options
context:
space:
mode:
Diffstat (limited to 'source4/torture')
-rw-r--r--source4/torture/drs/python/repl_schema.py90
1 files changed, 59 insertions, 31 deletions
diff --git a/source4/torture/drs/python/repl_schema.py b/source4/torture/drs/python/repl_schema.py
index 4016ed2105..16f00acdc5 100644
--- a/source4/torture/drs/python/repl_schema.py
+++ b/source4/torture/drs/python/repl_schema.py
@@ -33,11 +33,14 @@ import random
import os
sys.path.append("bin/python")
+import samba
+samba.ensure_external_module("testtools", "testtools")
+samba.ensure_external_module("subunit", "subunit/python")
-from samba.auth import system_session
from ldb import LdbError, ERR_NO_SUCH_OBJECT
-from ldb import SCOPE_BASE, SCOPE_SUBTREE
-from samba.samdb import SamDB
+from ldb import SCOPE_BASE
+from ldb import Message
+from ldb import FLAG_MOD_REPLACE
import samba.tests
@@ -96,34 +99,57 @@ class DrsReplSchemaTestCase(samba.tests.TestCase):
return self.ldb_dc1.schema_format_value("objectGUID", guid)
def _ldap_schemaUpdateNow(self, sam_db):
- ldif = """
-dn:
-changetype: modify
-add: schemaUpdateNow
-schemaUpdateNow: 1
-"""
- sam_db.modify_ldif(ldif)
+ rec = {"dn": "",
+ "schemaUpdateNow": "1"}
+ m = Message.from_dict(sam_db, rec, FLAG_MOD_REPLACE)
+ sam_db.modify(m)
def _make_obj_names(self, base_name):
'''Try to create a unique name for an object
that is to be added to schema'''
obj_name = self.obj_prefix + base_name
+ obj_ldn = obj_name.replace("-", "")
obj_dn = "CN=%s,%s" % (obj_name, self.schema_dn)
- return (obj_name, obj_dn)
-
- def _make_class_ldif(self, class_name, class_dn, attrs=None):
- ldif = """
-dn: """ + class_dn + """
-objectClass: top
-objectClass: classSchema
-cn: """ + class_name + """
-governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.13
-instanceType: 4
-objectClassCategory: 1
-subClassOf: organizationalPerson
-systemOnly: FALSE
-"""
- return ldif
+ return (obj_dn, obj_name, obj_ldn)
+
+ def _schema_new_class(self, ldb_ctx, base_name, attrs=None):
+ (class_dn, class_name, class_ldn) = self._make_obj_names(base_name)
+ rec = {"dn": class_dn,
+ "objectClass": ["top", "classSchema"],
+ "cn": class_name,
+ "lDAPDisplayName": class_ldn,
+ "governsId": "1.2.840." + str(random.randint(1,100000)) + ".1.5.13",
+ "instanceType": "4",
+ "objectClassCategory": "1",
+ "subClassOf": "organizationalPerson",
+ "systemOnly": "FALSE"}
+ # allow overriding/adding attributes
+ if not attrs is None:
+ rec.update(attrs)
+ # add it to the Schema
+ ldb_ctx.add(rec)
+ self._ldap_schemaUpdateNow(ldb_ctx)
+ return (rec["lDAPDisplayName"], rec["dn"])
+
+ def _schema_new_attr(self, ldb_ctx, base_name, attrs=None):
+ (attr_dn, attr_name, attr_ldn) = self._make_obj_names(base_name)
+ rec = {"dn": attr_dn,
+ "objectClass": ["top", "attributeSchema"],
+ "cn": attr_name,
+ "lDAPDisplayName": attr_ldn,
+ "attributeId": "1.2.841." + str(random.randint(1,100000)) + ".1.5.13",
+ "attributeSyntax": "2.5.5.12",
+ "omSyntax": "64",
+ "instanceType": "4",
+ "isSingleValued": "TRUE",
+ "systemOnly": "FALSE"}
+ # allow overriding/adding attributes
+ if not attrs is None:
+ rec.update(attrs)
+ # add it to the Schema
+ ldb_ctx.add(rec)
+ self._ldap_schemaUpdateNow(ldb_ctx)
+ return (rec["lDAPDisplayName"], rec["dn"])
def _check_object(self, obj_dn):
'''Check if object obj_dn exists on both DCs'''
@@ -146,12 +172,14 @@ systemOnly: FALSE
and attributeSchema objects, replicate Schema NC
and then check all objects are replicated correctly"""
+ # add new attributeSchema object
+ (a_ldn, a_dn) = self._schema_new_attr(self.ldb_dc1, "attr-A")
# add new classSchema object
- (class_name, class_dn) = self._make_obj_names("cls-A")
- ldif = self._make_class_ldif(class_name, class_dn)
- self.ldb_dc1.add_ldif(ldif)
- self._ldap_schemaUpdateNow(self.ldb_dc1)
+ (c_ldn, c_dn) = self._schema_new_class(self.ldb_dc1, "cls-A")
+
# force replication from DC1 to DC2
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, nc_dn=self.schema_dn)
- # check object is replicated
- self._check_object(class_dn)
+
+ # check objects are replicated
+ self._check_object(c_dn)
+ self._check_object(a_dn)