diff options
Diffstat (limited to 'source4/scripting/python')
5 files changed, 78 insertions, 61 deletions
diff --git a/source4/scripting/python/samba/provision.py b/source4/scripting/python/samba/provision.py index 5aaa833030..873be6730d 100644 --- a/source4/scripting/python/samba/provision.py +++ b/source4/scripting/python/samba/provision.py @@ -81,6 +81,7 @@ def find_setup_dir():  # hard coded at this point, but will probably be changed when  # we enable different fsmo roles +  def get_config_descriptor(domain_sid):      sddl = "O:EAG:EAD:(OA;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;ED)" \             "(OA;;CR;1131f6ab-9c07-11d1-f79f-00c04fc2dcd2;;ED)" \ @@ -192,8 +193,10 @@ class ProvisionNames(object):          self.sitename = None          self.smbconf = None -def updateProvisionUSN(samdb, low, high, replace = 0): + +def update_provision_usn(samdb, low, high, replace=False):      """Update the field provisionUSN in sam.ldb +      This field is used to track range of USN modified by provision and       upgradeprovision.      This value is used afterward by next provision to figure out if  @@ -203,26 +206,28 @@ def updateProvisionUSN(samdb, low, high, replace = 0):      :param low: The lowest USN modified by this upgrade      :param high: The highest USN modified by this upgrade      :param replace: A boolean indicating if the range should replace any  -                    existing one or appended (default)""" +                    existing one or appended (default) +    """      tab = []      if not replace:          entry = samdb.search(expression="(&(dn=@PROVISION)(%s=*))" % \                                  LAST_PROVISION_USN_ATTRIBUTE, base="",                                   scope=ldb.SCOPE_SUBTREE, -                                attrs=[LAST_PROVISION_USN_ATTRIBUTE,"dn"]) +                                attrs=[LAST_PROVISION_USN_ATTRIBUTE, "dn"])          for e in entry[0][LAST_PROVISION_USN_ATTRIBUTE]:              tab.append(str(e)) -    tab.append("%s-%s"%(str(low), str(high))) +    tab.append("%s-%s" % (low, high))      delta = ldb.Message() -    delta.dn = ldb.Dn(samdb,"@PROVISION") +    delta.dn = ldb.Dn(samdb, "@PROVISION")      delta[LAST_PROVISION_USN_ATTRIBUTE] = ldb.MessageElement(tab,                                                      ldb.FLAG_MOD_REPLACE,                                                      LAST_PROVISION_USN_ATTRIBUTE)      samdb.modify(delta) -def setProvisionUSN(samdb, low, high): + +def set_provision_usn(samdb, low, high):      """Set the field provisionUSN in sam.ldb      This field is used to track range of USN modified by provision and      upgradeprovision. @@ -233,14 +238,15 @@ def setProvisionUSN(samdb, low, high):      :param low: The lowest USN modified by this upgrade      :param high: The highest USN modified by this upgrade"""      tab = [] -    tab.append("%s-%s"%(str(low), str(high))) +    tab.append("%s-%s" % (low, high))      delta = ldb.Message() -    delta.dn = ldb.Dn(samdb,"@PROVISION") +    delta.dn = ldb.Dn(samdb, "@PROVISION")      delta[LAST_PROVISION_USN_ATTRIBUTE] = ldb.MessageElement(tab,                                                    ldb.FLAG_MOD_ADD,                                                    LAST_PROVISION_USN_ATTRIBUTE)      samdb.add(delta) +  def get_max_usn(samdb,basedn):      """ This function return the biggest USN present in the provision @@ -256,7 +262,7 @@ def get_max_usn(samdb,basedn):                                     "paged_results:1:1"])      return res[0]["uSNChanged"] -def getLastProvisionUSN(sam): +def get_last_provision_usn(sam):      """Get the lastest USN modified by a provision or an upgradeprovision      :param sam: An LDB object pointing to the sam.ldb @@ -541,7 +547,7 @@ def make_smbconf(smbconf, setup_path, hostname, domain, realm, serverrole,              privdir = os.path.join(targetdir, "private")          else:              privdir = default_lp.get("private dir") -        posixeadb_line = "posix:eadb = " + os.path.abspath(os.path.join(privdir,"eadb.tdb")) +        posixeadb_line = "posix:eadb = " + os.path.abspath(os.path.join(privdir, "eadb.tdb"))      else:          posixeadb_line = "" @@ -1159,7 +1165,7 @@ def set_gpo_acl(sysvol, dnsdomain, domainsid, domaindn, samdb, lp):      set_dir_acl(policy_path,dsacl2fsacl(POLICIES_ACL, str(domainsid)),           lp, str(domainsid))      res = samdb.search(base="CN=Policies,CN=System,%s"%(domaindn), -                        attrs=["cn","nTSecurityDescriptor"], +                        attrs=["cn", "nTSecurityDescriptor"],                          expression="", scope=ldb.SCOPE_ONELEVEL)      for policy in res:          acl = ndr_unpack(security.descriptor,  @@ -1322,8 +1328,8 @@ def provision(setup_dir, logger, session_info,      if not os.path.exists(paths.private_dir):          os.mkdir(paths.private_dir) -    if not os.path.exists(os.path.join(paths.private_dir,"tls")): -        os.mkdir(os.path.join(paths.private_dir,"tls")) +    if not os.path.exists(os.path.join(paths.private_dir, "tls")): +        os.mkdir(os.path.join(paths.private_dir, "tls"))      ldapi_url = "ldapi://%s" % urllib.quote(paths.s4_ldapi_path, safe="") @@ -1489,12 +1495,12 @@ def provision(setup_dir, logger, session_info,              logger.info("A Kerberos configuration suitable for Samba 4 has been "                      "generated at %s", paths.krb5conf) -        lastProvisionUSNs = getLastProvisionUSN(samdb) +        lastProvisionUSNs = get_last_provision_usn(samdb)          maxUSN = get_max_usn(samdb, str(names.rootdn))          if lastProvisionUSNs != None: -            updateProvisionUSN(samdb, 0, maxUSN, 1) +            update_provision_usn(samdb, 0, maxUSN, 1)          else: -            setProvisionUSN(samdb, 0, maxUSN) +            set_provision_usn(samdb, 0, maxUSN)      if serverrole == "domain controller":          create_dns_update_list(lp, logger, paths, setup_path) @@ -1545,7 +1551,6 @@ def provision(setup_dir, logger, session_info,              logger.info("This slapd-Commandline is also stored under: %s/ldap_backend_startup.sh",                       provision_backend.ldapdir) -      result = ProvisionResult()      result.domaindn = domaindn      result.paths = paths @@ -1708,6 +1713,7 @@ def create_named_conf(paths, setup_path, realm, dnsdomain,      setup_file(setup_path("named.conf.update"), paths.namedconf_update) +  def create_named_txt(path, setup_path, realm, dnsdomain,                        private_dir, keytab_name):      """Write out a file containing zone statements suitable for inclusion in a @@ -1729,6 +1735,7 @@ def create_named_txt(path, setup_path, realm, dnsdomain,              "PRIVATE_DIR": private_dir          }) +  def create_krb5_conf(path, setup_path, dnsdomain, hostname, realm):      """Write out a file containing zone statements suitable for inclusion in a      named.conf file (including GSS-TSIG configuration). diff --git a/source4/scripting/python/samba/tests/provision.py b/source4/scripting/python/samba/tests/provision.py index b5cc57c4d9..aa4de660a0 100644 --- a/source4/scripting/python/samba/tests/provision.py +++ b/source4/scripting/python/samba/tests/provision.py @@ -40,9 +40,11 @@ def create_dummy_secretsdb(path, lp=None):      secrets_ldb.transaction_commit()      return secrets_ldb +  class ProvisionTestCase(samba.tests.TestCaseInTempDir):      """Some simple tests for individual functions in the provisioning code.      """ +      def test_setup_secretsdb(self):          path = os.path.join(self.tempdir, "secrets.ldb")          ldb = setup_secretsdb(path, setup_path, None, None, lp=env_loadparm()) diff --git a/source4/scripting/python/samba/tests/upgradeprovision.py b/source4/scripting/python/samba/tests/upgradeprovision.py index f0306fe6a1..e40262b37a 100644 --- a/source4/scripting/python/samba/tests/upgradeprovision.py +++ b/source4/scripting/python/samba/tests/upgradeprovision.py @@ -23,16 +23,13 @@ from samba.upgradehelpers import  (usn_in_range, dn_sort,                                    construct_existor_expr)  from samba.tests.provision import create_dummy_secretsdb -from samba.tests import env_loadparm, TestCaseInTempDir +from samba.tests import TestCaseInTempDir  from samba import Ldb  from ldb import SCOPE_SUBTREE  import samba.tests -lp = env_loadparm() -  def dummymessage(a=None, b=None): -    if 0: -        print "none" +    pass  class UpgradeProvisionTestCase(TestCaseInTempDir): @@ -60,7 +57,8 @@ class UpgradeProvisionTestCase(TestCaseInTempDir):          self.assertEquals(dn_sort("dc=toto,dc=tata",                                      "cn=foo,dc=toto,dc=tata"), -1)          self.assertEquals(dn_sort("cn=bar, dc=toto,dc=tata", -                                    "cn=foo, dc=toto,dc=tata"),-1) +                                    "cn=foo, dc=toto,dc=tata"), -1) +      def test_get_diff_sddl(self):          sddl = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\  (A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)S:AI(AU;CIIDSA;WP;;;WD)" @@ -75,19 +73,19 @@ class UpgradeProvisionTestCase(TestCaseInTempDir):          sddl5 = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\  (A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" -        self.assertEquals(get_diff_sddls(sddl, sddl1) ,"") +        self.assertEquals(get_diff_sddls(sddl, sddl1), "")          txt = get_diff_sddls(sddl, sddl2) -        self.assertEquals(txt ,"\tOwner mismatch: SA (in ref) BA(in current)\n") +        self.assertEquals(txt, "\tOwner mismatch: SA (in ref) BA(in current)\n")          txt = get_diff_sddls(sddl, sddl3) -        self.assertEquals(txt ,"\tGroup mismatch: DU (in ref) BA(in current)\n") +        self.assertEquals(txt, "\tGroup mismatch: DU (in ref) BA(in current)\n")          txt = get_diff_sddls(sddl, sddl4)          txtmsg = "\tPart dacl is different between reference and current here\   is the detail:\n\t\t(A;CIID;RPWPCRCCLCLORCWOWDSW;;;BA) ACE is not present in\   the reference\n\t\t(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA) ACE is not present in\   the current\n" -        self.assertEquals(txt , txtmsg) +        self.assertEquals(txt, txtmsg)          txt = get_diff_sddls(sddl, sddl5) -        self.assertEquals(txt ,"\tCurrent ACL hasn't a sacl part\n") +        self.assertEquals(txt, "\tCurrent ACL hasn't a sacl part\n")      def test_construct_existor_expr(self):          res = construct_existor_expr([]) @@ -99,7 +97,9 @@ class UpgradeProvisionTestCase(TestCaseInTempDir):          res = construct_existor_expr(["foo", "bar"])          self.assertEquals(res, "(|(foo=*)(bar=*))") +  class UpdateSecretsTests(samba.tests.TestCaseInTempDir): +      def setUp(self):          super(UpdateSecretsTests, self).setUp()          self.referencedb = create_dummy_secretsdb( diff --git a/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py b/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py index 32d6c0975b..3b0a695d83 100644 --- a/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py +++ b/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py @@ -20,6 +20,8 @@  import os  import re  import shutil + +from samba import param  from samba.credentials import Credentials  from samba.auth import system_session  from samba.provision import getpolicypath @@ -27,67 +29,66 @@ from samba.upgradehelpers import (get_paths, get_ldbs,                                   find_provision_key_parameters, identic_rename,                                   updateOEMInfo, getOEMInfo, update_gpo,                                   delta_update_basesamdb,search_constructed_attrs_stored) - -from samba.tests.provision import create_dummy_secretsdb -from samba import param  from samba.tests import env_loadparm, TestCaseInTempDir +from samba.tests.provision import create_dummy_secretsdb  import ldb  def dummymessage(a=None, b=None): -    if 0: -        print "none" +    pass -lp = env_loadparm() -smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf") +smb_conf_path = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf")  class UpgradeProvisionBasicLdbHelpersTestCase(TestCaseInTempDir):      """Some simple tests for individual functions in the provisioning code.      """      def test_get_ldbs(self): -        paths = get_paths(param, None, smbConfPath) +        paths = get_paths(param, None, smb_conf_path)          creds = Credentials() +        lp = env_loadparm()          creds.guess(lp)          get_ldbs(paths, creds, system_session(), lp)      def test_find_key_param(self): -        paths = get_paths(param, None, smbConfPath) +        paths = get_paths(param, None, smb_conf_path)          creds = Credentials() +        lp = env_loadparm()          creds.guess(lp)          rootdn = "dc=samba,dc=example,dc=com"          ldbs = get_ldbs(paths, creds, system_session(), lp)          names = find_provision_key_parameters(ldbs.sam, ldbs.secrets, ldbs.idmap, -                                                paths, smbConfPath, lp) +                                                paths, smb_conf_path, lp)          self.assertEquals(names.realm, "SAMBA.EXAMPLE.COM") -        self.assertTrue(str(names.rootdn).lower() == rootdn.lower()) +        self.assertEquals(str(names.rootdn).lower(), rootdn.lower())          self.assertTrue(names.policyid_dc != None)          self.assertTrue(names.ntdsguid != "")  class UpgradeProvisionWithLdbTestCase(TestCaseInTempDir): +      def _getEmptyDbName(self):          return os.path.join(self.tempdir, "sam.ldb")      def setUp(self):          super(UpgradeProvisionWithLdbTestCase, self).setUp() -        paths = get_paths(param, None, smbConfPath) +        paths = get_paths(param, None, smb_conf_path)          self.creds = Credentials() -        self.creds.guess(lp) +        self.lp = env_loadparm() +        self.creds.guess(self.lp)          self.paths = paths -        self.ldbs = get_ldbs(paths, self.creds, system_session(), lp) -        self.lp = lp +        self.ldbs = get_ldbs(paths, self.creds, system_session(), self.lp)          self.names = find_provision_key_parameters(self.ldbs.sam, self.ldbs.secrets, -                                                       self.ldbs.idmap, paths, smbConfPath, lp) +                                   self.ldbs.idmap, paths, smb_conf_path, self.lp)          self.referencedb = create_dummy_secretsdb(              os.path.join(self.tempdir, "ref.ldb")) -      def test_search_constructed_attrs_stored(self):          hashAtt = search_constructed_attrs_stored(self.ldbs.sam,                                                    self.names.rootdn,                                                    ["msds-KeyVersionNumber"])          self.assertFalse(hashAtt.has_key("msds-KeyVersionNumber")) +      def test_identic_rename(self):          rootdn = "DC=samba,DC=example,DC=com" diff --git a/source4/scripting/python/samba/upgradehelpers.py b/source4/scripting/python/samba/upgradehelpers.py index db6ea560a2..5a37dab108 100755 --- a/source4/scripting/python/samba/upgradehelpers.py +++ b/source4/scripting/python/samba/upgradehelpers.py @@ -166,6 +166,7 @@ def get_ldbs(paths, creds, session, lp):      return ldbs +  def usn_in_range(usn, range):      """Check if the usn is in one of the range provided.      To do so, the value is checked to be between the lower bound and @@ -174,25 +175,27 @@ def usn_in_range(usn, range):      :param usn: A integer value corresponding to the usn that we want to update      :param range: A list of integer representing ranges, lower bounds are in                    the even indices, higher in odd indices -    :return: 1 if the usn is in one of the range, 0 otherwise""" +    :return: True if the usn is in one of the range, False otherwise +    """      idx = 0 -    cont = 1 -    ok = 0 -    while (cont == 1): +    cont = True +    ok = False +    while cont:          if idx ==  len(range): -            cont = 0 +            cont = False              continue          if usn < int(range[idx]):              if idx %2 == 1: -                ok = 1 -            cont = 0 +                ok = True +            cont = False          if usn == int(range[idx]): -            cont = 0 -            ok = 1 +            cont = False +            ok = True          idx = idx + 1      return ok +  def get_paths(param, targetdir=None, smbconf=None):      """Get paths to important provision objects (smb.conf, ldb files, ...) @@ -237,6 +240,7 @@ def update_policyids(names, samdb):      else:          names.policyid_dc = None +  def find_provision_key_parameters(samdb, secretsdb, idmapdb, paths, smbconf, lp):      """Get key provision parameters (realm, domain, ...) from a given provision @@ -246,8 +250,8 @@ def find_provision_key_parameters(samdb, secretsdb, idmapdb, paths, smbconf, lp)      :param paths: A list of path to provision object      :param smbconf: Path to the smb.conf file      :param lp: A LoadParm object -    :return: A list of key provision parameters""" - +    :return: A list of key provision parameters +    """      names = ProvisionNames()      names.adminpass = None @@ -408,16 +412,19 @@ def dn_sort(x, y):                      return -1      return ret +  def identic_rename(ldbobj, dn):      """Perform a back and forth rename to trigger renaming on attribute that -       can't be directly modified. +    can't be directly modified.      :param lbdobj: An Ldb Object -    :param dn: DN of the object to manipulate """ +    :param dn: DN of the object to manipulate +    """      (before, sep, after)=str(dn).partition('=')      ldbobj.rename(dn, ldb.Dn(ldbobj, "%s=foo%s" % (before, after)))      ldbobj.rename(ldb.Dn(ldbobj, "%s=foo%s" % (before, after)), dn) +  def chunck_acl(acl):      """Return separate ACE of an ACL @@ -659,7 +666,7 @@ def update_gpo(paths, samdb, names, lp, message, force=0):      Set ACL correctly also.      Check ACLs for sysvol/netlogon dirs also      """ -    resetacls = 0 +    resetacls = False      try:          ntacls.checkset_backend(lp, None, None)          eadbname = lp.get("posix:eadb") @@ -674,10 +681,10 @@ def update_gpo(paths, samdb, names, lp, message, force=0):              attribute = samba.xattr_native.wrap_getxattr(paths.sysvol,                                  xattr.XATTR_NTACL_NAME)      except: -       resetacls = 1 +       resetacls = True      if force: -        resetacls = 1 +        resetacls = True      dir = getpolicypath(paths.sysvol, names.dnsdomain, names.policyid)      if not os.path.isdir(dir):  | 
