diff options
Diffstat (limited to 'source4/lib/ldb/tests/python/urgent_replication.py')
-rwxr-xr-x | source4/lib/ldb/tests/python/urgent_replication.py | 386 |
1 files changed, 0 insertions, 386 deletions
diff --git a/source4/lib/ldb/tests/python/urgent_replication.py b/source4/lib/ldb/tests/python/urgent_replication.py deleted file mode 100755 index 5e9f4ad128..0000000000 --- a/source4/lib/ldb/tests/python/urgent_replication.py +++ /dev/null @@ -1,386 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import optparse -import sys -import os - -sys.path.append("bin/python") -import samba -samba.ensure_external_module("subunit", "subunit/python") -samba.ensure_external_module("testtools", "testtools") - -import samba.getopt as options - -from samba.auth import system_session -from ldb import (SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT, Message, - MessageElement, Dn, FLAG_MOD_REPLACE) -from samba.samdb import SamDB -import samba.tests - -from subunit.run import SubunitTestRunner -import unittest - -parser = optparse.OptionParser("urgent_replication [options] <host>") -sambaopts = options.SambaOptions(parser) -parser.add_option_group(sambaopts) -parser.add_option_group(options.VersionOptions(parser)) -# use command line creds if available -credopts = options.CredentialsOptions(parser) -parser.add_option_group(credopts) -opts, args = parser.parse_args() - -if len(args) < 1: - parser.print_usage() - sys.exit(1) - -host = args[0] - -lp = sambaopts.get_loadparm() -creds = credopts.get_credentials(lp) - -class UrgentReplicationTests(samba.tests.TestCase): - - def delete_force(self, ldb, dn): - try: - ldb.delete(dn) - except LdbError, (num, _): - self.assertEquals(num, ERR_NO_SUCH_OBJECT) - - def find_basedn(self, ldb): - res = ldb.search(base="", expression="", scope=SCOPE_BASE, - attrs=["defaultNamingContext"]) - self.assertEquals(len(res), 1) - return res[0]["defaultNamingContext"][0] - - def setUp(self): - super(UrgentReplicationTests, self).setUp() - self.ldb = ldb - self.base_dn = self.find_basedn(ldb) - - print "baseDN: %s\n" % self.base_dn - - def test_nonurgent_object(self): - """Test if the urgent replication is not activated - when handling a non urgent object""" - self.ldb.add({ - "dn": "cn=nonurgenttest,cn=users," + self.base_dn, - "objectclass":"user", - "samaccountname":"nonurgenttest", - "description":"nonurgenttest description"}); - - # urgent replication should not be enabled when creating - res = self.ldb.load_partition_usn(self.base_dn) - self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should not be enabled when modifying - m = Message() - m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn) - m["description"] = MessageElement("new description", FLAG_MOD_REPLACE, - "description") - ldb.modify(m) - res = self.ldb.load_partition_usn(self.base_dn) - self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should not be enabled when deleting - self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn) - res = self.ldb.load_partition_usn(self.base_dn) - self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]); - - - def test_nTDSDSA_object(self): - '''Test if the urgent replication is activated - when handling a nTDSDSA object''' - self.ldb.add({ - "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn, - "objectclass":"server", - "cn":"test server", - "name":"test server", - "systemFlags":"50000000"}); - - self.ldb.add_ldif( - """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """ -objectclass: nTDSDSA -cn: NTDS Settings test -options: 1 -instanceType: 4 -systemFlags: 33554432""", ["relax:0"]); - - # urgent replication should be enabled when creation - res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should NOT be enabled when modifying - m = Message() - m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn) - m["options"] = MessageElement("0", FLAG_MOD_REPLACE, - "options") - ldb.modify(m) - res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn) - self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should be enabled when deleting - self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn) - res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn) - - - def test_crossRef_object(self): - '''Test if the urgent replication is activated - when handling a crossRef object''' - self.ldb.add({ - "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn, - "objectClass": "crossRef", - "cn": "test crossRef", - "dnsRoot": lp.get("realm").lower(), - "instanceType": "4", - "nCName": self.base_dn, - "showInAdvancedViewOnly": "TRUE", - "name": "test crossRef", - "systemFlags": "1"}); - - # urgent replication should be enabled when creating - res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should NOT be enabled when modifying - m = Message() - m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn) - m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE, - "systemFlags") - ldb.modify(m) - res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn) - self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]); - - - # urgent replication should be enabled when deleting - self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn) - res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - - - def test_attributeSchema_object(self): - '''Test if the urgent replication is activated - when handling an attributeSchema object''' - - try: - self.ldb.add_ldif( - """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """ -objectClass: attributeSchema -cn: test attributeSchema -instanceType: 4 -isSingleValued: FALSE -showInAdvancedViewOnly: FALSE -attributeID: 0.9.2342.19200300.100.1.1 -attributeSyntax: 2.5.5.12 -adminDisplayName: test attributeSchema -adminDescription: test attributeSchema -oMSyntax: 64 -systemOnly: FALSE -searchFlags: 8 -lDAPDisplayName: test attributeSchema -name: test attributeSchema -systemFlags: 0""", ["relax:0"]); - - # urgent replication should be enabled when creating - res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - except LdbError: - print "Not testing urgent replication when creating attributeSchema object ...\n" - - # urgent replication should be enabled when modifying - m = Message() - m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn) - m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE, - "lDAPDisplayName") - ldb.modify(m) - res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - - def test_classSchema_object(self): - '''Test if the urgent replication is activated - when handling a classSchema object''' - try: - self.ldb.add_ldif( - """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """ -objectClass: classSchema -cn: test classSchema -instanceType: 4 -subClassOf: top -governsID: 1.2.840.113556.1.5.999 -rDNAttID: cn -showInAdvancedViewOnly: TRUE -adminDisplayName: test classSchema -adminDescription: test classSchema -objectClassCategory: 1 -lDAPDisplayName: test classSchema -name: test classSchema -systemOnly: FALSE -systemPossSuperiors: dfsConfiguration -systemMustContain: msDFS-SchemaMajorVersion -defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD - CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO) -systemFlags: 16 -defaultHidingValue: TRUE""", ["relax:0"]); - - # urgent replication should be enabled when creating - res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - except LdbError: - print "Not testing urgent replication when creating classSchema object ...\n" - - # urgent replication should be enabled when modifying - m = Message() - m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn) - m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE, - "lDAPDisplayName") - ldb.modify(m) - res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - - def test_secret_object(self): - '''Test if the urgent replication is activated - when handling a secret object''' - - self.ldb.add({ - "dn": "cn=test secret,cn=System," + self.base_dn, - "objectClass":"secret", - "cn":"test secret", - "name":"test secret", - "currentValue":"xxxxxxx"}); - - - # urgent replication should be enabled when creating - res = self.ldb.load_partition_usn(self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should be enabled when modifying - m = Message() - m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn) - m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE, - "currentValue") - ldb.modify(m) - res = self.ldb.load_partition_usn(self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should NOT be enabled when deleting - self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn) - res = self.ldb.load_partition_usn(self.base_dn) - self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]); - - - def test_rIDManager_object(self): - '''Test if the urgent replication is activated - when handling a rIDManager object''' - self.ldb.add_ldif( - """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """ -objectClass: rIDManager -cn: RID Manager test -instanceType: 4 -showInAdvancedViewOnly: TRUE -name: RID Manager test -systemFlags: -1946157056 -isCriticalSystemObject: TRUE -rIDAvailablePool: 133001-1073741823""", ["relax:0"]) - - # urgent replication should be enabled when creating - res = self.ldb.load_partition_usn(self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should be enabled when modifying - m = Message() - m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn) - m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE, - "systemFlags") - ldb.modify(m) - res = self.ldb.load_partition_usn(self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should NOT be enabled when deleting - self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn) - res = self.ldb.load_partition_usn(self.base_dn) - self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]); - - - def test_urgent_attributes(self): - '''Test if the urgent replication is activated - when handling urgent attributes of an object''' - - self.ldb.add({ - "dn": "cn=user UrgAttr test,cn=users," + self.base_dn, - "objectclass":"user", - "samaccountname":"user UrgAttr test", - "userAccountControl":"1", - "lockoutTime":"0", - "pwdLastSet":"0", - "description":"urgent attributes test description"}); - - # urgent replication should NOT be enabled when creating - res = self.ldb.load_partition_usn(self.base_dn) - self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should be enabled when modifying userAccountControl - m = Message() - m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn) - m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE, - "userAccountControl") - ldb.modify(m) - res = self.ldb.load_partition_usn(self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should be enabled when modifying lockoutTime - m = Message() - m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn) - m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE, - "lockoutTime") - ldb.modify(m) - res = self.ldb.load_partition_usn(self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should be enabled when modifying pwdLastSet - m = Message() - m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn) - m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE, - "pwdLastSet") - ldb.modify(m) - res = self.ldb.load_partition_usn(self.base_dn) - self.assertEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should NOT be enabled when modifying a not-urgent - # attribute - m = Message() - m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn) - m["description"] = MessageElement("updated urgent attributes test description", - FLAG_MOD_REPLACE, "description") - ldb.modify(m) - res = self.ldb.load_partition_usn(self.base_dn) - self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]); - - # urgent replication should NOT be enabled when deleting - self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn) - res = self.ldb.load_partition_usn(self.base_dn) - self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]); - - -if not "://" in host: - if os.path.isfile(host): - host = "tdb://%s" % host - else: - host = "ldap://%s" % host - - -ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp, - global_schema=False) - -runner = SubunitTestRunner() -rc = 0 -if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful(): - rc = 1 -sys.exit(rc) |