From 935b985d23f84738259a42cbcd889fa6022d5d65 Mon Sep 17 00:00:00 2001 From: Nadezhda Ivanova Date: Wed, 22 Dec 2010 12:27:50 +0200 Subject: s4-tests: Tests for Validated-SPN implementation. Test setting spn on RWDC, RODC and regular computer object. Autobuild-User: Nadezhda Ivanova Autobuild-Date: Wed Dec 22 12:20:24 CET 2010 on sn-devel-104 --- source4/dsdb/tests/python/acl.py | 288 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 274 insertions(+), 14 deletions(-) (limited to 'source4/dsdb/tests/python') diff --git a/source4/dsdb/tests/python/acl.py b/source4/dsdb/tests/python/acl.py index 12f653b2f2..977221fe11 100755 --- a/source4/dsdb/tests/python/acl.py +++ b/source4/dsdb/tests/python/acl.py @@ -21,10 +21,10 @@ from ldb import ERR_OPERATIONS_ERROR from ldb import Message, MessageElement, Dn from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE from samba.ndr import ndr_pack, ndr_unpack -from samba.dcerpc import security +from samba.dcerpc import security, drsuapi, misc, netlogon, nbt from samba.auth import system_session -from samba import gensec, sd_utils +from samba import gensec, sd_utils, join from samba.samdb import SamDB from samba.credentials import Credentials import samba.tests @@ -47,6 +47,12 @@ if len(args) < 1: sys.exit(1) host = args[0] +if not "://" in host: + ldaphost = "ldap://%s" % host +else: + ldaphost = host + start = host.rindex("://") + host = host.lstrip(start+3) lp = sambaopts.get_loadparm() creds = credopts.get_credentials(lp) @@ -87,7 +93,7 @@ class AclTests(samba.tests.TestCase): creds_tmp.set_workstation(creds.get_workstation()) creds_tmp.set_gensec_features(creds_tmp.get_gensec_features() | gensec.FEATURE_SEAL) - ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp) + ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp) return ldb_target # Test if we have any additional groups for users than default ones @@ -238,7 +244,7 @@ class AclAddTests(AclTests): def test_add_anonymous(self): """Test add operation with anonymous user""" - anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp) + anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp) try: anonymous.newuser("test_add_anonymous", self.user_pass) except LdbError, (num, _): @@ -574,7 +580,7 @@ Member: CN=test_modify_user2,CN=Users,""" + self.base_dn def test_modify_anonymous(self): """Test add operation with anonymous user""" - anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp) + anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp) self.ldb_admin.newuser("test_anonymous", "samba123@") m = Message() m.dn = Dn(anonymous, self.get_user_dn("test_anonymous")) @@ -658,7 +664,7 @@ class AclSearchTests(AclTests): def test_search_anonymous1(self): """Verify access of rootDSE with the correct request""" - anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp) + anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp) res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE) self.assertEquals(len(res), 1) #verify some of the attributes @@ -674,7 +680,7 @@ class AclSearchTests(AclTests): def test_search_anonymous2(self): """Make sure we cannot access anything else""" - anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp) + anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp) try: res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE) except LdbError, (num, _): @@ -702,7 +708,7 @@ class AclSearchTests(AclTests): mod = "(A;CI;LC;;;AN)" self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod) self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn) - anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp) + anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp) res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE) self.assertEquals(len(res), 1) @@ -997,7 +1003,7 @@ class AclDeleteTests(AclTests): def test_delete_anonymous(self): """Test add operation with anonymous user""" - anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp) + anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp) self.ldb_admin.newuser("test_anonymous", "samba123@") try: @@ -1556,11 +1562,265 @@ class AclExtendedTests(AclTests): self.assertEqual(len(res),1) self.assertTrue("nTSecurityDescriptor" in res[0].keys()) +class AclSPNTests(AclTests): + + def setUp(self): + super(AclSPNTests, self).setUp() + self.dcname = "TESTSRV8" + self.rodcname = "TESTRODC8" + self.computername = "testcomp8" + self.test_user = "spn_test_user8" + self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn) + self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn) + self.site = "Default-First-Site-Name" + self.rodcctx = samba.join.dc_join(server=host, creds=creds, lp=lp, site=self.site, netbios_name=self.rodcname, + targetdir=None, domain=None) + self.dcctx = samba.join.dc_join(server=host, creds=creds, lp=lp, site=self.site, netbios_name=self.dcname, + targetdir=None, domain=None) + self.ldb_admin.newuser(self.test_user, self.user_pass) + self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass) + self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user)) + self.create_computer(self.computername, self.dcctx.dnsdomain) + self.create_rodc(self.rodcctx) + self.create_dc(self.dcctx) + + def tearDown(self): + super(AclSPNTests, self).tearDown() + self.rodcctx.cleanup_old_join() + self.dcctx.cleanup_old_join() + delete_force(self.ldb_admin, "cn=%s,cn=computers,%s" % (self.computername, self.base_dn)) + delete_force(self.ldb_admin, self.get_user_dn(self.test_user)) + + def replace_spn(self, _ldb, dn, spn): + print "Setting spn %s on %s" % (spn, dn) + res = self.ldb_admin.search(dn, expression="(objectClass=*)", + scope=SCOPE_BASE, attrs=["servicePrincipalName"]) + if "servicePrincipalName" in res[0].keys(): + flag = FLAG_MOD_REPLACE + else: + flag = FLAG_MOD_ADD + + msg = Message() + msg.dn = Dn(self.ldb_admin, dn) + msg["servicePrincipalName"] = MessageElement(spn, flag, + "servicePrincipalName") + _ldb.modify(msg) + + def create_computer(self, computername, domainname): + dn = "CN=%s,CN=computers,%s" % (computername, self.base_dn) + samaccountname = computername + "$" + dnshostname = "%s.%s" % (computername, domainname) + self.ldb_admin.add({ + "dn": dn, + "objectclass": "computer", + "sAMAccountName": samaccountname, + "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT), + "dNSHostName": dnshostname}) + + # same as for join_RODC, but do not set any SPNs + def create_rodc(self, ctx): + ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn) + + ctx.never_reveal_sid = [ "" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY), + "" % security.SID_BUILTIN_ADMINISTRATORS, + "" % security.SID_BUILTIN_SERVER_OPERATORS, + "" % security.SID_BUILTIN_BACKUP_OPERATORS, + "" % security.SID_BUILTIN_ACCOUNT_OPERATORS ] + ctx.reveal_sid = "" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW) + + mysid = ctx.get_mysid() + admin_dn = "" % mysid + ctx.managedby = admin_dn + + ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT | + samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION | + samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT) + + ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn + ctx.secure_channel_type = misc.SEC_CHAN_RODC + ctx.RODC = True + ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC | + drsuapi.DRSUAPI_DRS_PER_SYNC | + drsuapi.DRSUAPI_DRS_GET_ANC | + drsuapi.DRSUAPI_DRS_NEVER_SYNCED | + drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING) + + ctx.join_add_objects() + + def create_dc(self, ctx): + ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION + ctx.secure_channel_type = misc.SEC_CHAN_BDC + ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP | + drsuapi.DRSUAPI_DRS_INIT_SYNC | + drsuapi.DRSUAPI_DRS_PER_SYNC | + drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS | + drsuapi.DRSUAPI_DRS_NEVER_SYNCED) + + ctx.join_add_objects() + + def dc_spn_test(self, ctx): + netbiosdomain = self.dcctx.get_domain_name() + try: + self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) + + mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1) + self.sd_utils.dacl_add_ace(ctx.acct_dn, mod) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s" % (ctx.myname)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" % + (ctx.myname, ctx.dnsdomain, netbiosdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, ctx.dnsdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" % + (ctx.myname, ctx.dnsdomain, ctx.dnsdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" % + (ctx.myname, ctx.dnsdomain, ctx.dnsdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" % + (ctx.myname, ctx.dnsdomain, netbiosdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s" % (ctx.myname)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, ctx.dnsdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" % + (ctx.myname, ctx.dnsdomain, ctx.dnsdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "DNS/%s/%s" % (ctx.myname, ctx.dnsdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s/%s" % + (ctx.myname, ctx.dnsdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s" % + (ctx.myname)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" % + (ctx.myname, ctx.dnsdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" % + (ctx.myname, ctx.dnsdomain)) + self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s._msdcs.%s" % + (ctx.ntds_guid, ctx.dnsdomain)) + + #the following spns do not match the restrictions and should fail + try: + self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/ForestDnsZones.%s" % + (ctx.myname, ctx.dnsdomain, ctx.dnsdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + try: + self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/DomainDnsZones.%s" % + (ctx.myname, ctx.dnsdomain, ctx.dnsdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + try: + self.replace_spn(self.ldb_user1, ctx.acct_dn, "nosuchservice/%s/%s" % ("abcd", "abcd")) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + try: + self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" % + (ctx.myname, ctx.dnsdomain, netbiosdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + try: + self.replace_spn(self.ldb_user1, ctx.acct_dn, "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" % + (ctx.ntds_guid, ctx.dnsdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + + def test_computer_spn(self): + # with WP, any value can be set + netbiosdomain = self.dcctx.get_domain_name() + self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" % + (self.computername, netbiosdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s" % (self.computername)) + self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" % + (self.computername, self.dcctx.dnsdomain, netbiosdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" % + (self.computername, self.dcctx.dnsdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" % + (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" % + (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" % + (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/DomainDnsZones.%s" % + (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" % + (self.computername, self.dcctx.dnsdomain, netbiosdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s" % (self.computername)) + self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % + (self.computername, self.dcctx.dnsdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" % + (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "DNS/%s/%s" % + (self.computername, self.dcctx.dnsdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s/%s" % + (self.computername, self.dcctx.dnsdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s" % + (self.computername)) + self.replace_spn(self.ldb_admin, self.computerdn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" % + (self.computername, self.dcctx.dnsdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" % + (self.computername, self.dcctx.dnsdomain)) + self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd")) + + #user has neither WP nor Validated-SPN, access denied expected + try: + self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) + + mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1) + self.sd_utils.dacl_add_ace(self.computerdn, mod) + #grant Validated-SPN and check which values are accepted + #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference + + # for regular computer objects we shouldalways get constraint violation + + # This does not pass against Windows, although it should according to docs + self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername)) + self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" % + (self.computername, self.dcctx.dnsdomain)) + + try: + self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + try: + self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" % + (self.computername, self.dcctx.dnsdomain, netbiosdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + try: + self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % + (self.computername, self.dcctx.dnsdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + try: + self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" % + (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + try: + self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" % + (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + try: + self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + try: + self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" % + (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain)) + except LdbError, (num, _): + self.assertEquals(num, ERR_CONSTRAINT_VIOLATION) + + def test_spn_rwdc(self): + self.dc_spn_test(self.dcctx) + + def test_spn_rodc(self): + self.dc_spn_test(self.rodcctx) + + # Important unit running information -if not "://" in host: - host = "ldap://%s" % host -ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp) +ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp) runner = SubunitTestRunner() rc = 0 @@ -1592,6 +1852,6 @@ ldb.set_minPwdAge(minPwdAge) if not runner.run(unittest.makeSuite(AclExtendedTests)).wasSuccessful(): rc = 1 - - +if not runner.run(unittest.makeSuite(AclSPNTests)).wasSuccessful(): + rc = 1 sys.exit(rc) -- cgit