#!/usr/bin/env python # -*- coding: utf-8 -*- # This is a port of the original in testprogs/ejs/ldap.js import optparse import sys import time import base64 import os sys.path.append("bin/python") import samba samba.ensure_external_module("subunit", "subunit/python") samba.ensure_external_module("testtools", "testtools") import samba.getopt as options from samba.auth import system_session from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS from ldb import ERR_ENTRY_ALREADY_EXISTS, ERR_UNWILLING_TO_PERFORM from ldb import ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_OTHER, ERR_INVALID_DN_SYNTAX from ldb import ERR_NO_SUCH_ATTRIBUTE from ldb import ERR_OBJECT_CLASS_VIOLATION, ERR_NOT_ALLOWED_ON_RDN from ldb import ERR_NAMING_VIOLATION, ERR_CONSTRAINT_VIOLATION from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE, ERR_INSUFFICIENT_ACCESS_RIGHTS from ldb import Message, MessageElement, Dn from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE from samba import Ldb from samba.dsdb import (UF_NORMAL_ACCOUNT, UF_INTERDOMAIN_TRUST_ACCOUNT, UF_WORKSTATION_TRUST_ACCOUNT, UF_SERVER_TRUST_ACCOUNT, UF_PARTIAL_SECRETS_ACCOUNT, UF_TEMP_DUPLICATE_ACCOUNT, UF_PASSWD_NOTREQD, UF_ACCOUNTDISABLE, ATYPE_NORMAL_ACCOUNT, GTYPE_SECURITY_BUILTIN_LOCAL_GROUP, GTYPE_SECURITY_DOMAIN_LOCAL_GROUP, GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP, GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP, GTYPE_DISTRIBUTION_GLOBAL_GROUP, GTYPE_DISTRIBUTION_UNIVERSAL_GROUP, ATYPE_SECURITY_GLOBAL_GROUP, ATYPE_SECURITY_UNIVERSAL_GROUP, ATYPE_SECURITY_LOCAL_GROUP, ATYPE_DISTRIBUTION_GLOBAL_GROUP, ATYPE_DISTRIBUTION_UNIVERSAL_GROUP, ATYPE_DISTRIBUTION_LOCAL_GROUP, ATYPE_WORKSTATION_TRUST, SYSTEM_FLAG_DOMAIN_DISALLOW_MOVE, SYSTEM_FLAG_CONFIG_ALLOW_RENAME, SYSTEM_FLAG_CONFIG_ALLOW_MOVE, SYSTEM_FLAG_CONFIG_ALLOW_LIMITED_MOVE) from samba.dcerpc.security import (DOMAIN_RID_USERS, DOMAIN_RID_DOMAIN_MEMBERS, DOMAIN_RID_DCS, DOMAIN_RID_READONLY_DCS) from subunit.run import SubunitTestRunner import unittest from samba.ndr import ndr_pack, ndr_unpack from samba.dcerpc import security parser = optparse.OptionParser("sam.py [options] <host>") sambaopts = options.SambaOptions(parser) parser.add_option_group(sambaopts) parser.add_option_group(options.VersionOptions(parser)) # use command line creds if available credopts = options.CredentialsOptions(parser) parser.add_option_group(credopts) opts, args = parser.parse_args() if len(args) < 1: parser.print_usage() sys.exit(1) host = args[0] lp = sambaopts.get_loadparm() creds = credopts.get_credentials(lp) class SamTests(unittest.TestCase): def delete_force(self, ldb, dn): try: ldb.delete(dn) except LdbError, (num, _): self.assertEquals(num, ERR_NO_SUCH_OBJECT) def find_basedn(self, ldb): res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["defaultNamingContext"]) self.assertEquals(len(res), 1) return res[0]["defaultNamingContext"][0] def find_domain_sid(self): res = self.ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE) return ndr_unpack( security.dom_sid,res[0]["objectSid"][0]) def setUp(self): super(SamTests, self).setUp() self.ldb = ldb self.gc_ldb = gc_ldb self.base_dn = self.find_basedn(ldb) self.domain_sid = self.find_domain_sid() print "baseDN: %s\n" % self.base_dn self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=users," + self.base_dn) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) self.delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn) def test_users_groups(self): """This tests the SAM users and groups behaviour""" print "Testing users and groups behaviour\n" ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group"}) ldb.add({ "dn": "cn=ldaptestgroup2,cn=users," + self.base_dn, "objectclass": "group"}) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["objectSID"]) self.assertTrue(len(res1) == 1) group_rid_1 = security.dom_sid(ldb.schema_format_value("objectSID", res1[0]["objectSID"][0])).split()[1] res1 = ldb.search("cn=ldaptestgroup2,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["objectSID"]) self.assertTrue(len(res1) == 1) group_rid_2 = security.dom_sid(ldb.schema_format_value("objectSID", res1[0]["objectSID"][0])).split()[1] # Try to create a user with an invalid primary group try: ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"], "primaryGroupID": "0"}) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # Try to Create a user with a valid primary group try: ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"], "primaryGroupID": str(group_rid_1)}) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # Test to see how we should behave when the user account doesn't # exist m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["primaryGroupID"] = MessageElement("0", FLAG_MOD_REPLACE, "primaryGroupID") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NO_SUCH_OBJECT) # Test to see how we should behave when the account isn't a user m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["primaryGroupID"] = MessageElement("0", FLAG_MOD_REPLACE, "primaryGroupID") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) # Test default primary groups on add operations ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"]}) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertEquals(res1[0]["primaryGroupID"][0], str(DOMAIN_RID_USERS)) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"], "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD) }) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertEquals(res1[0]["primaryGroupID"][0], str(DOMAIN_RID_USERS)) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # unfortunately the INTERDOMAIN_TRUST_ACCOUNT case cannot be tested # since such accounts aren't directly creatable (ACCESS_DENIED) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["computer"], "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD) }) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertEquals(res1[0]["primaryGroupID"][0], str(DOMAIN_RID_DOMAIN_MEMBERS)) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["computer"], "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT | UF_PASSWD_NOTREQD) }) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertEquals(res1[0]["primaryGroupID"][0], str(DOMAIN_RID_DCS)) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # Read-only DC accounts are only creatable by # UF_WORKSTATION_TRUST_ACCOUNT and work only on DCs >= 2008 (therefore # we have a fallback in the assertion) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["computer"], "userAccountControl": str(UF_PARTIAL_SECRETS_ACCOUNT | UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD) }) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertTrue(res1[0]["primaryGroupID"][0] == str(DOMAIN_RID_READONLY_DCS) or res1[0]["primaryGroupID"][0] == str(DOMAIN_RID_DOMAIN_MEMBERS)) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # Test default primary groups on modify operations ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"]}) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement(str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertEquals(res1[0]["primaryGroupID"][0], str(DOMAIN_RID_USERS)) # unfortunately the INTERDOMAIN_TRUST_ACCOUNT case cannot be tested # since such accounts aren't directly creatable (ACCESS_DENIED) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["computer"]}) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertEquals(res1[0]["primaryGroupID"][0], str(DOMAIN_RID_USERS)) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement(str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertEquals(res1[0]["primaryGroupID"][0], str(DOMAIN_RID_DOMAIN_MEMBERS)) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement(str(UF_SERVER_TRUST_ACCOUNT | UF_PASSWD_NOTREQD), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertEquals(res1[0]["primaryGroupID"][0], str(DOMAIN_RID_DCS)) # Read-only DC accounts are only creatable by # UF_WORKSTATION_TRUST_ACCOUNT and work only on DCs >= 2008 (therefore # we have a fallback in the assertion) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement(str(UF_PARTIAL_SECRETS_ACCOUNT | UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupID"]) self.assertTrue(len(res1) == 1) self.assertTrue(res1[0]["primaryGroupID"][0] == str(DOMAIN_RID_READONLY_DCS) or res1[0]["primaryGroupID"][0] == str(DOMAIN_RID_DOMAIN_MEMBERS)) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # Recreate account for further tests ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"]}) # We should be able to reset our actual primary group m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["primaryGroupID"] = MessageElement(str(DOMAIN_RID_USERS), FLAG_MOD_REPLACE, "primaryGroupID") ldb.modify(m) # Try to add invalid primary group m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["primaryGroupID"] = MessageElement("0", FLAG_MOD_REPLACE, "primaryGroupID") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Try to make group 1 primary - should be denied since it is not yet # secondary m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["primaryGroupID"] = MessageElement(str(group_rid_1), FLAG_MOD_REPLACE, "primaryGroupID") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Make group 1 secondary m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn, FLAG_MOD_REPLACE, "member") ldb.modify(m) # Make group 1 primary m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["primaryGroupID"] = MessageElement(str(group_rid_1), FLAG_MOD_REPLACE, "primaryGroupID") ldb.modify(m) # Try to delete group 1 - should be denied try: ldb.delete("cn=ldaptestgroup,cn=users," + self.base_dn) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_ENTRY_ALREADY_EXISTS) # Try to add group 1 also as secondary - should be denied m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn, FLAG_MOD_ADD, "member") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_ENTRY_ALREADY_EXISTS) # Try to add invalid member to group 1 - should be denied m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["member"] = MessageElement( "cn=ldaptestuser3,cn=users," + self.base_dn, FLAG_MOD_ADD, "member") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NO_SUCH_OBJECT) # Make group 2 secondary m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) m["member"] = MessageElement("cn=ldaptestuser,cn=users," + self.base_dn, FLAG_MOD_ADD, "member") ldb.modify(m) # Swap the groups m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["primaryGroupID"] = MessageElement(str(group_rid_2), FLAG_MOD_REPLACE, "primaryGroupID") ldb.modify(m) # Old primary group should contain a "member" attribute for the user, # the new shouldn't contain anymore one res1 = ldb.search("cn=ldaptestgroup, cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["member"]) self.assertTrue(len(res1) == 1) self.assertTrue(len(res1[0]["member"]) == 1) self.assertEquals(res1[0]["member"][0].lower(), ("cn=ldaptestuser,cn=users," + self.base_dn).lower()) res1 = ldb.search("cn=ldaptestgroup2, cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["member"]) self.assertTrue(len(res1) == 1) self.assertFalse("member" in res1[0]) # Also this should be denied try: ldb.add({ "dn": "cn=ldaptestuser1,cn=users," + self.base_dn, "objectclass": ["user", "person"], "primaryGroupID": "0"}) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) self.delete_force(self.ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn) def test_sam_attributes(self): """Test the behaviour of special attributes of SAM objects""" print "Testing the behaviour of special attributes of SAM objects\n""" ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"]}) ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group"}) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement("0", FLAG_MOD_ADD, "groupType") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement([], FLAG_MOD_DELETE, "groupType") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["primaryGroupID"] = MessageElement("0", FLAG_MOD_ADD, "primaryGroupID") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["primaryGroupID"] = MessageElement([], FLAG_MOD_DELETE, "primaryGroupID") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement("0", FLAG_MOD_ADD, "userAccountControl") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement([], FLAG_MOD_DELETE, "userAccountControl") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["sAMAccountType"] = MessageElement("0", FLAG_MOD_ADD, "sAMAccountType") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["sAMAccountType"] = MessageElement([], FLAG_MOD_REPLACE, "sAMAccountType") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["sAMAccountType"] = MessageElement([], FLAG_MOD_DELETE, "sAMAccountType") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) def test_primary_group_token_constructed(self): """Test the primary group token behaviour (hidden-generated-readonly attribute on groups) and some other constructed attributes""" print "Testing primary group token behaviour and other constructed attributes\n" try: ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "primaryGroupToken": "100"}) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNDEFINED_ATTRIBUTE_TYPE) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"]}) ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group"}) # Testing for one invalid, and one valid operational attribute, but also the things they are built from res1 = ldb.search(self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupToken", "canonicalName", "objectClass", "objectSid"]) self.assertTrue(len(res1) == 1) self.assertFalse("primaryGroupToken" in res1[0]) self.assertTrue("canonicalName" in res1[0]) self.assertTrue("objectClass" in res1[0]) self.assertTrue("objectSid" in res1[0]) res1 = ldb.search(self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupToken", "canonicalName"]) self.assertTrue(len(res1) == 1) self.assertFalse("primaryGroupToken" in res1[0]) self.assertFalse("objectSid" in res1[0]) self.assertFalse("objectClass" in res1[0]) self.assertTrue("canonicalName" in res1[0]) res1 = ldb.search("cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupToken"]) self.assertTrue(len(res1) == 1) self.assertFalse("primaryGroupToken" in res1[0]) res1 = ldb.search("cn=ldaptestuser, cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupToken"]) self.assertTrue(len(res1) == 1) self.assertFalse("primaryGroupToken" in res1[0]) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE) self.assertTrue(len(res1) == 1) self.assertFalse("primaryGroupToken" in res1[0]) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["primaryGroupToken", "objectSID"]) self.assertTrue(len(res1) == 1) primary_group_token = int(res1[0]["primaryGroupToken"][0]) rid = security.dom_sid(ldb.schema_format_value("objectSID", res1[0]["objectSID"][0])).split()[1] self.assertEquals(primary_group_token, rid) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["primaryGroupToken"] = "100" try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) def test_tokenGroups(self): """Test the tokenGroups behaviour (hidden-generated-readonly attribute on SAM objects)""" print "Testing tokenGroups behaviour\n" # The domain object shouldn't contain any "tokenGroups" entry res = ldb.search(self.base_dn, scope=SCOPE_BASE, attrs=["tokenGroups"]) self.assertTrue(len(res) == 1) self.assertFalse("tokenGroups" in res[0]) # The domain administrator should contain "tokenGroups" entries # (the exact number depends on the domain/forest function level and the # DC software versions) res = ldb.search("cn=Administrator,cn=Users," + self.base_dn, scope=SCOPE_BASE, attrs=["tokenGroups"]) self.assertTrue(len(res) == 1) self.assertTrue("tokenGroups" in res[0]) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"]}) # This testuser should contain at least two "tokenGroups" entries # (exactly two on an unmodified "Domain Users" and "Users" group) res = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["tokenGroups"]) self.assertTrue(len(res) == 1) self.assertTrue(len(res[0]["tokenGroups"]) >= 2) # one entry which we need to find should point to domains "Domain Users" # group and another entry should point to the builtin "Users"group domain_users_group_found = False users_group_found = False for sid in res[0]["tokenGroups"]: rid = security.dom_sid(ldb.schema_format_value("objectSID", sid)).split()[1] if rid == 513: domain_users_group_found = True if rid == 545: users_group_found = True self.assertTrue(domain_users_group_found) self.assertTrue(users_group_found) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_groupType(self): """Test the groupType behaviour""" print "Testing groupType behaviour\n" # You can never create or change to a # "GTYPE_SECURITY_BUILTIN_LOCAL_GROUP" # Add operation # Invalid attribute try: ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": "0"}) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) try: ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP)}) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": str(GTYPE_SECURITY_GLOBAL_GROUP)}) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": str(GTYPE_SECURITY_UNIVERSAL_GROUP)}) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_UNIVERSAL_GROUP) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": str(GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)}) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_LOCAL_GROUP) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": str(GTYPE_DISTRIBUTION_GLOBAL_GROUP)}) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_GLOBAL_GROUP) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP)}) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group", "groupType": str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)}) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_LOCAL_GROUP) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) # Modify operation ldb.add({ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn, "objectclass": "group"}) # We can change in this direction: global <-> universal <-> local # On each step also the group type itself (security/distribution) is # variable. # After creation we should have a "security global group" res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Invalid attribute try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement("0", FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Security groups # Default is "global group" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Change to "local" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Change to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_UNIVERSAL_GROUP) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Change back to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_UNIVERSAL_GROUP) # Change to "local" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_LOCAL_GROUP) # Change to "global" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Change to "builtin local" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) # Change back to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_UNIVERSAL_GROUP) # Change to "builtin local" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Change to "builtin local" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_BUILTIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Distribution groups # Default is "global group" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_GLOBAL_GROUP) # Change to local (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Change to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_GLOBAL_GROUP) # Change back to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) # Change to "local" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_LOCAL_GROUP) # Change to "global" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Change back to "universal" # Try to add invalid member to group 1 - should be denied m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["member"] = MessageElement( "cn=ldaptestuser3,cn=users," + self.base_dn, FLAG_MOD_ADD, "member") try: ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NO_SUCH_OBJECT) # Make group 2 secondary m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_GLOBAL_GROUP) # Both group types: this performs only random checks - all possibilities # would require too much code. # Default is "global group" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Change to "local" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Change to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_UNIVERSAL_GROUP) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) # Change back to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_UNIVERSAL_GROUP) # Change to "local" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_DISTRIBUTION_LOCAL_GROUP) # Change to "global" (shouldn't work) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_DISTRIBUTION_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # Change back to "universal" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_UNIVERSAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_UNIVERSAL_GROUP) # Change back to "global" m = Message() m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) m["groupType"] = MessageElement( str(GTYPE_SECURITY_GLOBAL_GROUP), FLAG_MOD_REPLACE, "groupType") ldb.modify(m) res1 = ldb.search("cn=ldaptestgroup,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_SECURITY_GLOBAL_GROUP) self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn) def test_userAccountControl(self): """Test the userAccountControl behaviour""" print "Testing userAccountControl behaviour\n" # With a user object # Add operation # As user you can only set a normal account. # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a # password yet. # With SYSTEM rights you can set a interdomain trust account. # Invalid attribute try: ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"], "userAccountControl": "0"}) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # This has to wait until s4 supports it (needs a password module change) # try: # ldb.add({ # "dn": "cn=ldaptestuser,cn=users," + self.base_dn, # "objectclass": ["user", "person"], # "userAccountControl": str(UF_NORMAL_ACCOUNT)}) # self.fail() # except LdbError, (num, _): # self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"], "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD)}) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) try: ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"], "userAccountControl": str(UF_TEMP_DUPLICATE_ACCOUNT)}) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_OTHER) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # This isn't supported yet in s4 # try: # ldb.add({ # "dn": "cn=ldaptestuser,cn=users," + self.base_dn, # "objectclass": ["user", "person"], # "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)}) # self.fail() # except LdbError, (num, _): # self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) # self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # # try: # ldb.add({ # "dn": "cn=ldaptestuser,cn=users," + self.base_dn, # "objectclass": ["user", "person"], # "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)}) # except LdbError, (num, _): # self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) # self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # This isn't supported yet in s4 - needs ACL module adaption # try: # ldb.add({ # "dn": "cn=ldaptestuser,cn=users," + self.base_dn, # "objectclass": ["user", "person"], # "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)}) # self.fail() # except LdbError, (num, _): # self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) # self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # Modify operation ldb.add({ "dn": "cn=ldaptestuser,cn=users," + self.base_dn, "objectclass": ["user", "person"]}) # After creation we should have a normal account res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) # As user you can only switch from a normal account to a workstation # trust account and back. # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a # password yet. # With SYSTEM rights you can switch to a interdomain trust account. # Invalid attribute try: m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # This has to wait until s4 supports it (needs a password module change) # try: # m = Message() # m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # m["userAccountControl"] = MessageElement( # str(UF_NORMAL_ACCOUNT), # FLAG_MOD_REPLACE, "userAccountControl") # ldb.modify(m) # except LdbError, (num, _): # self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_TEMP_DUPLICATE_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_OTHER) # This isn't supported yet in s4 # try: # m = Message() # m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # m["userAccountControl"] = MessageElement( # str(UF_SERVER_TRUST_ACCOUNT), # FLAG_MOD_REPLACE, "userAccountControl") # ldb.modify(m) # self.fail() # except LdbError, (num, _): # self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_WORKSTATION_TRUST_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_WORKSTATION_TRUST) m = Message() m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) # This isn't supported yet in s4 - needs ACL module adaption # try: # m = Message() # m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) # m["userAccountControl"] = MessageElement( # str(UF_INTERDOMAIN_TRUST_ACCOUNT), # FLAG_MOD_REPLACE, "userAccountControl") # ldb.modify(m) # self.fail() # except LdbError, (num, _): # self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) # With a computer object # Add operation # As computer you can set a normal account and a server trust account. # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a # password yet. # With SYSTEM rights you can set a interdomain trust account. # Invalid attribute try: ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": ["computer"], "userAccountControl": "0"}) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) # This has to wait until s4 supports it (needs a password module change) # try: # ldb.add({ # "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, # "objectclass": ["computer"], # "userAccountControl": str(UF_NORMAL_ACCOUNT)}) # self.fail() # except LdbError, (num, _): # self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": ["computer"], "userAccountControl": str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD)}) res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) try: ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": ["computer"], "userAccountControl": str(UF_TEMP_DUPLICATE_ACCOUNT)}) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_OTHER) self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": ["computer"], "userAccountControl": str(UF_SERVER_TRUST_ACCOUNT)}) res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_WORKSTATION_TRUST) self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) try: ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": ["computer"], "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT)}) except LdbError, (num, _): self.assertEquals(num, ERR_OBJECT_CLASS_VIOLATION) self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) # This isn't supported yet in s4 - needs ACL module adaption # try: # ldb.add({ # "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, # "objectclass": ["computer"], # "userAccountControl": str(UF_INTERDOMAIN_TRUST_ACCOUNT)}) # self.fail() # except LdbError, (num, _): # self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) # self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) # Modify operation ldb.add({ "dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn, "objectclass": ["computer"]}) # After creation we should have a normal account res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) # As computer you can switch from a normal account to a workstation # or server trust account and back (also swapping between trust # accounts is allowed). # The UF_PASSWD_NOTREQD flag is needed since we haven't requested a # password yet. # With SYSTEM rights you can switch to a interdomain trust account. # Invalid attribute try: m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) except LdbError, (num, _): self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) # This has to wait until s4 supports it (needs a password module change) # try: # m = Message() # m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) # m["userAccountControl"] = MessageElement( # str(UF_NORMAL_ACCOUNT), # FLAG_MOD_REPLACE, "userAccountControl") # ldb.modify(m) # except LdbError, (num, _): # self.assertEquals(num, ERR_UNWILLING_TO_PERFORM) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) try: m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_TEMP_DUPLICATE_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_OTHER) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_SERVER_TRUST_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_WORKSTATION_TRUST) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_WORKSTATION_TRUST_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_WORKSTATION_TRUST) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_NORMAL_ACCOUNT | UF_PASSWD_NOTREQD), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_SERVER_TRUST_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_WORKSTATION_TRUST) m = Message() m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) m["userAccountControl"] = MessageElement( str(UF_WORKSTATION_TRUST_ACCOUNT), FLAG_MOD_REPLACE, "userAccountControl") ldb.modify(m) res1 = ldb.search("cn=ldaptestcomputer,cn=computers," + self.base_dn, scope=SCOPE_BASE, attrs=["sAMAccountType"]) self.assertTrue(len(res1) == 1) self.assertEquals(int(res1[0]["sAMAccountType"][0]), ATYPE_WORKSTATION_TRUST) # This isn't supported yet in s4 - needs ACL module adaption # try: # m = Message() # m.dn = Dn(ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) # m["userAccountControl"] = MessageElement( # str(UF_INTERDOMAIN_TRUST_ACCOUNT), # FLAG_MOD_REPLACE, "userAccountControl") # ldb.modify(m) # self.fail() # except LdbError, (num, _): # self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn) if not "://" in host: if os.path.isfile(host): host = "tdb://%s" % host else: host = "ldap://%s" % host ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp) if not "tdb://" in host: gc_ldb = Ldb("%s:3268" % host, credentials=creds, session_info=system_session(), lp=lp) else: gc_ldb = None runner = SubunitTestRunner() rc = 0 if not runner.run(unittest.makeSuite(SamTests)).wasSuccessful(): rc = 1 sys.exit(rc)