summaryrefslogtreecommitdiff
path: root/source4/dsdb/tests
diff options
context:
space:
mode:
Diffstat (limited to 'source4/dsdb/tests')
-rwxr-xr-xsource4/dsdb/tests/python/dirsync.py713
-rwxr-xr-xsource4/dsdb/tests/python/sam.py290
-rwxr-xr-xsource4/dsdb/tests/python/token_group.py4
3 files changed, 973 insertions, 34 deletions
diff --git a/source4/dsdb/tests/python/dirsync.py b/source4/dsdb/tests/python/dirsync.py
new file mode 100755
index 0000000000..96c6950841
--- /dev/null
+++ b/source4/dsdb/tests/python/dirsync.py
@@ -0,0 +1,713 @@
+#!/usr/bin/env python
+#
+# Unit tests for dirsync control
+# Copyright (C) Matthieu Patou <mat@matws.net> 2011
+#
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import optparse
+import sys
+sys.path.insert(0, "bin/python")
+import samba
+samba.ensure_external_module("testtools", "testtools")
+samba.ensure_external_module("subunit", "subunit/python")
+
+import samba.getopt as options
+import base64
+
+from ldb import LdbError, SCOPE_BASE
+from ldb import Message, MessageElement, Dn
+from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
+from samba.dcerpc import security, misc, drsblobs
+from samba.ndr import ndr_unpack, ndr_pack
+
+from samba.auth import system_session
+from samba import gensec, sd_utils
+from samba.samdb import SamDB
+from samba.credentials import Credentials
+import samba.tests
+from samba.tests import delete_force
+from subunit.run import SubunitTestRunner
+import unittest
+
+parser = optparse.OptionParser("dirsync.py [options] <host>")
+sambaopts = options.SambaOptions(parser)
+parser.add_option_group(sambaopts)
+parser.add_option_group(options.VersionOptions(parser))
+
+# use command line creds if available
+credopts = options.CredentialsOptions(parser)
+parser.add_option_group(credopts)
+opts, args = parser.parse_args()
+
+if len(args) < 1:
+ parser.print_usage()
+ sys.exit(1)
+
+host = args[0]
+if not "://" in host:
+ ldaphost = "ldap://%s" % host
+ ldapshost = "ldaps://%s" % host
+else:
+ ldaphost = host
+ start = host.rindex("://")
+ host = host.lstrip(start+3)
+
+lp = sambaopts.get_loadparm()
+creds = credopts.get_credentials(lp)
+
+#
+# Tests start here
+#
+
+class DirsyncBaseTests(samba.tests.TestCase):
+
+ def setUp(self):
+ super(DirsyncBaseTests, self).setUp()
+ self.ldb_admin = ldb
+ self.base_dn = ldb.domain_dn()
+ self.domain_sid = security.dom_sid(ldb.get_domain_sid())
+ self.user_pass = "samba123@AAA"
+ self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
+ self.sd_utils = sd_utils.SDUtils(ldb)
+ #used for anonymous login
+ print "baseDN: %s" % self.base_dn
+
+ def get_user_dn(self, name):
+ return "CN=%s,CN=Users,%s" % (name, self.base_dn)
+
+ def get_ldb_connection(self, target_username, target_password):
+ creds_tmp = Credentials()
+ creds_tmp.set_username(target_username)
+ creds_tmp.set_password(target_password)
+ creds_tmp.set_domain(creds.get_domain())
+ creds_tmp.set_realm(creds.get_realm())
+ creds_tmp.set_workstation(creds.get_workstation())
+ creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
+ | gensec.FEATURE_SEAL)
+ ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
+ return ldb_target
+
+
+#tests on ldap add operations
+class SimpleDirsyncTests(DirsyncBaseTests):
+
+ def setUp(self):
+ super(SimpleDirsyncTests, self).setUp()
+ # Regular user
+ self.dirsync_user = "test_dirsync_user"
+ self.simple_user = "test_simple_user"
+ self.admin_user = "test_admin_user"
+ self.ouname = None
+
+ self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
+ self.ldb_admin.newuser(self.simple_user, self.user_pass)
+ self.ldb_admin.newuser(self.admin_user, self.user_pass)
+ self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
+
+ user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
+ mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
+ self.sd_utils.dacl_add_ace(self.base_dn, mod)
+
+ # add admins to the Domain Admins group
+ self.ldb_admin.add_remove_group_members("Domain Admins", self.admin_user,
+ add_members_operation=True)
+
+ def tearDown(self):
+ super(SimpleDirsyncTests, self).tearDown()
+ delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
+ delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
+ delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
+ if self.ouname:
+ delete_force(self.ldb_admin, self.ouname)
+ self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
+ try:
+ self.ldb_admin.deletegroup("testgroup")
+ except:
+ pass
+
+ #def test_dirsync_errors(self):
+
+
+ def test_dirsync_supported(self):
+ """Test the basic of the dirsync is supported"""
+ self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
+ self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+ res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
+ res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
+ try:
+ self.ldb_simple.search(self.base_dn,
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
+ except LdbError,l:
+ self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
+
+ def test_parentGUID_referrals(self):
+ res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="name=Configuration",
+ controls=["dirsync:1:0:1"])
+ self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
+
+ def test_ok_not_rootdc(self):
+ """Test if it's ok to do dirsync on another NC that is not the root DC"""
+ try:
+ res = self.ldb_admin.search("CN=Configuration, %s" % self.base_dn,
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
+ except:
+ self.assertTrue(False)
+
+ def test_dirsync_errors(self):
+ """Test if dirsync returns the correct LDAP errors in case of pb"""
+ self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+ self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
+ try:
+ self.ldb_simple.search(self.base_dn,
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
+ except LdbError,l:
+ print l
+ self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
+
+ try:
+ self.ldb_simple.search("CN=Users,%s" % self.base_dn,
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
+ except LdbError,l:
+ print l
+ self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
+
+ try:
+ self.ldb_simple.search("CN=Users,%s" % self.base_dn,
+ expression="samaccountname=*",
+ controls=["dirsync:1:1:1"])
+ except LdbError,l:
+ print l
+ self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
+
+ try:
+ self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
+ except LdbError,l:
+ print l
+ self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
+
+ try:
+ self.ldb_admin.search("CN=Users,%s" % self.base_dn,
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
+ except LdbError,l:
+ print l
+ self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
+
+ try:
+ self.ldb_admin.search("CN=Users,%s" % self.base_dn,
+ expression="samaccountname=*",
+ controls=["dirsync:1:1:1"])
+ except LdbError,l:
+ print l
+ self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
+
+
+
+
+ def test_dirsync_attributes(self):
+ """Check behavior with some attributes """
+ res = self.ldb_admin.search(self.base_dn,
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
+ # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
+ self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") != None)
+ # Check that non replicated attributes are not returned
+ self.assertTrue(res.msgs[0].get("badPwdCount") == None)
+ # Check that non forward link are not returned
+ self.assertTrue(res.msgs[0].get("memberof") == None)
+
+ # Asking for instanceType will return also objectGUID
+ res = self.ldb_admin.search(self.base_dn,
+ expression="samaccountname=Administrator",
+ attrs=["instanceType"],
+ controls=["dirsync:1:0:1"])
+ self.assertTrue(res.msgs[0].get("objectGUID") != None)
+ self.assertTrue(res.msgs[0].get("instanceType") != None)
+
+ # We don't return an entry if asked for objectGUID
+ res = self.ldb_admin.search(self.base_dn,
+ expression="dn=%s" % self.base_dn,
+ attrs=["objectGUID"],
+ controls=["dirsync:1:0:1"])
+ self.assertEquals(len(res.msgs), 0)
+
+ # a request on the root of a NC didn't return parentGUID
+ res = self.ldb_admin.search(self.base_dn,
+ expression="dn=%s" % self.base_dn,
+ attrs=["name"],
+ controls=["dirsync:1:0:1"])
+ self.assertTrue(res.msgs[0].get("objectGUID") != None)
+ self.assertTrue(res.msgs[0].get("name") != None)
+ self.assertTrue(res.msgs[0].get("parentGUID") == None)
+ self.assertTrue(res.msgs[0].get("instanceType") != None)
+
+ # Asking for name will return also objectGUID and parentGUID
+ # and instanceType and of course name
+ res = self.ldb_admin.search(self.base_dn,
+ expression="samaccountname=Administrator",
+ attrs=["name"],
+ controls=["dirsync:1:0:1"])
+ self.assertTrue(res.msgs[0].get("objectGUID") != None)
+ self.assertTrue(res.msgs[0].get("name") != None)
+ self.assertTrue(res.msgs[0].get("parentGUID") != None)
+ self.assertTrue(res.msgs[0].get("instanceType") != None)
+
+ # Asking for dn will not return not only DN but more like if attrs=*
+ # parentGUID should be returned
+ res = self.ldb_admin.search(self.base_dn,
+ expression="samaccountname=Administrator",
+ attrs=["dn"],
+ controls=["dirsync:1:0:1"])
+ count = len(res.msgs[0])
+ res2 = self.ldb_admin.search(self.base_dn,
+ expression="samaccountname=Administrator",
+ controls=["dirsync:1:0:1"])
+ count2 = len(res2.msgs[0])
+ self.assertEqual(count, count2)
+
+ # Asking for cn will return nothing on objects that have CN as RDN
+ res = self.ldb_admin.search(self.base_dn,
+ expression="samaccountname=Administrator",
+ attrs=["cn"],
+ controls=["dirsync:1:0:1"])
+ self.assertEqual(len(res.msgs), 0)
+ # Asking for parentGUID will return nothing too
+ res = self.ldb_admin.search(self.base_dn,
+ expression="samaccountname=Administrator",
+ attrs=["parentGUID"],
+ controls=["dirsync:1:0:1"])
+ self.assertEqual(len(res.msgs), 0)
+ ouname="OU=testou,%s" % self.base_dn
+ self.ouname = ouname
+ self.ldb_admin.create_ou(ouname)
+ delta = Message()
+ delta.dn = Dn(self.ldb_admin, str(ouname))
+ delta["cn"] = MessageElement("test ou",
+ FLAG_MOD_ADD,
+ "cn" )
+ self.ldb_admin.modify(delta)
+ res = self.ldb_admin.search(self.base_dn,
+ expression="name=testou",
+ attrs=["cn"],
+ controls=["dirsync:1:0:1"])
+
+ self.assertEqual(len(res.msgs), 1)
+ self.assertEqual(len(res.msgs[0]), 3)
+ delete_force(self.ldb_admin, ouname)
+
+ def test_dirsync_with_controls(self):
+ """Check that dirsync return correct informations when dealing with the NC"""
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(dn=%s)" % str(self.base_dn),
+ attrs=["name"],
+ controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
+
+ def test_dirsync_basenc(self):
+ """Check that dirsync return correct informations when dealing with the NC"""
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(dn=%s)" % str(self.base_dn),
+ attrs=["name"],
+ controls=["dirsync:1:0:10000"])
+ self.assertEqual(len(res.msgs), 1)
+ self.assertEqual(len(res.msgs[0]), 3)
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(dn=%s)" % str(self.base_dn),
+ attrs=["ntSecurityDescriptor"],
+ controls=["dirsync:1:0:10000"])
+ self.assertEqual(len(res.msgs), 1)
+ self.assertEqual(len(res.msgs[0]), 3)
+
+ def test_dirsync_othernc(self):
+ """Check that dirsync return information for entries that are normaly referrals (ie. other NCs)"""
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(objectclass=configuration)",
+ attrs=["name"],
+ controls=["dirsync:1:0:10000"])
+ self.assertEqual(len(res.msgs), 1)
+ self.assertEqual(len(res.msgs[0]), 4)
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(objectclass=configuration)",
+ attrs=["ntSecurityDescriptor"],
+ controls=["dirsync:1:0:10000"])
+ self.assertEqual(len(res.msgs), 1)
+ self.assertEqual(len(res.msgs[0]), 3)
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(objectclass=domaindns)",
+ attrs=["ntSecurityDescriptor"],
+ controls=["dirsync:1:0:10000"])
+ nb = len(res.msgs)
+
+ # only sub nc returns a result when asked for objectGUID
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(objectclass=domaindns)",
+ attrs=["objectGUID"],
+ controls=["dirsync:1:0:0"])
+ self.assertEqual(len(res.msgs), nb - 1)
+ if nb > 1:
+ self.assertTrue(res.msgs[0].get("objectGUID") != None)
+ else:
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(objectclass=configuration)",
+ attrs=["objectGUID"],
+ controls=["dirsync:1:0:0"])
+
+
+ def test_dirsync_send_delta(self):
+ """Check that dirsync return correct delta when sending the last cookie"""
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(&(samaccountname=test*)(!(isDeleted=*)))",
+ controls=["dirsync:1:0:10000"])
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "0"
+ ctl[3] = "10000"
+ control = str(":".join(ctl))
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(&(samaccountname=test*)(!(isDeleted=*)))",
+ controls=[control])
+ self.assertEqual(len(res), 0)
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
+ controls=["dirsync:1:0:100000"])
+
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "0"
+ ctl[3] = "10000"
+ control2 = str(":".join(ctl))
+
+ # Let's create an OU
+ ouname="OU=testou2,%s" % self.base_dn
+ self.ouname = ouname
+ self.ldb_admin.create_ou(ouname)
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
+ controls=[control2])
+ self.assertEqual(len(res), 1)
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "0"
+ ctl[3] = "10000"
+ control3 = str(":".join(ctl))
+
+ delta = Message()
+ delta.dn = Dn(self.ldb_admin, str(ouname))
+
+ delta["cn"] = MessageElement("test ou",
+ FLAG_MOD_ADD,
+ "cn" )
+ self.ldb_admin.modify(delta)
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
+ controls=[control3])
+
+ self.assertEqual(len(res.msgs), 1)
+ # 3 attributes: instanceType, cn and objectGUID
+ self.assertEqual(len(res.msgs[0]), 3)
+
+ delta = Message()
+ delta.dn = Dn(self.ldb_admin, str(ouname))
+ delta["cn"] = MessageElement([],
+ FLAG_MOD_DELETE,
+ "cn" )
+ self.ldb_admin.modify(delta)
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
+ controls=[control3])
+
+ self.assertEqual(len(res.msgs), 1)
+ # So we won't have much attribute returned but instanceType and GUID
+ # are.
+ # 3 attributes: instanceType and objectGUID and cn but empty
+ self.assertEqual(len(res.msgs[0]), 3)
+ ouname = "OU=newouname,%s" % self.base_dn
+ self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
+ self.ouname = ouname
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "0"
+ ctl[3] = "10000"
+ control4 = str(":".join(ctl))
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
+ controls=[control3])
+
+ self.assertTrue(res[0].get("parentGUID") != None)
+ self.assertTrue(res[0].get("name") != None)
+ delete_force(self.ldb_admin, ouname)
+
+ def test_dirsync_linkedattributes(self):
+ """Check that dirsync returnd deleted objects too"""
+ # Let's search for members
+ self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+ res = self.ldb_simple.search(self.base_dn,
+ expression="(name=Administrators)",
+ controls=["dirsync:1:1:1"])
+
+ self.assertTrue(len(res[0].get("member")) > 0)
+ size = len(res[0].get("member"))
+
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "1"
+ ctl[3] = "10000"
+ control1 = str(":".join(ctl))
+ self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
+ add_members_operation=True)
+
+ res = self.ldb_simple.search(self.base_dn,
+ expression="(name=Administrators)",
+ controls=[control1])
+
+ self.assertEqual(len(res[0].get("member")), size + 1)
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "1"
+ ctl[3] = "10000"
+ control1 = str(":".join(ctl))
+
+ # remove the user from the group
+ self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
+ add_members_operation=False)
+
+ res = self.ldb_simple.search(self.base_dn,
+ expression="(name=Administrators)",
+ controls=[control1])
+
+ self.assertEqual(len(res[0].get("member")), size )
+
+ self.ldb_admin.newgroup("testgroup")
+ self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
+ add_members_operation=True)
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(name=testgroup)",
+ controls=["dirsync:1:0:1"])
+
+ self.assertEqual(len(res[0].get("member")), 1)
+ self.assertTrue(res[0].get("member") != "" )
+
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "0"
+ ctl[3] = "1"
+ control1 = str(":".join(ctl))
+
+ # Check that reasking the same question but with an updated cookie
+ # didn't return any results.
+ print control1
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(name=testgroup)",
+ controls=[control1])
+ self.assertEqual(len(res), 0)
+
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "1"
+ ctl[3] = "10000"
+ control1 = str(":".join(ctl))
+
+ self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
+ add_members_operation=False)
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(name=testgroup)",
+ attrs=["member"],
+ controls=[control1])
+
+ self.ldb_admin.deletegroup("testgroup")
+ self.assertEqual(len(res[0].get("member")), 0)
+
+
+
+ def test_dirsync_deleted_items(self):
+ """Check that dirsync returnd deleted objects too"""
+ # Let's create an OU
+ ouname="OU=testou3,%s" % self.base_dn
+ self.ouname = ouname
+ self.ldb_admin.create_ou(ouname)
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
+ controls=["dirsync:1:0:1"])
+ guid = None
+ for e in res:
+ if str(e["name"]) == "testou3":
+ guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
+
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "0"
+ ctl[3] = "10000"
+ control1 = str(":".join(ctl))
+
+ # So now delete the object and check that
+ # we can see the object but deleted when admin
+ delete_force(self.ldb_admin, ouname)
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(objectClass=organizationalUnit)",
+ controls=[control1])
+ self.assertEqual(len(res), 1)
+ guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
+ self.assertEqual(guid2, guid)
+ self.assertTrue(res[0].get("isDeleted"))
+ self.assertTrue(res[0].get("name") != None)
+
+ def test_cookie_from_others(self):
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
+ controls=["dirsync:1:0:1"])
+ ctl = str(res.controls[0]).split(":")
+ cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
+ cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
+ controls=["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie))]
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
+ controls=controls)
+
+class ExtendedDirsyncTests(SimpleDirsyncTests):
+ def test_dirsync_linkedattributes(self):
+ flag_incr_linked = 2147483648
+ self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+ res = self.ldb_admin.search(self.base_dn,
+ attrs=["member"],
+ expression="(name=Administrators)",
+ controls=["dirsync:1:%d:1" % flag_incr_linked])
+
+ self.assertTrue(res[0].get("member;range=1-1") != None )
+ self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
+ size = len(res[0].get("member;range=1-1"))
+
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "%d" % flag_incr_linked
+ ctl[3] = "10000"
+ control1 = str(":".join(ctl))
+ self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
+ add_members_operation=True)
+ self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
+ add_members_operation=True)
+
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(name=Administrators)",
+ controls=[control1])
+
+ self.assertEqual(len(res[0].get("member;range=1-1")), 2)
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "%d" % flag_incr_linked
+ ctl[3] = "10000"
+ control1 = str(":".join(ctl))
+
+ # remove the user from the group
+ self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
+ add_members_operation=False)
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(name=Administrators)",
+ controls=[control1])
+
+ self.assertEqual(res[0].get("member;range=1-1"), None )
+ self.assertEqual(len(res[0].get("member;range=0-0")), 1)
+
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "%d" % flag_incr_linked
+ ctl[3] = "10000"
+ control2 = str(":".join(ctl))
+
+ self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
+ add_members_operation=False)
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(name=Administrators)",
+ controls=[control2])
+
+ self.assertEqual(res[0].get("member;range=1-1"), None )
+ self.assertEqual(len(res[0].get("member;range=0-0")), 1)
+
+ res = self.ldb_admin.search(self.base_dn,
+ expression="(name=Administrators)",
+ controls=[control1])
+
+ self.assertEqual(res[0].get("member;range=1-1"), None )
+ self.assertEqual(len(res[0].get("member;range=0-0")), 2)
+
+ def test_dirsync_deleted_items(self):
+ """Check that dirsync returnd deleted objects too"""
+ # Let's create an OU
+ self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+ ouname="OU=testou3,%s" % self.base_dn
+ self.ouname = ouname
+ self.ldb_admin.create_ou(ouname)
+
+ # Specify LDAP_DIRSYNC_OBJECT_SECURITY
+ res = self.ldb_simple.search(self.base_dn,
+ expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
+ controls=["dirsync:1:1:1"])
+
+ guid = None
+ for e in res:
+ if str(e["name"]) == "testou3":
+ guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
+
+ self.assertTrue(guid != None)
+ ctl = str(res.controls[0]).split(":")
+ ctl[1] = "1"
+ ctl[2] = "1"
+ ctl[3] = "10000"
+ control1 = str(":".join(ctl))
+
+ # So now delete the object and check that
+ # we can see the object but deleted when admin
+ # we just see the objectGUID when simple user
+ delete_force(self.ldb_admin, ouname)
+
+ res = self.ldb_simple.search(self.base_dn,
+ expression="(objectClass=organizationalUnit)",
+ controls=[control1])
+ self.assertEqual(len(res), 1)
+ guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
+ self.assertEqual(guid2, guid)
+ self.assertEqual(str(res[0].dn), "")
+
+
+ldb = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
+
+runner = SubunitTestRunner()
+rc = 0
+#
+if not runner.run(unittest.makeSuite(SimpleDirsyncTests)).wasSuccessful():
+ rc = 1
+if not runner.run(unittest.makeSuite(ExtendedDirsyncTests)).wasSuccessful():
+ rc = 1
+
+sys.exit(rc)
diff --git a/source4/dsdb/tests/python/sam.py b/source4/dsdb/tests/python/sam.py
index 5f7c90db23..b08fba5a7d 100755
--- a/source4/dsdb/tests/python/sam.py
+++ b/source4/dsdb/tests/python/sam.py
@@ -36,8 +36,8 @@ from samba.dsdb import (UF_NORMAL_ACCOUNT, UF_ACCOUNTDISABLE,
ATYPE_SECURITY_LOCAL_GROUP, ATYPE_DISTRIBUTION_GLOBAL_GROUP,
ATYPE_DISTRIBUTION_UNIVERSAL_GROUP, ATYPE_DISTRIBUTION_LOCAL_GROUP,
ATYPE_WORKSTATION_TRUST)
-from samba.dcerpc.security import (DOMAIN_RID_USERS, DOMAIN_RID_DOMAIN_MEMBERS,
- DOMAIN_RID_DCS, DOMAIN_RID_READONLY_DCS)
+from samba.dcerpc.security import (DOMAIN_RID_USERS, DOMAIN_RID_ADMINS,
+ DOMAIN_RID_DOMAIN_MEMBERS, DOMAIN_RID_DCS, DOMAIN_RID_READONLY_DCS)
from subunit.run import SubunitTestRunner
import unittest
@@ -1471,25 +1471,33 @@ class SamTests(unittest.TestCase):
self.assertEquals(num, ERR_OTHER)
delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
-# This isn't supported yet in s4
-# try:
-# ldb.add({
-# "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
-# "objectclass": "user",
-# "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)})
-# self.fail()
-# except LdbError, (num, _):
-# self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
-# delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
-#
-# try:
-# ldb.add({
-# "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
-# "objectclass": "user",
-# "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)})
-# except LdbError, (num, _):
-# self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
-# delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+ try:
+ ldb.add({
+ "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
+ "objectclass": "user",
+ "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)})
+ self.fail()
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
+ delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+
+ try:
+ ldb.add({
+ "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
+ "objectclass": "user",
+ "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)})
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
+ delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+
+ try:
+ ldb.add({
+ "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
+ "objectclass": "user",
+ "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT)})
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION)
+ delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
# This isn't supported yet in s4 - needs ACL module adaption
# try:
@@ -1570,17 +1578,16 @@ class SamTests(unittest.TestCase):
except LdbError, (num, _):
self.assertEquals(num, ERR_OTHER)
-# This isn't supported yet in s4
-# try:
-# m = Message()
-# m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
-# m["userAccountControl"] = MessageElement(
-# str(UF_SERVER_TRUST_ACCOUNT),
-# FLAG_MOD_REPLACE, "userAccountControl")
-# ldb.modify(m)
-# self.fail()
-# except LdbError, (num, _):
-# self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
+ try:
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+ m["userAccountControl"] = MessageElement(
+ str(UF_SERVER_TRUST_ACCOUNT),
+ FLAG_MOD_REPLACE, "userAccountControl")
+ ldb.modify(m)
+ self.fail()
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
m = Message()
m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
@@ -1589,6 +1596,17 @@ class SamTests(unittest.TestCase):
FLAG_MOD_REPLACE, "userAccountControl")
ldb.modify(m)
+ try:
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+ m["userAccountControl"] = MessageElement(
+ str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT),
+ FLAG_MOD_REPLACE, "userAccountControl")
+ ldb.modify(m)
+ self.fail()
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
+
res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
scope=SCOPE_BASE, attrs=["sAMAccountType"])
self.assertTrue(len(res1) == 1)
@@ -1866,7 +1884,215 @@ class SamTests(unittest.TestCase):
# except LdbError, (num, _):
# self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
+ # "primaryGroupID" does not change if account type remains the same
+
+ # For a user account
+
+ ldb.add({
+ "dn": "cn=ldaptestuser2,cn=users," + self.base_dn,
+ "objectclass": "user",
+ "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE)})
+
+ res1 = ldb.search("cn=ldaptestuser2,cn=users," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["userAccountControl"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(int(res1[0]["userAccountControl"][0]),
+ UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD | UF_ACCOUNTDISABLE)
+
+ m = Message()
+ m.dn = Dn(ldb, "<SID=" + ldb.get_domain_sid() + "-" + str(DOMAIN_RID_ADMINS) + ">")
+ m["member"] = MessageElement(
+ "cn=ldaptestuser2,cn=users," + self.base_dn, FLAG_MOD_ADD, "member")
+ ldb.modify(m)
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestuser2,cn=users," + self.base_dn)
+ m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_ADMINS),
+ FLAG_MOD_REPLACE, "primaryGroupID")
+ ldb.modify(m)
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestuser2,cn=users," + self.base_dn)
+ m["userAccountControl"] = MessageElement(
+ str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD),
+ FLAG_MOD_REPLACE, "userAccountControl")
+ ldb.modify(m)
+
+ res1 = ldb.search("cn=ldaptestuser2,cn=users," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["userAccountControl", "primaryGroupID"])
+ self.assertTrue(len(res1) == 1)
+ self.assertTrue(int(res1[0]["userAccountControl"][0]) & UF_ACCOUNTDISABLE == 0)
+ self.assertEquals(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_ADMINS)
+
+ # For a workstation account
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["primaryGroupID"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_DOMAIN_MEMBERS)
+
+ m = Message()
+ m.dn = Dn(ldb, "<SID=" + ldb.get_domain_sid() + "-" + str(DOMAIN_RID_USERS) + ">")
+ m["member"] = MessageElement(
+ "cn=ldaptestcomputer,cn=computers," + self.base_dn, FLAG_MOD_ADD, "member")
+ ldb.modify(m)
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+ m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_USERS),
+ FLAG_MOD_REPLACE, "primaryGroupID")
+ ldb.modify(m)
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+ m["userAccountControl"] = MessageElement(
+ str(UF_WORKSTATION_TRUST_ACCOUNT),
+ FLAG_MOD_REPLACE, "userAccountControl")
+ ldb.modify(m)
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["primaryGroupID"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(int(res1[0]["primaryGroupID"][0]), DOMAIN_RID_USERS)
+
delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+ delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn)
+ delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+
+ def test_isCriticalSystemObject(self):
+ """Test the isCriticalSystemObject behaviour"""
+ print "Testing isCriticalSystemObject behaviour\n"
+
+ # Add tests
+
+ ldb.add({
+ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ "objectclass": "computer"})
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["isCriticalSystemObject"])
+ self.assertTrue(len(res1) == 1)
+ self.assertTrue("isCriticalSystemObject" not in res1[0])
+
+ delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+
+ ldb.add({
+ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ "objectclass": "computer",
+ "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)})
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["isCriticalSystemObject"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(res1[0]["isCriticalSystemObject"][0], "FALSE")
+
+ delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+
+ ldb.add({
+ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ "objectclass": "computer",
+ "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT)})
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["isCriticalSystemObject"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
+
+ delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+
+ ldb.add({
+ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ "objectclass": "computer",
+ "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)})
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["isCriticalSystemObject"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
+
+ # Modification tests
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+ m["userAccountControl"] = MessageElement(str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD),
+ FLAG_MOD_REPLACE, "userAccountControl")
+ ldb.modify(m)
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["isCriticalSystemObject"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+ m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT),
+ FLAG_MOD_REPLACE, "userAccountControl")
+ ldb.modify(m)
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["isCriticalSystemObject"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(res1[0]["isCriticalSystemObject"][0], "FALSE")
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+ m["userAccountControl"] = MessageElement(
+ str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PARTIAL_SECRETS_ACCOUNT),
+ FLAG_MOD_REPLACE, "userAccountControl")
+ ldb.modify(m)
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["isCriticalSystemObject"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+ m["userAccountControl"] = MessageElement(str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD),
+ FLAG_MOD_REPLACE, "userAccountControl")
+ ldb.modify(m)
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["isCriticalSystemObject"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+ m["userAccountControl"] = MessageElement(str(UF_SERVER_TRUST_ACCOUNT),
+ FLAG_MOD_REPLACE, "userAccountControl")
+ ldb.modify(m)
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["isCriticalSystemObject"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(res1[0]["isCriticalSystemObject"][0], "TRUE")
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
+ m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT),
+ FLAG_MOD_REPLACE, "userAccountControl")
+ ldb.modify(m)
+
+ res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["isCriticalSystemObject"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEquals(res1[0]["isCriticalSystemObject"][0], "FALSE")
+
delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
def test_service_principal_name_updates(self):
diff --git a/source4/dsdb/tests/python/token_group.py b/source4/dsdb/tests/python/token_group.py
index 62bdbd5ee0..fb7654e7e0 100755
--- a/source4/dsdb/tests/python/token_group.py
+++ b/source4/dsdb/tests/python/token_group.py
@@ -78,7 +78,7 @@ class TokenTest(samba.tests.TestCase):
res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
self.assertEquals(len(res), 1)
- print("Geting tokenGroups from rootDSE")
+ print("Getting tokenGroups from rootDSE")
tokengroups = []
for sid in res[0]['tokenGroups']:
tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
@@ -93,7 +93,7 @@ class TokenTest(samba.tests.TestCase):
self.fail(msg="calculated groups don't match against rootDSE tokenGroups")
def test_dn_tokenGroups(self):
- print("Geting tokenGroups from user DN")
+ print("Getting tokenGroups from user DN")
res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
self.assertEquals(len(res), 1)