#!/usr/bin/env python # -*- coding: utf-8 -*- import optparse import sys import os sys.path.insert(0, "bin/python") import samba samba.ensure_external_module("testtools", "testtools") samba.ensure_external_module("subunit", "subunit/python") import samba.getopt as options from samba.auth import system_session from ldb import SCOPE_BASE, LdbError from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF from ldb import ERR_UNWILLING_TO_PERFORM from samba.samdb import SamDB from samba.tests import delete_force from subunit.run import SubunitTestRunner import unittest parser = optparse.OptionParser("deletetest.py [options] <host|file>") sambaopts = options.SambaOptions(parser) parser.add_option_group(sambaopts) parser.add_option_group(options.VersionOptions(parser)) # use command line creds if available credopts = options.CredentialsOptions(parser) parser.add_option_group(credopts) opts, args = parser.parse_args() if len(args) < 1: parser.print_usage() sys.exit(1) host = args[0] lp = sambaopts.get_loadparm() creds = credopts.get_credentials(lp) class BasicDeleteTests(unittest.TestCase): def GUID_string(self, guid): return self.ldb.schema_format_value("objectGUID", guid) def setUp(self): self.ldb = ldb self.base_dn = ldb.domain_dn() self.configuration_dn = ldb.get_config_basedn().get_linearized() def search_guid(self, guid): print "SEARCH by GUID %s" % self.GUID_string(guid) res = ldb.search(base="<GUID=%s>" % self.GUID_string(guid), scope=SCOPE_BASE, controls=["show_deleted:1"]) self.assertEquals(len(res), 1) return res[0] def search_dn(self,dn): print "SEARCH by DN %s" % dn res = ldb.search(expression="(objectClass=*)", base=dn, scope=SCOPE_BASE, controls=["show_deleted:1"]) self.assertEquals(len(res), 1) return res[0] def del_attr_values(self, delObj): print "Checking attributes for %s" % delObj["dn"] self.assertEquals(delObj["isDeleted"][0],"TRUE") self.assertTrue(not("objectCategory" in delObj)) self.assertTrue(not("sAMAccountType" in delObj)) def preserved_attributes_list(self, liveObj, delObj): print "Checking for preserved attributes list" preserved_list = ["nTSecurityDescriptor", "attributeID", "attributeSyntax", "dNReferenceUpdate", "dNSHostName", "flatName", "governsID", "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN", "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN", "mS-DS-CreatorSID", "mSMQOwnerID", "nCName", "objectClass", "distinguishedName", "objectGUID", "objectSid", "oMSyntax", "proxiedObjectName", "name", "replPropertyMetaData", "sAMAccountName", "securityIdentifier", "sIDHistory", "subClassOf", "systemFlags", "trustPartner", "trustDirection", "trustType", "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated", "whenCreated"] for a in liveObj: if a in preserved_list: self.assertTrue(a in delObj) def check_rdn(self, liveObj, delObj, rdnName): print "Checking for correct rDN" rdn=liveObj[rdnName][0] rdn2=delObj[rdnName][0] name2=delObj[rdnName][0] guid=liveObj["objectGUID"][0] self.assertEquals(rdn2, rdn + "\nDEL:" + self.GUID_string(guid)) self.assertEquals(name2, rdn + "\nDEL:" + self.GUID_string(guid)) def delete_deleted(self, ldb, dn): print "Testing the deletion of the already deleted dn %s" % dn try: ldb.delete(dn) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NO_SUCH_OBJECT) def test_delete_protection(self): """Delete protection tests""" print self.base_dn delete_force(self.ldb, "cn=entry1,cn=ldaptestcontainer," + self.base_dn) delete_force(self.ldb, "cn=entry2,cn=ldaptestcontainer," + self.base_dn) delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn) ldb.add({ "dn": "cn=ldaptestcontainer," + self.base_dn, "objectclass": "container"}) ldb.add({ "dn": "cn=entry1,cn=ldaptestcontainer," + self.base_dn, "objectclass": "container"}) ldb.add({ "dn": "cn=entry2,cn=ldaptestcontainer," + self.base_dn, "objectclass": "container"}) try: ldb.delete("cn=ldaptestcontainer," + self.base_dn) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) ldb.delete("cn=ldaptestcontainer," + self.base_dn, ["tree_delete:1"]) try: res = ldb.search("cn=ldaptestcontainer," + self.base_dn, scope=SCOPE_BASE, attrs=[]) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NO_SUCH_OBJECT) try: res = ldb.search("cn=entry1,cn=ldaptestcontainer," + self.base_dn, scope=SCOPE_BASE, attrs=[]) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NO_SUCH_OBJECT) try: res = ldb.search("cn=entry2,cn=ldaptestcontainer," + self.base_dn, scope=SCOPE_BASE, attrs=[]) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NO_SUCH_OBJECT) delete_force(self.ldb, "cn=entry1,cn=ldaptestcontainer," + self.base_dn) delete_force(self.ldb, "cn=entry2,cn=ldaptestcontainer," + self.base_dn) delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn) # Performs some protected object delete testing res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["dsServiceName", "dNSHostName"]) self.assertEquals(len(res), 1) # Delete failing since DC's nTDSDSA object is protected try: ldb.delete(res[0]["dsServiceName"][0]) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) res = ldb.search(self.base_dn, attrs=["rIDSetReferences"], expression="(&(objectClass=computer)(dNSHostName=" + res[0]["dNSHostName"][0] + "))") self.assertEquals(len(res), 1) # Deletes failing since DC's rIDSet object is protected try: ldb.delete(res[0]["rIDSetReferences"][0]) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) try: ldb.delete(res[0]["rIDSetReferences"][0], ["tree_delete:1"]) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Deletes failing since three main crossRef objects are protected try: ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) try: ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn, ["tree_delete:1"]) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) try: ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) try: ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn, ["tree_delete:1"]) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) res = ldb.search("cn=Partitions," + self.configuration_dn, attrs=[], expression="(nCName=%s)" % self.base_dn) self.assertEquals(len(res), 1) try: ldb.delete(res[0].dn) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) try: ldb.delete(res[0].dn, ["tree_delete:1"]) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF) # Delete failing since "SYSTEM_FLAG_DISALLOW_DELETE" try: ldb.delete("CN=Users," + self.base_dn) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Tree-delete failing since "isCriticalSystemObject" try: ldb.delete("CN=Computers," + self.base_dn, ["tree_delete:1"]) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) def test_all(self): """Basic delete tests""" print self.base_dn usr1="cn=testuser,cn=users," + self.base_dn usr2="cn=testuser2,cn=users," + self.base_dn grp1="cn=testdelgroup1,cn=users," + self.base_dn sit1="cn=testsite1,cn=sites," + self.configuration_dn ss1="cn=NTDS Site Settings,cn=testsite1,cn=sites," + self.configuration_dn srv1="cn=Servers,cn=testsite1,cn=sites," + self.configuration_dn srv2="cn=TESTSRV,cn=Servers,cn=testsite1,cn=sites," + self.configuration_dn delete_force(self.ldb, usr1) delete_force(self.ldb, usr2) delete_force(self.ldb, grp1) delete_force(self.ldb, ss1) delete_force(self.ldb, srv2) delete_force(self.ldb, srv1) delete_force(self.ldb, sit1) ldb.add({ "dn": usr1, "objectclass": "user", "description": "test user description", "samaccountname": "testuser"}) ldb.add({ "dn": usr2, "objectclass": "user", "description": "test user 2 description", "samaccountname": "testuser2"}) ldb.add({ "dn": grp1, "objectclass": "group", "description": "test group", "samaccountname": "testdelgroup1", "member": [ usr1, usr2 ], "isDeleted": "FALSE" }) ldb.add({ "dn": sit1, "objectclass": "site" }) ldb.add({ "dn": ss1, "objectclass": ["applicationSiteSettings", "nTDSSiteSettings"] }) ldb.add({ "dn": srv1, "objectclass": "serversContainer" }) ldb.add({ "dn": srv2, "objectClass": "server" }) objLive1 = self.search_dn(usr1) guid1=objLive1["objectGUID"][0] objLive2 = self.search_dn(usr2) guid2=objLive2["objectGUID"][0] objLive3 = self.search_dn(grp1) guid3=objLive3["objectGUID"][0] objLive4 = self.search_dn(sit1) guid4=objLive4["objectGUID"][0] objLive5 = self.search_dn(ss1) guid5=objLive5["objectGUID"][0] objLive6 = self.search_dn(srv1) guid6=objLive6["objectGUID"][0] objLive7 = self.search_dn(srv2) guid7=objLive7["objectGUID"][0] ldb.delete(usr1) ldb.delete(usr2) ldb.delete(grp1) ldb.delete(srv1, ["tree_delete:1"]) ldb.delete(sit1, ["tree_delete:1"]) objDeleted1 = self.search_guid(guid1) objDeleted2 = self.search_guid(guid2) objDeleted3 = self.search_guid(guid3) objDeleted4 = self.search_guid(guid4) objDeleted5 = self.search_guid(guid5) objDeleted6 = self.search_guid(guid6) objDeleted7 = self.search_guid(guid7) self.del_attr_values(objDeleted1) self.del_attr_values(objDeleted2) self.del_attr_values(objDeleted3) self.del_attr_values(objDeleted4) self.del_attr_values(objDeleted5) self.del_attr_values(objDeleted6) self.del_attr_values(objDeleted7) self.preserved_attributes_list(objLive1, objDeleted1) self.preserved_attributes_list(objLive2, objDeleted2) self.preserved_attributes_list(objLive3, objDeleted3) self.preserved_attributes_list(objLive4, objDeleted4) self.preserved_attributes_list(objLive5, objDeleted5) self.preserved_attributes_list(objLive6, objDeleted6) self.preserved_attributes_list(objLive7, objDeleted7) self.check_rdn(objLive1, objDeleted1, "cn") self.check_rdn(objLive2, objDeleted2, "cn") self.check_rdn(objLive3, objDeleted3, "cn") self.check_rdn(objLive4, objDeleted4, "cn") self.check_rdn(objLive5, objDeleted5, "cn") self.check_rdn(objLive6, objDeleted6, "cn") self.check_rdn(objLive7, objDeleted7, "cn") self.delete_deleted(ldb, usr1) self.delete_deleted(ldb, usr2) self.delete_deleted(ldb, grp1) self.delete_deleted(ldb, sit1) self.delete_deleted(ldb, ss1) self.delete_deleted(ldb, srv1) self.delete_deleted(ldb, srv2) self.assertTrue("CN=Deleted Objects" in str(objDeleted1.dn)) self.assertTrue("CN=Deleted Objects" in str(objDeleted2.dn)) self.assertTrue("CN=Deleted Objects" in str(objDeleted3.dn)) self.assertFalse("CN=Deleted Objects" in str(objDeleted4.dn)) self.assertTrue("CN=Deleted Objects" in str(objDeleted5.dn)) self.assertFalse("CN=Deleted Objects" in str(objDeleted6.dn)) self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn)) if not "://" in host: if os.path.isfile(host): host = "tdb://%s" % host else: host = "ldap://%s" % host ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp) runner = SubunitTestRunner() rc = 0 if not runner.run(unittest.makeSuite(BasicDeleteTests)).wasSuccessful(): rc = 1 sys.exit(rc)