summaryrefslogtreecommitdiff
path: root/source4/lib/ldb/tests
diff options
context:
space:
mode:
Diffstat (limited to 'source4/lib/ldb/tests')
-rw-r--r--source4/lib/ldb/tests/python/urgent_replication.py414
1 files changed, 414 insertions, 0 deletions
diff --git a/source4/lib/ldb/tests/python/urgent_replication.py b/source4/lib/ldb/tests/python/urgent_replication.py
new file mode 100644
index 0000000000..d0a63a1146
--- /dev/null
+++ b/source4/lib/ldb/tests/python/urgent_replication.py
@@ -0,0 +1,414 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+# This is a port of the original in testprogs/ejs/ldap.js
+
+import getopt
+import optparse
+import sys
+import time
+import random
+import base64
+import os
+
+sys.path.append("bin/python")
+sys.path.append("../lib/subunit/python")
+
+import samba.getopt as options
+
+from samba.auth import system_session
+from ldb import SCOPE_BASE, LdbError
+from ldb import ERR_NO_SUCH_OBJECT
+from ldb import Message, MessageElement, Dn
+from ldb import FLAG_MOD_REPLACE
+from samba import Ldb
+from samba import glue
+
+from subunit.run import SubunitTestRunner
+import unittest
+
+from samba.ndr import ndr_pack, ndr_unpack
+from samba.dcerpc import security
+
+parser = optparse.OptionParser("ldap [options] <host>")
+sambaopts = options.SambaOptions(parser)
+parser.add_option_group(sambaopts)
+parser.add_option_group(options.VersionOptions(parser))
+# use command line creds if available
+credopts = options.CredentialsOptions(parser)
+parser.add_option_group(credopts)
+opts, args = parser.parse_args()
+
+if len(args) < 1:
+ parser.print_usage()
+ sys.exit(1)
+
+host = args[0]
+
+lp = sambaopts.get_loadparm()
+creds = credopts.get_credentials(lp)
+
+class UrgentReplicationTests(unittest.TestCase):
+
+ def delete_force(self, ldb, dn):
+ try:
+ ldb.delete(dn)
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_NO_SUCH_OBJECT)
+
+ def find_basedn(self, ldb):
+ res = ldb.search(base="", expression="", scope=SCOPE_BASE,
+ attrs=["defaultNamingContext"])
+ self.assertEquals(len(res), 1)
+ return res[0]["defaultNamingContext"][0]
+
+ def find_configurationdn(self, ldb):
+ res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["configurationNamingContext"])
+ self.assertEquals(len(res), 1)
+ return res[0]["configurationNamingContext"][0]
+
+ def find_schemadn(self, ldb):
+ res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["schemaNamingContext"])
+ self.assertEquals(len(res), 1)
+ return res[0]["schemaNamingContext"][0]
+
+ def find_domain_sid(self):
+ res = self.ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
+ return ndr_unpack( security.dom_sid,res[0]["objectSid"][0])
+
+ def setUp(self):
+ self.ldb = ldb
+ self.gc_ldb = gc_ldb
+ self.base_dn = self.find_basedn(ldb)
+ self.configuration_dn = self.find_configurationdn(ldb)
+ self.schema_dn = self.find_schemadn(ldb)
+ self.domain_sid = self.find_domain_sid()
+
+ print "baseDN: %s\n" % self.base_dn
+
+ def test_nonurgent_object(self):
+ '''Test if the urgent replication is not activated
+ when handling a non urgent object'''
+ self.ldb.add({
+ "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
+ "objectclass":"user",
+ "samaccountname":"nonurgenttest",
+ "description":"nonurgenttest description"});
+
+ ''' urgent replication shouldn't be enabled when creating '''
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication shouldn't be enabled when modifying '''
+ m = Message()
+ m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
+ m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
+ "description")
+ ldb.modify(m)
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication shouldn't be enabled when deleting '''
+ self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ def test_nTDSDSA_object(self):
+ '''Test if the urgent replication is activated
+ when handling a nTDSDSA object'''
+ self.ldb.add({
+ "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
+ "objectclass":"server",
+ "cn":"test server",
+ "name":"test server",
+ "systemFlags":"50000000"});
+
+ self.ldb.add_ldif(
+ """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
+objectclass: nTDSDSA
+cn: NTDS Settings test
+options: 1
+instanceType: 4
+systemFlags: 33554432""", ["relax:0"]);
+
+ ''' urgent replication should be enabled when creation '''
+ res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should NOT be enabled when modifying '''
+ m = Message()
+ m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+ m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
+ "options")
+ ldb.modify(m)
+ res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should be enabled when deleting '''
+ self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+ res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+
+
+ def test_crossRef_object(self):
+ '''Test if the urgent replication is activated
+ when handling a crossRef object'''
+ self.ldb.add({
+ "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
+ "objectClass": "crossRef",
+ "cn": "test crossRef",
+ "instanceType": "4",
+ "nCName": self.base_dn,
+ "showInAdvancedViewOnly": "TRUE",
+ "name": "test crossRef",
+ "systemFlags": "1"});
+
+ ''' urgent replication should be enabled when creating '''
+ res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should NOT be enabled when modifying '''
+ m = Message()
+ m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
+ m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
+ "systemFlags")
+ ldb.modify(m)
+ res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ ''' urgent replication should be enabled when deleting '''
+ self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
+ res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+
+ def test_attributeSchema_object(self):
+ '''Test if the urgent replication is activated
+ when handling an attributeSchema object'''
+
+ try:
+ self.ldb.add_ldif(
+ """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
+objectClass: attributeSchema
+cn: test attributeSchema
+instanceType: 4
+isSingleValued: FALSE
+showInAdvancedViewOnly: FALSE
+attributeID: 0.9.2342.19200300.100.1.1
+attributeSyntax: 2.5.5.12
+adminDisplayName: test attributeSchema
+adminDescription: test attributeSchema
+oMSyntax: 64
+systemOnly: FALSE
+searchFlags: 8
+lDAPDisplayName: test attributeSchema
+name: test attributeSchema
+systemFlags: 0""", ["relax:0"]);
+
+ ''' urgent replication should be enabled when creating '''
+ res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ except LdbError:
+ print "Not testing urgent replication when creating attributeSchema object ...\n"
+
+ ''' urgent replication should be enabled when modifying '''
+ m = Message()
+ m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
+ m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
+ "lDAPDisplayName")
+ ldb.modify(m)
+ res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ def test_classSchema_object(self):
+ '''Test if the urgent replication is activated
+ when handling a classSchema object'''
+ try:
+ self.ldb.add_ldif(
+ """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
+objectClass: classSchema
+cn: test classSchema
+instanceType: 4
+subClassOf: top
+governsID: 1.2.840.113556.1.5.999
+rDNAttID: cn
+showInAdvancedViewOnly: TRUE
+adminDisplayName: test classSchema
+adminDescription: test classSchema
+objectClassCategory: 1
+lDAPDisplayName: test classSchema
+name: test classSchema
+systemOnly: FALSE
+systemPossSuperiors: dfsConfiguration
+systemMustContain: msDFS-SchemaMajorVersion
+defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
+ CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
+systemFlags: 16
+defaultHidingValue: TRUE""", ["relax:0"]);
+
+ ''' urgent replication should be enabled when creating '''
+ res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ except LdbError:
+ print "Not testing urgent replication when creating classSchema object ...\n"
+
+ ''' urgent replication should be enabled when modifying '''
+ m = Message()
+ m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
+ m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
+ "lDAPDisplayName")
+ ldb.modify(m)
+ res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ def test_secret_object(self):
+
+ '''Test if the urgent replication is activated
+ when handling a secret object'''
+
+ self.ldb.add({
+ "dn": "cn=test secret,cn=System," + self.base_dn,
+ "objectClass":"secret",
+ "cn":"test secret",
+ "name":"test secret",
+ "currentValue":"xxxxxxx"});
+
+
+ ''' urgent replication should be enabled when creationg '''
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should be enabled when modifying '''
+ m = Message()
+ m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
+ m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
+ "currentValue")
+ ldb.modify(m)
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should NOT be enabled when deleting '''
+ self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ def test_rIDManager_object(self):
+ '''Test if the urgent replication is activated
+ when handling a rIDManager object'''
+ self.ldb.add_ldif(
+ """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
+objectClass: rIDManager
+cn: RID Manager test
+instanceType: 4
+showInAdvancedViewOnly: TRUE
+name: RID Manager test
+systemFlags: -1946157056
+isCriticalSystemObject: TRUE
+rIDAvailablePool: 133001-1073741823""", ["relax:0"])
+
+ ''' urgent replication should be enabled when creating '''
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should be enabled when modifying '''
+ m = Message()
+ m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
+ m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
+ "systemFlags")
+ ldb.modify(m)
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should NOT be enabled when deleting '''
+ self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ def test_urgent_attributes(self):
+ '''Test if the urgent replication is activated
+ when handling urgent attributes of an object'''
+
+ self.ldb.add({
+ "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
+ "objectclass":"user",
+ "samaccountname":"user UrgAttr test",
+ "userAccountControl":"1",
+ "lockoutTime":"0",
+ "pwdLastSet":"0",
+ "description":"urgent attributes test description"});
+
+ ''' urgent replication should NOT be enabled when creating '''
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should be enabled when modifying userAccountControl '''
+ m = Message()
+ m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE,
+ "userAccountControl")
+ ldb.modify(m)
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should be enabled when modifying lockoutTime '''
+ m = Message()
+ m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
+ "lockoutTime")
+ ldb.modify(m)
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should be enabled when modifying pwdLastSet '''
+ m = Message()
+ m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
+ "pwdLastSet")
+ ldb.modify(m)
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should NOT be enabled when modifying a not-urgent attribute '''
+ m = Message()
+ m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["description"] = MessageElement("updated urgent attributes test description",
+ FLAG_MOD_REPLACE, "description")
+ ldb.modify(m)
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ ''' urgent replication should NOT be enabled when deleting '''
+ self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+if not "://" in host:
+ if os.path.isfile(host):
+ host = "tdb://%s" % host
+ else:
+ host = "ldap://%s" % host
+
+
+ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp)
+if not "tdb://" in host:
+ gc_ldb = Ldb("%s:3268" % host, credentials=creds,
+ session_info=system_session(), lp=lp)
+else:
+ gc_ldb = None
+
+runner = SubunitTestRunner()
+rc = 0
+if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
+ rc = 1
+sys.exit(rc)