diff options
Diffstat (limited to 'source4/lib/ldb/tests')
| -rwxr-xr-x | source4/lib/ldb/tests/python/acl.py | 189 | 
1 files changed, 69 insertions, 120 deletions
diff --git a/source4/lib/ldb/tests/python/acl.py b/source4/lib/ldb/tests/python/acl.py index 37265ef3d5..8a3f4cb019 100755 --- a/source4/lib/ldb/tests/python/acl.py +++ b/source4/lib/ldb/tests/python/acl.py @@ -20,6 +20,9 @@ from samba.dcerpc import security  from samba.auth import system_session  from samba import Ldb +from samba import gensec +from samba.samdb import SamDB +from samba.credentials import Credentials  from subunit.run import SubunitTestRunner  import unittest @@ -41,6 +44,7 @@ host = args[0]  lp = sambaopts.get_loadparm()  creds = credopts.get_credentials(lp) +creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)  #  # Tests start here @@ -70,13 +74,6 @@ class AclTests(unittest.TestCase):          self.domain_sid = self.find_domain_sid(self.ldb_admin)          self.user_pass = "samba123@"          print "baseDN: %s" % self.base_dn -        self.SAMBA = False; self.WIN = False -        res = self.ldb_admin.search(base="",expression="", scope=SCOPE_BASE, -                                    attrs=["vendorName"]) -        if res and "vendorName" in res[0].keys() and res[0]["vendorName"][0].find("Samba Team") != -1: -            self.SAMBA = True -        else: -            self.WIN = True      def get_user_dn(self, name):          return "CN=%s,CN=Users,%s" % (name, self.base_dn) @@ -96,15 +93,6 @@ replace: nTSecurityDescriptor          elif isinstance(desc, security.descriptor):              mod += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))          self.ldb_admin.modify_ldif(mod) -        return -        # Everything below is used in case of emergency or  -        # double modify verification of some sort -        assert(isinstance(desc, security.descriptor)) -        fn = "/tmp/tmpMod" -        f = open(fn, "w"); f.write(mod); f.close() -        cmd = "ldapmodify -x -h %s -D %s -w %s -f %s" \ -                % (host[7:], self.get_user_dn(creds.get_username()), creds.get_password(), fn) -        return os.system( cmd ) == 0      def add_group_member(self, _ldb, group_dn, member_dn):          """ Modify user to ge member of a group  @@ -132,7 +120,17 @@ url: www.example.com                  ldif += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))          _ldb.add_ldif(ldif) -    def create_user(self, _ldb, user_dn, desc=None): +    def create_active_user(self, _ldb, user_dn): +        ldif = """ +dn: """ + user_dn + """ +sAMAccountName: """ + user_dn.split(",")[0][3:] + """ +objectClass: user +unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """ +url: www.example.com +""" +        _ldb.add_ldif(ldif) + +    def create_test_user(self, _ldb, user_dn, desc=None):          ldif = """  dn: """ + user_dn + """  sAMAccountName: """ + user_dn.split(",")[0][3:] + """ @@ -169,33 +167,16 @@ url: www.example.com          desc = res[0]["nTSecurityDescriptor"][0]          return ndr_unpack( security.descriptor, desc ) -    def enable_account(self,  user_dn): -        """Enable an account. -        :param user_dn: Dn of the account to enable. -        """ -        res = self.ldb_admin.search(user_dn, SCOPE_BASE, None, ["userAccountControl"]) -        assert len(res) == 1 -        userAccountControl = res[0]["userAccountControl"][0] -        userAccountControl = int(userAccountControl) -        if (userAccountControl & 0x2): -            userAccountControl = userAccountControl & ~0x2 # remove disabled bit -        if (userAccountControl & 0x20): -            userAccountControl = userAccountControl & ~0x20 # remove 'no password required' bit -        mod = """ -dn: """ + user_dn + """ -changetype: modify -replace: userAccountControl -userAccountControl: %s""" % userAccountControl -        if self.WIN: -            mod = re.sub("userAccountControl: \d.*", "userAccountControl: 544", mod) -        self.ldb_admin.modify_ldif(mod) - -    def get_ldb_connection(self, target_username): -        username_save = creds.get_username(); password_save = creds.get_password() -        creds.set_username(target_username) -        creds.set_password(self.user_pass) -        ldb_target = Ldb(host, credentials=creds, session_info=system_session(), lp=lp) -        creds.set_username(username_save); creds.set_password(password_save) +    def get_ldb_connection(self, target_username, target_password): +        creds_tmp = Credentials() +        creds_tmp.set_username(target_username) +        creds_tmp.set_password(target_password) +        creds_tmp.set_domain(creds.get_domain()) +        creds_tmp.set_realm(creds.get_realm()) +        creds_tmp.set_workstation(creds.get_workstation()) +        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features() +                                      | gensec.FEATURE_SEAL) +        ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp)          return ldb_target      def get_object_sid(self, object_dn): @@ -231,8 +212,8 @@ userAccountControl: %s""" % userAccountControl              self.fail()      def create_enable_user(self, username): -        self.create_user(self.ldb_admin, self.get_user_dn(username)) -        self.enable_account(self.get_user_dn(username)) +        self.create_active_user(self.ldb_admin, self.get_user_dn(username)) +        self.ldb_admin.enable_account("(sAMAccountName=" + username + ")")  #tests on ldap add operations  class AclAddTests(AclTests): @@ -244,15 +225,9 @@ class AclAddTests(AclTests):          self.usr_admin_not_owner = "acl_add_user2"          # Regular user          self.regular_user = "acl_add_user3" -        if self.SAMBA: -            self.create_enable_user(self.usr_admin_owner) -            self.create_enable_user(self.usr_admin_not_owner) -            self.create_enable_user(self.regular_user) - -        if self.WIN: -            self.assert_user_no_group_member(self.usr_admin_owner) -            self.assert_user_no_group_member(self.usr_admin_not_owner) -            self.assert_user_no_group_member(self.regular_user) +        self.create_enable_user(self.usr_admin_owner) +        self.create_enable_user(self.usr_admin_not_owner) +        self.create_enable_user(self.regular_user)          # add admins to the Domain Admins group          self.add_group_member(self.ldb_admin, "CN=Domain Admins,CN=Users," + self.base_dn, \ @@ -260,19 +235,18 @@ class AclAddTests(AclTests):          self.add_group_member(self.ldb_admin, "CN=Domain Admins,CN=Users," + self.base_dn, \                  self.get_user_dn(self.usr_admin_not_owner)) -        self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner) -        self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner) -        self.ldb_user = self.get_ldb_connection(self.regular_user) +        self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass) +        self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass) +        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)      def tearDown(self):          self.delete_force(self.ldb_admin, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          self.delete_force(self.ldb_admin, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          self.delete_force(self.ldb_admin, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          self.delete_force(self.ldb_admin, "OU=test_add_ou1," + self.base_dn) -        if self.SAMBA: -            self.delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner)) -            self.delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner)) -            self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user)) +        self.delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner)) +        self.delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner)) +        self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))      # Make sure top OU is deleted (and so everything under it)      def assert_top_ou_deleted(self): @@ -290,7 +264,7 @@ class AclAddTests(AclTests):          mod = "(D;CI;WPCC;;;%s)" % str(user_sid)          self.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)          # Test user and group creation with another domain admin's credentials -        self.create_user(self.ldb_notowner, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn) +        self.create_test_user(self.ldb_notowner, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          self.create_group(self.ldb_notowner, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          # Make sure we HAVE created the two objects -- user and group          # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE @@ -310,7 +284,7 @@ class AclAddTests(AclTests):          self.create_ou(self.ldb_owner, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          # Test user and group creation with regular user credentials          try: -            self.create_user(self.ldb_user, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn) +            self.create_test_user(self.ldb_user, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)              self.create_group(self.ldb_user, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          except LdbError, (num, _):              self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS) @@ -334,7 +308,7 @@ class AclAddTests(AclTests):          self.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)          self.create_ou(self.ldb_owner, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          # Test user and group creation with granted user only to one of the objects -        self.create_user(self.ldb_user, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn) +        self.create_test_user(self.ldb_user, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          try:              self.create_group(self.ldb_user, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          except LdbError, (num, _): @@ -354,7 +328,7 @@ class AclAddTests(AclTests):          self.assert_top_ou_deleted()          self.create_ou(self.ldb_owner, "OU=test_add_ou1," + self.base_dn)          self.create_ou(self.ldb_owner, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn) -        self.create_user(self.ldb_owner, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn) +        self.create_test_user(self.ldb_owner, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          self.create_group(self.ldb_owner, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)          # Make sure we have successfully created the two objects -- user and group          res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s,%s)" \ @@ -370,22 +344,15 @@ class AclModifyTests(AclTests):      def setUp(self):          AclTests.setUp(self)          self.user_with_wp = "acl_mod_user1" - -        if self.SAMBA: -            # Create regular user -            self.create_enable_user(self.user_with_wp) -        if self.WIN: -            self.assert_user_no_group_member(self.user_with_wp) - -        self.ldb_user = self.get_ldb_connection(self.user_with_wp) +        self.create_enable_user(self.user_with_wp) +        self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)          self.user_sid = self.get_object_sid( self.get_user_dn(self.user_with_wp))      def tearDown(self):          self.delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))          self.delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)          self.delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn) -        if self.SAMBA: -            self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp)) +        self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))      def test_modify_u1(self):          """5 Modify one attribute if you have DS_WRITE_PROPERTY for it""" @@ -393,7 +360,7 @@ class AclModifyTests(AclTests):          # First test object -- User          print "Testing modify on User object"          #self.delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1")) -        self.create_user(self.ldb_admin, self.get_user_dn("test_modify_user1")) +        self.create_test_user(self.ldb_admin, self.get_user_dn("test_modify_user1"))          self.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)          ldif = """  dn: """ + self.get_user_dn("test_modify_user1") + """ @@ -439,7 +406,7 @@ displayName: test_changed"""          # First test object -- User          print "Testing modify on User object"          #self.delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1")) -        self.create_user(self.ldb_admin, self.get_user_dn("test_modify_user1")) +        self.create_test_user(self.ldb_admin, self.get_user_dn("test_modify_user1"))          self.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)          # Modify on attribute you have rights for          ldif = """ @@ -521,7 +488,7 @@ url: www.samba.org"""          """7 Modify one attribute as you have no what so ever rights granted"""          # First test object -- User          print "Testing modify on User object" -        self.create_user(self.ldb_admin, self.get_user_dn("test_modify_user1")) +        self.create_test_user(self.ldb_admin, self.get_user_dn("test_modify_user1"))          # Modify on attribute you do not have rights for granted          ldif = """  dn: """ + self.get_user_dn("test_modify_user1") + """ @@ -602,20 +569,13 @@ class AclSearchTests(AclTests):      def setUp(self):          AclTests.setUp(self)          self.regular_user = "acl_search_user1" - -        if self.SAMBA: -            # Create regular user -            self.create_enable_user(self.regular_user) -        if self.WIN: -            self.assert_user_no_group_member(self.regular_user) - -        self.ldb_user = self.get_ldb_connection(self.regular_user) +        self.create_enable_user(self.regular_user) +        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)      def tearDown(self):          self.delete_force(self.ldb_admin, "CN=test_search_user1,OU=test_search_ou1," + self.base_dn)          self.delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn) -        if self.SAMBA: -            self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user)) +        self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))      def test_search_u1(self):          """See if can prohibit user to read another User object""" @@ -640,7 +600,7 @@ class AclSearchTests(AclTests):          self.create_ou(self.ldb_admin, ou_dn, desc)          # Create clean user          self.delete_force(self.ldb_admin, user_dn) -        self.create_user(self.ldb_admin, user_dn) +        self.create_test_user(self.ldb_admin, user_dn)          desc = self.read_desc( user_dn )          desc_sddl = desc.as_sddl( self.domain_sid )          # Parse security descriptor SDDL and remove all 'Read' ACEs @@ -652,7 +612,7 @@ class AclSearchTests(AclTests):          # Create user with the edited descriptor          desc = security.descriptor.from_sddl( desc_sddl, self.domain_sid )          self.delete_force(self.ldb_admin, user_dn) -        self.create_user(self.ldb_admin, user_dn, desc) +        self.create_test_user(self.ldb_admin, user_dn, desc)          res = ldb_user.search( self.base_dn, expression="(distinguishedName=%s)" \                                      % user_dn ) @@ -681,7 +641,7 @@ class AclSearchTests(AclTests):          self.create_ou(self.ldb_admin, ou_dn, desc)          # Create clean user          self.delete_force(self.ldb_admin, user_dn) -        self.create_user(self.ldb_admin, user_dn) +        self.create_test_user(self.ldb_admin, user_dn)          # Parse security descriptor SDDL and remove all 'Read' ACEs          # reffering to AU          desc_aces = re.findall("\(.*?\)", desc_sddl) @@ -701,24 +661,18 @@ class AclDeleteTests(AclTests):      def setUp(self):          AclTests.setUp(self)          self.regular_user = "acl_delete_user1" - -        if self.SAMBA:              # Create regular user -            self.create_enable_user(self.regular_user) -        if self.WIN: -            self.assert_user_no_group_member(self.regular_user) - -        self.ldb_user = self.get_ldb_connection(self.regular_user) +        self.create_enable_user(self.regular_user) +        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)      def tearDown(self):          self.delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1")) -        if self.SAMBA: -            self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user)) +        self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))      def test_delete_u1(self):          """User is prohibited by default to delete another User object"""          # Create user that we try to delete -        self.create_user(self.ldb_admin, self.get_user_dn("test_delete_user1")) +        self.create_test_user(self.ldb_admin, self.get_user_dn("test_delete_user1"))          # Here delete User object should ALWAYS through exception          try:              self.ldb_user.delete(self.get_user_dn("test_delete_user1")) @@ -731,7 +685,7 @@ class AclDeleteTests(AclTests):          """User's group has RIGHT_DELETE to another User object"""          user_dn = self.get_user_dn("test_delete_user1")          # Create user that we try to delete -        self.create_user(self.ldb_admin, user_dn) +        self.create_test_user(self.ldb_admin, user_dn)          mod = "(A;;SD;;;AU)"          self.dacl_add_ace(user_dn, mod)          # Try to delete User object @@ -744,7 +698,7 @@ class AclDeleteTests(AclTests):          """User indentified by SID has RIGHT_DELETE to another User object"""          user_dn = self.get_user_dn("test_delete_user1")          # Create user that we try to delete -        self.create_user(self.ldb_admin, user_dn) +        self.create_test_user(self.ldb_admin, user_dn)          mod = "(A;;SD;;;%s)" % str( self.get_object_sid(self.get_user_dn(self.regular_user)))          self.dacl_add_ace(user_dn, mod)          # Try to delete User object @@ -760,13 +714,9 @@ class AclRenameTests(AclTests):          AclTests.setUp(self)          self.regular_user = "acl_rename_user1" -        if self.SAMBA:              # Create regular user -            self.create_enable_user(self.regular_user) -        if self.WIN: -            self.assert_user_no_group_member(self.regular_user) - -        self.ldb_user = self.get_ldb_connection(self.regular_user) +        self.create_enable_user(self.regular_user) +        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)      def tearDown(self):          # Rename OU3 @@ -785,14 +735,13 @@ class AclRenameTests(AclTests):          self.delete_force(self.ldb_admin, "CN=test_rename_user5,OU=test_rename_ou1," + self.base_dn)          self.delete_force(self.ldb_admin, "OU=test_rename_ou3,OU=test_rename_ou1," + self.base_dn)          self.delete_force(self.ldb_admin, "OU=test_rename_ou1," + self.base_dn) -        if self.SAMBA: -            self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user)) +        self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))      def test_rename_u1(self):          """Regular user fails to rename 'User object' within single OU"""          # Create OU structure          self.create_ou(self.ldb_admin, "OU=test_rename_ou1," + self.base_dn) -        self.create_user(self.ldb_admin, "CN=test_rename_user1,OU=test_rename_ou1," + self.base_dn) +        self.create_test_user(self.ldb_admin, "CN=test_rename_user1,OU=test_rename_ou1," + self.base_dn)          try:              self.ldb_user.rename("CN=test_rename_user1,OU=test_rename_ou1," + self.base_dn, \                      "CN=test_rename_user5,OU=test_rename_ou1," + self.base_dn) @@ -808,7 +757,7 @@ class AclRenameTests(AclTests):          rename_user_dn = "CN=test_rename_user5," + ou_dn          # Create OU structure          self.create_ou(self.ldb_admin, ou_dn) -        self.create_user(self.ldb_admin, user_dn) +        self.create_test_user(self.ldb_admin, user_dn)          mod = "(A;;WP;;;AU)"          self.dacl_add_ace(user_dn, mod)          # Rename 'User object' having WP to AU @@ -827,7 +776,7 @@ class AclRenameTests(AclTests):          rename_user_dn = "CN=test_rename_user5," + ou_dn          # Create OU structure          self.create_ou(self.ldb_admin, ou_dn) -        self.create_user(self.ldb_admin, user_dn) +        self.create_test_user(self.ldb_admin, user_dn)          sid = self.get_object_sid(self.get_user_dn(self.regular_user))          mod = "(A;;WP;;;%s)" % str(sid)          self.dacl_add_ace(user_dn, mod) @@ -849,7 +798,7 @@ class AclRenameTests(AclTests):          # Create OU structure          self.create_ou(self.ldb_admin, ou1_dn)          self.create_ou(self.ldb_admin, ou2_dn) -        self.create_user(self.ldb_admin, user_dn) +        self.create_test_user(self.ldb_admin, user_dn)          mod = "(A;;WPSD;;;AU)"          self.dacl_add_ace(user_dn, mod)          mod = "(A;;CC;;;AU)" @@ -872,7 +821,7 @@ class AclRenameTests(AclTests):          # Create OU structure          self.create_ou(self.ldb_admin, ou1_dn)          self.create_ou(self.ldb_admin, ou2_dn) -        self.create_user(self.ldb_admin, user_dn) +        self.create_test_user(self.ldb_admin, user_dn)          sid = self.get_object_sid(self.get_user_dn(self.regular_user))          mod = "(A;;WPSD;;;%s)" % str(sid)          self.dacl_add_ace(user_dn, mod) @@ -901,7 +850,7 @@ class AclRenameTests(AclTests):          self.dacl_add_ace(ou1_dn, mod)          mod = "(A;;CC;;;AU)"          self.dacl_add_ace(ou2_dn, mod) -        self.create_user(self.ldb_admin, user_dn) +        self.create_test_user(self.ldb_admin, user_dn)          mod = "(A;;WP;;;AU)"          self.dacl_add_ace(user_dn, mod)          # Rename 'User object' having SD and CC to AU @@ -928,7 +877,7 @@ class AclRenameTests(AclTests):          self.dacl_add_ace(ou1_dn, mod)          mod = "(A;;CC;;;AU)"          self.dacl_add_ace(ou3_dn, mod) -        self.create_user(self.ldb_admin, user_dn) +        self.create_test_user(self.ldb_admin, user_dn)          # Rename 'User object' having SD and CC to AU          self.ldb_user.rename(user_dn, rename_user_dn)          res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \ @@ -973,7 +922,7 @@ class AclRenameTests(AclTests):  if not "://" in host:      host = "ldap://%s" % host -ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp) +ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp)  runner = SubunitTestRunner()  rc = 0  | 
