summaryrefslogtreecommitdiff
path: root/source4/dsdb/tests/python/urgent_replication.py
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2010-06-30 11:09:10 +0200
committerStefan Metzmacher <metze@samba.org>2010-06-30 11:10:28 +0200
commit14f8953aa4f000173a051b8010252063db5295c1 (patch)
treee599a4c86d34306e3d6de8ddc806033ae1e72641 /source4/dsdb/tests/python/urgent_replication.py
parent19d93c6a1e810dbd634f35cf440412c1ff958448 (diff)
downloadsamba-14f8953aa4f000173a051b8010252063db5295c1.tar.gz
samba-14f8953aa4f000173a051b8010252063db5295c1.tar.bz2
samba-14f8953aa4f000173a051b8010252063db5295c1.zip
s4:dsdb: move dsdb python tests from lib/ldb/ to dsdb/
metze
Diffstat (limited to 'source4/dsdb/tests/python/urgent_replication.py')
-rwxr-xr-xsource4/dsdb/tests/python/urgent_replication.py386
1 files changed, 386 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/urgent_replication.py b/source4/dsdb/tests/python/urgent_replication.py
new file mode 100755
index 0000000000..5e9f4ad128
--- /dev/null
+++ b/source4/dsdb/tests/python/urgent_replication.py
@@ -0,0 +1,386 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import optparse
+import sys
+import os
+
+sys.path.append("bin/python")
+import samba
+samba.ensure_external_module("subunit", "subunit/python")
+samba.ensure_external_module("testtools", "testtools")
+
+import samba.getopt as options
+
+from samba.auth import system_session
+from ldb import (SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT, Message,
+ MessageElement, Dn, FLAG_MOD_REPLACE)
+from samba.samdb import SamDB
+import samba.tests
+
+from subunit.run import SubunitTestRunner
+import unittest
+
+parser = optparse.OptionParser("urgent_replication [options] <host>")
+sambaopts = options.SambaOptions(parser)
+parser.add_option_group(sambaopts)
+parser.add_option_group(options.VersionOptions(parser))
+# use command line creds if available
+credopts = options.CredentialsOptions(parser)
+parser.add_option_group(credopts)
+opts, args = parser.parse_args()
+
+if len(args) < 1:
+ parser.print_usage()
+ sys.exit(1)
+
+host = args[0]
+
+lp = sambaopts.get_loadparm()
+creds = credopts.get_credentials(lp)
+
+class UrgentReplicationTests(samba.tests.TestCase):
+
+ def delete_force(self, ldb, dn):
+ try:
+ ldb.delete(dn)
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_NO_SUCH_OBJECT)
+
+ def find_basedn(self, ldb):
+ res = ldb.search(base="", expression="", scope=SCOPE_BASE,
+ attrs=["defaultNamingContext"])
+ self.assertEquals(len(res), 1)
+ return res[0]["defaultNamingContext"][0]
+
+ def setUp(self):
+ super(UrgentReplicationTests, self).setUp()
+ self.ldb = ldb
+ self.base_dn = self.find_basedn(ldb)
+
+ print "baseDN: %s\n" % self.base_dn
+
+ def test_nonurgent_object(self):
+ """Test if the urgent replication is not activated
+ when handling a non urgent object"""
+ self.ldb.add({
+ "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
+ "objectclass":"user",
+ "samaccountname":"nonurgenttest",
+ "description":"nonurgenttest description"});
+
+ # urgent replication should not be enabled when creating
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should not be enabled when modifying
+ m = Message()
+ m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
+ m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
+ "description")
+ ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should not be enabled when deleting
+ self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ def test_nTDSDSA_object(self):
+ '''Test if the urgent replication is activated
+ when handling a nTDSDSA object'''
+ self.ldb.add({
+ "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
+ "objectclass":"server",
+ "cn":"test server",
+ "name":"test server",
+ "systemFlags":"50000000"});
+
+ self.ldb.add_ldif(
+ """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
+objectclass: nTDSDSA
+cn: NTDS Settings test
+options: 1
+instanceType: 4
+systemFlags: 33554432""", ["relax:0"]);
+
+ # urgent replication should be enabled when creation
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should NOT be enabled when modifying
+ m = Message()
+ m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+ m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
+ "options")
+ ldb.modify(m)
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should be enabled when deleting
+ self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+
+
+ def test_crossRef_object(self):
+ '''Test if the urgent replication is activated
+ when handling a crossRef object'''
+ self.ldb.add({
+ "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
+ "objectClass": "crossRef",
+ "cn": "test crossRef",
+ "dnsRoot": lp.get("realm").lower(),
+ "instanceType": "4",
+ "nCName": self.base_dn,
+ "showInAdvancedViewOnly": "TRUE",
+ "name": "test crossRef",
+ "systemFlags": "1"});
+
+ # urgent replication should be enabled when creating
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should NOT be enabled when modifying
+ m = Message()
+ m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
+ m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
+ "systemFlags")
+ ldb.modify(m)
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ # urgent replication should be enabled when deleting
+ self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
+ res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+
+ def test_attributeSchema_object(self):
+ '''Test if the urgent replication is activated
+ when handling an attributeSchema object'''
+
+ try:
+ self.ldb.add_ldif(
+ """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
+objectClass: attributeSchema
+cn: test attributeSchema
+instanceType: 4
+isSingleValued: FALSE
+showInAdvancedViewOnly: FALSE
+attributeID: 0.9.2342.19200300.100.1.1
+attributeSyntax: 2.5.5.12
+adminDisplayName: test attributeSchema
+adminDescription: test attributeSchema
+oMSyntax: 64
+systemOnly: FALSE
+searchFlags: 8
+lDAPDisplayName: test attributeSchema
+name: test attributeSchema
+systemFlags: 0""", ["relax:0"]);
+
+ # urgent replication should be enabled when creating
+ res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ except LdbError:
+ print "Not testing urgent replication when creating attributeSchema object ...\n"
+
+ # urgent replication should be enabled when modifying
+ m = Message()
+ m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
+ m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
+ "lDAPDisplayName")
+ ldb.modify(m)
+ res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ def test_classSchema_object(self):
+ '''Test if the urgent replication is activated
+ when handling a classSchema object'''
+ try:
+ self.ldb.add_ldif(
+ """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
+objectClass: classSchema
+cn: test classSchema
+instanceType: 4
+subClassOf: top
+governsID: 1.2.840.113556.1.5.999
+rDNAttID: cn
+showInAdvancedViewOnly: TRUE
+adminDisplayName: test classSchema
+adminDescription: test classSchema
+objectClassCategory: 1
+lDAPDisplayName: test classSchema
+name: test classSchema
+systemOnly: FALSE
+systemPossSuperiors: dfsConfiguration
+systemMustContain: msDFS-SchemaMajorVersion
+defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
+ CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
+systemFlags: 16
+defaultHidingValue: TRUE""", ["relax:0"]);
+
+ # urgent replication should be enabled when creating
+ res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ except LdbError:
+ print "Not testing urgent replication when creating classSchema object ...\n"
+
+ # urgent replication should be enabled when modifying
+ m = Message()
+ m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
+ m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
+ "lDAPDisplayName")
+ ldb.modify(m)
+ res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ def test_secret_object(self):
+ '''Test if the urgent replication is activated
+ when handling a secret object'''
+
+ self.ldb.add({
+ "dn": "cn=test secret,cn=System," + self.base_dn,
+ "objectClass":"secret",
+ "cn":"test secret",
+ "name":"test secret",
+ "currentValue":"xxxxxxx"});
+
+
+ # urgent replication should be enabled when creating
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should be enabled when modifying
+ m = Message()
+ m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
+ m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
+ "currentValue")
+ ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should NOT be enabled when deleting
+ self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ def test_rIDManager_object(self):
+ '''Test if the urgent replication is activated
+ when handling a rIDManager object'''
+ self.ldb.add_ldif(
+ """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
+objectClass: rIDManager
+cn: RID Manager test
+instanceType: 4
+showInAdvancedViewOnly: TRUE
+name: RID Manager test
+systemFlags: -1946157056
+isCriticalSystemObject: TRUE
+rIDAvailablePool: 133001-1073741823""", ["relax:0"])
+
+ # urgent replication should be enabled when creating
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should be enabled when modifying
+ m = Message()
+ m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
+ m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
+ "systemFlags")
+ ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should NOT be enabled when deleting
+ self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+ def test_urgent_attributes(self):
+ '''Test if the urgent replication is activated
+ when handling urgent attributes of an object'''
+
+ self.ldb.add({
+ "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
+ "objectclass":"user",
+ "samaccountname":"user UrgAttr test",
+ "userAccountControl":"1",
+ "lockoutTime":"0",
+ "pwdLastSet":"0",
+ "description":"urgent attributes test description"});
+
+ # urgent replication should NOT be enabled when creating
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should be enabled when modifying userAccountControl
+ m = Message()
+ m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE,
+ "userAccountControl")
+ ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should be enabled when modifying lockoutTime
+ m = Message()
+ m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
+ "lockoutTime")
+ ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should be enabled when modifying pwdLastSet
+ m = Message()
+ m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
+ "pwdLastSet")
+ ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should NOT be enabled when modifying a not-urgent
+ # attribute
+ m = Message()
+ m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m["description"] = MessageElement("updated urgent attributes test description",
+ FLAG_MOD_REPLACE, "description")
+ ldb.modify(m)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+ # urgent replication should NOT be enabled when deleting
+ self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ res = self.ldb.load_partition_usn(self.base_dn)
+ self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+if not "://" in host:
+ if os.path.isfile(host):
+ host = "tdb://%s" % host
+ else:
+ host = "ldap://%s" % host
+
+
+ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp,
+ global_schema=False)
+
+runner = SubunitTestRunner()
+rc = 0
+if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
+ rc = 1
+sys.exit(rc)