#!/usr/bin/python
# -*- coding: utf-8 -*-
# This is a port of the original in testprogs/ejs/ldap.js
"""Urgent replication tests, run against a directory given on the command line.

Each test adds, modifies and deletes one kind of object and, after every
operation, compares the "uSNHighest" and "uSNUrgent" values returned by
glue.dsdb_load_partition_usn() for the partition holding the object.  The
tests treat the two values being equal as "urgent replication was activated
by the last change" and the two values differing as "it was not".

Usage: urgent_replication [options] <host>

<host> is either a URL (anything containing "://"), the path of an existing
file (opened as tdb://<path>) or a host name (opened as ldap://<host>).
"""

import getopt
import optparse
import sys
import time
import random
import base64
import os

# The samba and subunit python packages are looked up relative to the
# directory the script is started from.
sys.path.append("bin/python")
sys.path.append("../lib/subunit/python")

import samba.getopt as options

from samba.auth import system_session
from ldb import SCOPE_BASE, LdbError
from ldb import ERR_NO_SUCH_OBJECT
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_REPLACE
from samba import Ldb
from samba import glue

from subunit.run import SubunitTestRunner
import unittest

from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc import security

parser = optparse.OptionParser("urgent_replication [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
opts, args = parser.parse_args()

if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)


class UrgentReplicationTests(unittest.TestCase):
    """Check when a change makes uSNUrgent catch up with uSNHighest.

    The connection used by every test is the module-level ``ldb`` object,
    which is created after this class definition and bound to ``self.ldb``
    in setUp().
    """

    def delete_force(self, ldb, dn):
        """Delete ``dn`` through ``ldb``, tolerating a missing object.

        Any LdbError other than ERR_NO_SUCH_OBJECT fails the test.
        """
        try:
            ldb.delete(dn)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

    def find_basedn(self, ldb):
        """Return the defaultNamingContext read from the rootDSE of ``ldb``."""
        res = ldb.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=["defaultNamingContext"])
        self.assertEqual(len(res), 1)
        return res[0]["defaultNamingContext"][0]

    def setUp(self):
        """Bind the shared connection and look up the base DN."""
        self.ldb = ldb
        self.base_dn = self.find_basedn(self.ldb)

        print("baseDN: %s\n" % self.base_dn)

    def _assert_urgent(self, partition_dn):
        """Assert that uSNHighest equals uSNUrgent for ``partition_dn``."""
        res = glue.dsdb_load_partition_usn(self.ldb, partition_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

    def _assert_not_urgent(self, partition_dn):
        """Assert that uSNHighest differs from uSNUrgent for ``partition_dn``."""
        res = glue.dsdb_load_partition_usn(self.ldb, partition_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

    def _replace_attribute(self, dn, name, value):
        """Replace attribute ``name`` of the object ``dn`` with ``value``."""
        m = Message()
        m.dn = Dn(self.ldb, dn)
        m[name] = MessageElement(value, FLAG_MOD_REPLACE, name)
        self.ldb.modify(m)

    def test_nonurgent_object(self):
        """Test if the urgent replication is not activated
           when handling a non urgent object"""
        dn = "cn=nonurgenttest,cn=users," + self.base_dn

        self.ldb.add({
            "dn": dn,
            "objectclass": "user",
            "samaccountname": "nonurgenttest",
            "description": "nonurgenttest description"})

        # urgent replication should not be enabled when creating
        self._assert_not_urgent(self.base_dn)

        # urgent replication should not be enabled when modifying
        self._replace_attribute(dn, "description", "new description")
        self._assert_not_urgent(self.base_dn)

        # urgent replication should not be enabled when deleting
        self.delete_force(self.ldb, dn)
        self._assert_not_urgent(self.base_dn)

    def test_nTDSDSA_object(self):
        """Test if the urgent replication is activated
           when handling a nTDSDSA object"""
        config_dn = "cn=Configuration," + self.base_dn
        server_dn = ("cn=test server,cn=Servers,cn=Default-First-Site-Name,"
                     "cn=Sites,cn=Configuration," + self.base_dn)
        ntds_dn = "cn=NTDS Settings test," + server_dn

        # The nTDSDSA object needs a server object to live under.
        self.ldb.add({
            "dn": server_dn,
            "objectclass": "server",
            "cn": "test server",
            "name": "test server",
            "systemFlags": "50000000"})

        self.ldb.add_ldif(
            """dn: %s""" % ntds_dn + """
objectclass: nTDSDSA
cn: NTDS Settings test
options: 1
instanceType: 4
systemFlags: 33554432""", ["relax:0"])

        # urgent replication should be enabled when creating
        self._assert_urgent(config_dn)

        # urgent replication should NOT be enabled when modifying
        self._replace_attribute(ntds_dn, "options", "0")
        self._assert_not_urgent(config_dn)

        # urgent replication should be enabled when deleting
        self.delete_force(self.ldb, ntds_dn)
        self._assert_urgent(config_dn)

        self.delete_force(self.ldb, server_dn)

    def test_crossRef_object(self):
        """Test if the urgent replication is activated
           when handling a crossRef object"""
        config_dn = "cn=Configuration," + self.base_dn
        dn = "CN=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn

        self.ldb.add({
            "dn": dn,
            "objectClass": "crossRef",
            "cn": "test crossRef",
            "instanceType": "4",
            "nCName": self.base_dn,
            "showInAdvancedViewOnly": "TRUE",
            "name": "test crossRef",
            "systemFlags": "1"})

        # urgent replication should be enabled when creating
        self._assert_urgent(config_dn)

        # urgent replication should NOT be enabled when modifying
        self._replace_attribute(dn, "systemFlags", "0")
        self._assert_not_urgent(config_dn)

        # urgent replication should be enabled when deleting
        self.delete_force(self.ldb, dn)
        self._assert_urgent(config_dn)

    def test_attributeSchema_object(self):
        """Test if the urgent replication is activated
           when handling an attributeSchema object"""
        schema_dn = "cn=Schema,cn=Configuration," + self.base_dn

        # The object is never deleted by this test, so the add is allowed to
        # fail with an LdbError; only the creation check is skipped then.
        try:
            self.ldb.add_ldif(
                """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
objectClass: attributeSchema
cn: test attributeSchema
instanceType: 4
isSingleValued: FALSE
showInAdvancedViewOnly: FALSE
attributeID: 0.9.2342.19200300.100.1.1
attributeSyntax: 2.5.5.12
adminDisplayName: test attributeSchema
adminDescription: test attributeSchema
oMSyntax: 64
systemOnly: FALSE
searchFlags: 8
lDAPDisplayName: test attributeSchema
name: test attributeSchema
systemFlags: 0""", ["relax:0"])

            # urgent replication should be enabled when creating
            self._assert_urgent(schema_dn)

        except LdbError:
            print("Not testing urgent replication when creating attributeSchema object ...\n")

        # urgent replication should be enabled when modifying
        self._replace_attribute(
            "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn,
            "lDAPDisplayName", "updated test attributeSchema")
        self._assert_urgent(schema_dn)

    def test_classSchema_object(self):
        """Test if the urgent replication is activated
           when handling a classSchema object"""
        schema_dn = "cn=Schema,cn=Configuration," + self.base_dn

        # The object is never deleted by this test, so the add is allowed to
        # fail with an LdbError; only the creation check is skipped then.
        # The second line of defaultSecurityDescriptor starts with a single
        # space on purpose: that is an LDIF continuation of the line above.
        try:
            self.ldb.add_ldif(
                """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
objectClass: classSchema
cn: test classSchema
instanceType: 4
subClassOf: top
governsID: 1.2.840.113556.1.5.999
rDNAttID: cn
showInAdvancedViewOnly: TRUE
adminDisplayName: test classSchema
adminDescription: test classSchema
objectClassCategory: 1
lDAPDisplayName: test classSchema
name: test classSchema
systemOnly: FALSE
systemPossSuperiors: dfsConfiguration
systemMustContain: msDFS-SchemaMajorVersion
defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
systemFlags: 16
defaultHidingValue: TRUE""", ["relax:0"])

            # urgent replication should be enabled when creating
            self._assert_urgent(schema_dn)

        except LdbError:
            print("Not testing urgent replication when creating classSchema object ...\n")

        # urgent replication should be enabled when modifying
        self._replace_attribute(
            "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn,
            "lDAPDisplayName", "updated test classSchema")
        self._assert_urgent(schema_dn)

    def test_secret_object(self):
        """Test if the urgent replication is activated
           when handling a secret object"""
        dn = "cn=test secret,cn=System," + self.base_dn

        self.ldb.add({
            "dn": dn,
            "objectClass": "secret",
            "cn": "test secret",
            "name": "test secret",
            "currentValue": "xxxxxxx"})

        # urgent replication should be enabled when creating
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying
        self._replace_attribute(dn, "currentValue", "yyyyyyyy")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, dn)
        self._assert_not_urgent(self.base_dn)

    def test_rIDManager_object(self):
        """Test if the urgent replication is activated
           when handling a rIDManager object"""
        dn = "CN=RID Manager test,CN=System," + self.base_dn

        self.ldb.add_ldif(
            """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
objectClass: rIDManager
cn: RID Manager test
instanceType: 4
showInAdvancedViewOnly: TRUE
name: RID Manager test
systemFlags: -1946157056
isCriticalSystemObject: TRUE
rIDAvailablePool: 133001-1073741823""", ["relax:0"])

        # urgent replication should be enabled when creating
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying
        self._replace_attribute(dn, "systemFlags", "0")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, dn)
        self._assert_not_urgent(self.base_dn)

    def test_urgent_attributes(self):
        """Test if the urgent replication is activated
           when handling urgent attributes of an object"""
        dn = "cn=user UrgAttr test,cn=users," + self.base_dn

        self.ldb.add({
            "dn": dn,
            "objectclass": "user",
            "samaccountname": "user UrgAttr test",
            "userAccountControl": "1",
            "lockoutTime": "0",
            "pwdLastSet": "0",
            "description": "urgent attributes test description"})

        # urgent replication should NOT be enabled when creating
        self._assert_not_urgent(self.base_dn)

        # urgent replication should be enabled when modifying
        # userAccountControl
        self._replace_attribute(dn, "userAccountControl", "0")
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying lockoutTime
        self._replace_attribute(dn, "lockoutTime", "1")
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying pwdLastSet
        self._replace_attribute(dn, "pwdLastSet", "1")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when modifying a
        # not-urgent attribute
        self._replace_attribute(dn, "description",
                                "updated urgent attributes test description")
        self._assert_not_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, dn)
        self._assert_not_urgent(self.base_dn)


# A bare argument is either a local database file or a host name.
if "://" not in host:
    if os.path.isfile(host):
        host = "tdb://%s" % host
    else:
        host = "ldap://%s" % host

# Shared connection picked up by UrgentReplicationTests.setUp().
ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp)

runner = SubunitTestRunner()
rc = 0
if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
    rc = 1
sys.exit(rc)