summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xsource4/dsdb/tests/python/acl.py288
1 files changed, 274 insertions, 14 deletions
diff --git a/source4/dsdb/tests/python/acl.py b/source4/dsdb/tests/python/acl.py
index 12f653b2f2..977221fe11 100755
--- a/source4/dsdb/tests/python/acl.py
+++ b/source4/dsdb/tests/python/acl.py
@@ -21,10 +21,10 @@ from ldb import ERR_OPERATIONS_ERROR
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE
from samba.ndr import ndr_pack, ndr_unpack
-from samba.dcerpc import security
+from samba.dcerpc import security, drsuapi, misc, netlogon, nbt
from samba.auth import system_session
-from samba import gensec, sd_utils
+from samba import gensec, sd_utils, join
from samba.samdb import SamDB
from samba.credentials import Credentials
import samba.tests
@@ -47,6 +47,12 @@ if len(args) < 1:
sys.exit(1)
host = args[0]
+if not "://" in host:
+ ldaphost = "ldap://%s" % host
+else:
+ ldaphost = host
+ start = host.rindex("://")
+ host = host.lstrip(start+3)
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
@@ -87,7 +93,7 @@ class AclTests(samba.tests.TestCase):
creds_tmp.set_workstation(creds.get_workstation())
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
| gensec.FEATURE_SEAL)
- ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp)
+ ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
return ldb_target
# Test if we have any additional groups for users than default ones
@@ -238,7 +244,7 @@ class AclAddTests(AclTests):
def test_add_anonymous(self):
"""Test add operation with anonymous user"""
- anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+ anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
try:
anonymous.newuser("test_add_anonymous", self.user_pass)
except LdbError, (num, _):
@@ -574,7 +580,7 @@ Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
def test_modify_anonymous(self):
"""Test add operation with anonymous user"""
- anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+ anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
self.ldb_admin.newuser("test_anonymous", "samba123@")
m = Message()
m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))
@@ -658,7 +664,7 @@ class AclSearchTests(AclTests):
def test_search_anonymous1(self):
"""Verify access of rootDSE with the correct request"""
- anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+ anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
self.assertEquals(len(res), 1)
#verify some of the attributes
@@ -674,7 +680,7 @@ class AclSearchTests(AclTests):
def test_search_anonymous2(self):
"""Make sure we cannot access anything else"""
- anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+ anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
try:
res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
except LdbError, (num, _):
@@ -702,7 +708,7 @@ class AclSearchTests(AclTests):
mod = "(A;CI;LC;;;AN)"
self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
- anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+ anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
expression="(objectClass=*)", scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 1)
@@ -997,7 +1003,7 @@ class AclDeleteTests(AclTests):
def test_delete_anonymous(self):
"""Test add operation with anonymous user"""
- anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+ anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
self.ldb_admin.newuser("test_anonymous", "samba123@")
try:
@@ -1556,11 +1562,265 @@ class AclExtendedTests(AclTests):
self.assertEqual(len(res),1)
self.assertTrue("nTSecurityDescriptor" in res[0].keys())
+class AclSPNTests(AclTests):
+
+ def setUp(self):
+ super(AclSPNTests, self).setUp()
+ self.dcname = "TESTSRV8"
+ self.rodcname = "TESTRODC8"
+ self.computername = "testcomp8"
+ self.test_user = "spn_test_user8"
+ self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
+ self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
+ self.site = "Default-First-Site-Name"
+ self.rodcctx = samba.join.dc_join(server=host, creds=creds, lp=lp, site=self.site, netbios_name=self.rodcname,
+ targetdir=None, domain=None)
+ self.dcctx = samba.join.dc_join(server=host, creds=creds, lp=lp, site=self.site, netbios_name=self.dcname,
+ targetdir=None, domain=None)
+ self.ldb_admin.newuser(self.test_user, self.user_pass)
+ self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
+ self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))
+ self.create_computer(self.computername, self.dcctx.dnsdomain)
+ self.create_rodc(self.rodcctx)
+ self.create_dc(self.dcctx)
+
+ def tearDown(self):
+ super(AclSPNTests, self).tearDown()
+ self.rodcctx.cleanup_old_join()
+ self.dcctx.cleanup_old_join()
+ delete_force(self.ldb_admin, "cn=%s,cn=computers,%s" % (self.computername, self.base_dn))
+ delete_force(self.ldb_admin, self.get_user_dn(self.test_user))
+
+ def replace_spn(self, _ldb, dn, spn):
+ print "Setting spn %s on %s" % (spn, dn)
+ res = self.ldb_admin.search(dn, expression="(objectClass=*)",
+ scope=SCOPE_BASE, attrs=["servicePrincipalName"])
+ if "servicePrincipalName" in res[0].keys():
+ flag = FLAG_MOD_REPLACE
+ else:
+ flag = FLAG_MOD_ADD
+
+ msg = Message()
+ msg.dn = Dn(self.ldb_admin, dn)
+ msg["servicePrincipalName"] = MessageElement(spn, flag,
+ "servicePrincipalName")
+ _ldb.modify(msg)
+
+ def create_computer(self, computername, domainname):
+ dn = "CN=%s,CN=computers,%s" % (computername, self.base_dn)
+ samaccountname = computername + "$"
+ dnshostname = "%s.%s" % (computername, domainname)
+ self.ldb_admin.add({
+ "dn": dn,
+ "objectclass": "computer",
+ "sAMAccountName": samaccountname,
+ "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
+ "dNSHostName": dnshostname})
+
+ # same as for join_RODC, but do not set any SPNs
+ def create_rodc(self, ctx):
+ ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
+
+ ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
+ "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
+ "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
+ ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
+
+ mysid = ctx.get_mysid()
+ admin_dn = "<SID=%s>" % mysid
+ ctx.managedby = admin_dn
+
+ ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
+ samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
+ samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
+
+ ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
+ ctx.secure_channel_type = misc.SEC_CHAN_RODC
+ ctx.RODC = True
+ ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
+ drsuapi.DRSUAPI_DRS_PER_SYNC |
+ drsuapi.DRSUAPI_DRS_GET_ANC |
+ drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
+ drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+
+ ctx.join_add_objects()
+
+ def create_dc(self, ctx):
+ ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
+ ctx.secure_channel_type = misc.SEC_CHAN_BDC
+ ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP |
+ drsuapi.DRSUAPI_DRS_INIT_SYNC |
+ drsuapi.DRSUAPI_DRS_PER_SYNC |
+ drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
+ drsuapi.DRSUAPI_DRS_NEVER_SYNCED)
+
+ ctx.join_add_objects()
+
+ def dc_spn_test(self, ctx):
+ netbiosdomain = self.dcctx.get_domain_name()
+ try:
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
+
+ mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
+ self.sd_utils.dacl_add_ace(ctx.acct_dn, mod)
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s" % (ctx.myname))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
+ (ctx.myname, ctx.dnsdomain, netbiosdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, ctx.dnsdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
+ (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
+ (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
+ (ctx.myname, ctx.dnsdomain, netbiosdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s" % (ctx.myname))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, ctx.dnsdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
+ (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "DNS/%s/%s" % (ctx.myname, ctx.dnsdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s/%s" %
+ (ctx.myname, ctx.dnsdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s" %
+ (ctx.myname))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
+ (ctx.myname, ctx.dnsdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
+ (ctx.myname, ctx.dnsdomain))
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s._msdcs.%s" %
+ (ctx.ntds_guid, ctx.dnsdomain))
+
+ #the following spns do not match the restrictions and should fail
+ try:
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/ForestDnsZones.%s" %
+ (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ try:
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/DomainDnsZones.%s" %
+ (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ try:
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ try:
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
+ (ctx.myname, ctx.dnsdomain, netbiosdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ try:
+ self.replace_spn(self.ldb_user1, ctx.acct_dn, "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" %
+ (ctx.ntds_guid, ctx.dnsdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+
+ def test_computer_spn(self):
+ # with WP, any value can be set
+ netbiosdomain = self.dcctx.get_domain_name()
+ self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
+ (self.computername, netbiosdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s" % (self.computername))
+ self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
+ (self.computername, self.dcctx.dnsdomain, netbiosdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
+ (self.computername, self.dcctx.dnsdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
+ (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" %
+ (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
+ (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/DomainDnsZones.%s" %
+ (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
+ (self.computername, self.dcctx.dnsdomain, netbiosdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s" % (self.computername))
+ self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" %
+ (self.computername, self.dcctx.dnsdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
+ (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "DNS/%s/%s" %
+ (self.computername, self.dcctx.dnsdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s/%s" %
+ (self.computername, self.dcctx.dnsdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s" %
+ (self.computername))
+ self.replace_spn(self.ldb_admin, self.computerdn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
+ (self.computername, self.dcctx.dnsdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
+ (self.computername, self.dcctx.dnsdomain))
+ self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
+
+ #user has neither WP nor Validated-SPN, access denied expected
+ try:
+ self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
+
+ mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
+ self.sd_utils.dacl_add_ace(self.computerdn, mod)
+ #grant Validated-SPN and check which values are accepted
+ #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
+
+ # for regular computer objects we shouldalways get constraint violation
+
+ # This does not pass against Windows, although it should according to docs
+ self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername))
+ self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" %
+ (self.computername, self.dcctx.dnsdomain))
+
+ try:
+ self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ try:
+ self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
+ (self.computername, self.dcctx.dnsdomain, netbiosdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ try:
+ self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" %
+ (self.computername, self.dcctx.dnsdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ try:
+ self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
+ (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ try:
+ self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" %
+ (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ try:
+ self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ try:
+ self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
+ (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+
+ def test_spn_rwdc(self):
+ self.dc_spn_test(self.dcctx)
+
+ def test_spn_rodc(self):
+ self.dc_spn_test(self.rodcctx)
+
+
# Important unit running information
-if not "://" in host:
- host = "ldap://%s" % host
-ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
+ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
runner = SubunitTestRunner()
rc = 0
@@ -1592,6 +1852,6 @@ ldb.set_minPwdAge(minPwdAge)
if not runner.run(unittest.makeSuite(AclExtendedTests)).wasSuccessful():
rc = 1
-
-
+if not runner.run(unittest.makeSuite(AclSPNTests)).wasSuccessful():
+ rc = 1
sys.exit(rc)