Diffstat (limited to 'source4/scripting/python')
5 files changed, 78 insertions, 61 deletions
diff --git a/source4/scripting/python/samba/provision.py b/source4/scripting/python/samba/provision.py index 5aaa833030..873be6730d 100644 --- a/source4/scripting/python/samba/provision.py +++ b/source4/scripting/python/samba/provision.py @@ -81,6 +81,7 @@ def find_setup_dir(): # hard coded at this point, but will probably be changed when # we enable different fsmo roles + def get_config_descriptor(domain_sid): sddl = "O:EAG:EAD:(OA;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;ED)" \ "(OA;;CR;1131f6ab-9c07-11d1-f79f-00c04fc2dcd2;;ED)" \ @@ -192,8 +193,10 @@ class ProvisionNames(object): self.sitename = None self.smbconf = None -def updateProvisionUSN(samdb, low, high, replace = 0): + +def update_provision_usn(samdb, low, high, replace=False): """Update the field provisionUSN in sam.ldb + This field is used to track range of USN modified by provision and upgradeprovision. This value is used afterward by next provision to figure out if 
@@ -203,26 +206,28 @@ def updateProvisionUSN(samdb, low, high, replace = 0): :param low: The lowest USN modified by this upgrade :param high: The highest USN modified by this upgrade :param replace: A boolean indicating if the range should replace any - existing one or appended (default)""" + existing one or appended (default) + """ tab = [] if not replace: entry = samdb.search(expression="(&(dn=@PROVISION)(%s=*))" % \ LAST_PROVISION_USN_ATTRIBUTE, base="", scope=ldb.SCOPE_SUBTREE, - attrs=[LAST_PROVISION_USN_ATTRIBUTE,"dn"]) + attrs=[LAST_PROVISION_USN_ATTRIBUTE, "dn"]) for e in entry[0][LAST_PROVISION_USN_ATTRIBUTE]: tab.append(str(e)) - tab.append("%s-%s"%(str(low), str(high))) + tab.append("%s-%s" % (low, high)) delta = ldb.Message() - delta.dn = ldb.Dn(samdb,"@PROVISION") + delta.dn = ldb.Dn(samdb, "@PROVISION") delta[LAST_PROVISION_USN_ATTRIBUTE] = ldb.MessageElement(tab, ldb.FLAG_MOD_REPLACE, LAST_PROVISION_USN_ATTRIBUTE) samdb.modify(delta) -def setProvisionUSN(samdb, low, high): + +def set_provision_usn(samdb, low, high): """Set the field provisionUSN in sam.ldb This field is used to track range of USN modified by provision and upgradeprovision. @@ -233,14 +238,15 @@ def setProvisionUSN(samdb, low, high): :param low: The lowest USN modified by this upgrade :param high: The highest USN modified by this upgrade""" tab = [] - tab.append("%s-%s"%(str(low), str(high))) + tab.append("%s-%s" % (low, high)) delta = ldb.Message() - delta.dn = ldb.Dn(samdb,"@PROVISION") + delta.dn = ldb.Dn(samdb, "@PROVISION") delta[LAST_PROVISION_USN_ATTRIBUTE] = ldb.MessageElement(tab, ldb.FLAG_MOD_ADD, LAST_PROVISION_USN_ATTRIBUTE) samdb.add(delta) + def get_max_usn(samdb,basedn): """ This function return the biggest USN present in the provision 
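Review bot: In update_provision_usn, `entry[0][LAST_PROVISION_USN_ATTRIBUTE]` is indexed without checking that the search returned a record. When `replace` is false and @PROVISION has no such attribute yet, the call fails with an IndexError before anything is written. Guarding the loop with `if len(entry) > 0:` lets the function carry on and write the new `low-high` range alone; whether the later `samdb.modify` then succeeds depends on the record existing, which is not visible here. The hunk also moves this docstring's closing quotes onto their own line, while set_provision_usn in the next hunk keeps them on the last text line.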
@@ -256,7 +262,7 @@ def get_max_usn(samdb,basedn): "paged_results:1:1"]) return res[0]["uSNChanged"] -def getLastProvisionUSN(sam): +def get_last_provision_usn(sam): """Get the lastest USN modified by a provision or an upgradeprovision :param sam: An LDB object pointing to the sam.ldb @@ -541,7 +547,7 @@ def make_smbconf(smbconf, setup_path, hostname, domain, realm, serverrole, privdir = os.path.join(targetdir, "private") else: privdir = default_lp.get("private dir") - posixeadb_line = "posix:eadb = " + os.path.abspath(os.path.join(privdir,"eadb.tdb")) + posixeadb_line = "posix:eadb = " + os.path.abspath(os.path.join(privdir, "eadb.tdb")) else: posixeadb_line = "" @@ -1159,7 +1165,7 @@ def set_gpo_acl(sysvol, dnsdomain, domainsid, domaindn, samdb, lp): set_dir_acl(policy_path,dsacl2fsacl(POLICIES_ACL, str(domainsid)), lp, str(domainsid)) res = samdb.search(base="CN=Policies,CN=System,%s"%(domaindn), - attrs=["cn","nTSecurityDescriptor"], + attrs=["cn", "nTSecurityDescriptor"], expression="", scope=ldb.SCOPE_ONELEVEL) for policy in res: acl = ndr_unpack(security.descriptor, @@ -1322,8 +1328,8 @@ def provision(setup_dir, logger, session_info, if not os.path.exists(paths.private_dir): os.mkdir(paths.private_dir) - if not os.path.exists(os.path.join(paths.private_dir,"tls")): - os.mkdir(os.path.join(paths.private_dir,"tls")) + if not os.path.exists(os.path.join(paths.private_dir, "tls")): + os.mkdir(os.path.join(paths.private_dir, "tls")) ldapi_url = "ldapi://%s" % urllib.quote(paths.s4_ldapi_path, safe="") 
@@ -1489,12 +1495,12 @@ def provision(setup_dir, logger, session_info, logger.info("A Kerberos configuration suitable for Samba 4 has been " "generated at %s", paths.krb5conf) - lastProvisionUSNs = getLastProvisionUSN(samdb) + lastProvisionUSNs = get_last_provision_usn(samdb) maxUSN = get_max_usn(samdb, str(names.rootdn)) if lastProvisionUSNs != None: - updateProvisionUSN(samdb, 0, maxUSN, 1) + update_provision_usn(samdb, 0, maxUSN, 1) else: - setProvisionUSN(samdb, 0, maxUSN) + set_provision_usn(samdb, 0, maxUSN) if serverrole == "domain controller": create_dns_update_list(lp, logger, paths, setup_path) @@ -1545,7 +1551,6 @@ def provision(setup_dir, logger, session_info, logger.info("This slapd-Commandline is also stored under: %s/ldap_backend_startup.sh", provision_backend.ldapdir) - result = ProvisionResult() result.domaindn = domaindn result.paths = paths @@ -1708,6 +1713,7 @@ def create_named_conf(paths, setup_path, realm, dnsdomain, setup_file(setup_path("named.conf.update"), paths.namedconf_update) + def create_named_txt(path, setup_path, realm, dnsdomain, private_dir, keytab_name): """Write out a file containing zone statements suitable for inclusion in a @@ -1729,6 +1735,7 @@ def create_named_txt(path, setup_path, realm, dnsdomain, "PRIVATE_DIR": private_dir }) + def create_krb5_conf(path, setup_path, dnsdomain, hostname, realm): """Write out a file containing zone statements suitable for inclusion in a named.conf file (including GSS-TSIG configuration). diff --git a/source4/scripting/python/samba/tests/provision.py b/source4/scripting/python/samba/tests/provision.py index b5cc57c4d9..aa4de660a0 100644 --- a/source4/scripting/python/samba/tests/provision.py +++ b/source4/scripting/python/samba/tests/provision.py 
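Review bot: The call `update_provision_usn(samdb, 0, maxUSN, 1)` still passes the integer `1` for a parameter whose default this commit changes to `False`; `update_provision_usn(samdb, 0, maxUSN, replace=True)` says what is meant. The guard two lines above it uses `!= None`, where `if lastProvisionUSNs is not None:` is the identity test and does not depend on how the returned object implements equality. The local names `lastProvisionUSNs` and `maxUSN` also stay camelCase next to the renamed functions. All of this sits inside provision(), which begins and ends outside the hunk.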
@@ -40,9 +40,11 @@ def create_dummy_secretsdb(path, lp=None): secrets_ldb.transaction_commit() return secrets_ldb + class ProvisionTestCase(samba.tests.TestCaseInTempDir): """Some simple tests for individual functions in the provisioning code. """ + def test_setup_secretsdb(self): path = os.path.join(self.tempdir, "secrets.ldb") ldb = setup_secretsdb(path, setup_path, None, None, lp=env_loadparm()) diff --git a/source4/scripting/python/samba/tests/upgradeprovision.py b/source4/scripting/python/samba/tests/upgradeprovision.py index f0306fe6a1..e40262b37a 100644 --- a/source4/scripting/python/samba/tests/upgradeprovision.py +++ b/source4/scripting/python/samba/tests/upgradeprovision.py @@ -23,16 +23,13 @@ from samba.upgradehelpers import (usn_in_range, dn_sort, construct_existor_expr) from samba.tests.provision import create_dummy_secretsdb -from samba.tests import env_loadparm, TestCaseInTempDir +from samba.tests import TestCaseInTempDir from samba import Ldb from ldb import SCOPE_SUBTREE import samba.tests -lp = env_loadparm() - def dummymessage(a=None, b=None): - if 0: - print "none" + pass class UpgradeProvisionTestCase(TestCaseInTempDir): @@ -60,7 +57,8 @@ class UpgradeProvisionTestCase(TestCaseInTempDir): self.assertEquals(dn_sort("dc=toto,dc=tata", "cn=foo,dc=toto,dc=tata"), -1) self.assertEquals(dn_sort("cn=bar, dc=toto,dc=tata", - "cn=foo, dc=toto,dc=tata"),-1) + "cn=foo, dc=toto,dc=tata"), -1) + def test_get_diff_sddl(self): sddl = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\ (A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)S:AI(AU;CIIDSA;WP;;;WD)" 
@@ -75,19 +73,19 @@ class UpgradeProvisionTestCase(TestCaseInTempDir): sddl5 = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\ (A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" - self.assertEquals(get_diff_sddls(sddl, sddl1) ,"") + self.assertEquals(get_diff_sddls(sddl, sddl1), "") txt = get_diff_sddls(sddl, sddl2) - self.assertEquals(txt ,"\tOwner mismatch: SA (in ref) BA(in current)\n") + self.assertEquals(txt, "\tOwner mismatch: SA (in ref) BA(in current)\n") txt = get_diff_sddls(sddl, sddl3) - self.assertEquals(txt ,"\tGroup mismatch: DU (in ref) BA(in current)\n") + self.assertEquals(txt, "\tGroup mismatch: DU (in ref) BA(in current)\n") txt = get_diff_sddls(sddl, sddl4) txtmsg = "\tPart dacl is different between reference and current here\ is the detail:\n\t\t(A;CIID;RPWPCRCCLCLORCWOWDSW;;;BA) ACE is not present in\ the reference\n\t\t(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA) ACE is not present in\ the current\n" - self.assertEquals(txt , txtmsg) + self.assertEquals(txt, txtmsg) txt = get_diff_sddls(sddl, sddl5) - self.assertEquals(txt ,"\tCurrent ACL hasn't a sacl part\n") + self.assertEquals(txt, "\tCurrent ACL hasn't a sacl part\n") def test_construct_existor_expr(self): res = construct_existor_expr([]) @@ -99,7 +97,9 @@ class UpgradeProvisionTestCase(TestCaseInTempDir): res = construct_existor_expr(["foo", "bar"]) self.assertEquals(res, "(|(foo=*)(bar=*))") + class UpdateSecretsTests(samba.tests.TestCaseInTempDir): + def setUp(self): super(UpdateSecretsTests, self).setUp() self.referencedb = create_dummy_secretsdb( diff --git a/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py b/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py index 32d6c0975b..3b0a695d83 100644 --- a/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py +++ b/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py 
@@ -20,6 +20,8 @@ import os import re import shutil + +from samba import param from samba.credentials import Credentials from samba.auth import system_session from samba.provision import getpolicypath 
@@ -27,67 +29,66 @@ from samba.upgradehelpers import (get_paths, get_ldbs, find_provision_key_parameters, identic_rename, updateOEMInfo, getOEMInfo, update_gpo, delta_update_basesamdb,search_constructed_attrs_stored) - -from samba.tests.provision import create_dummy_secretsdb -from samba import param from samba.tests import env_loadparm, TestCaseInTempDir +from samba.tests.provision import create_dummy_secretsdb import ldb def dummymessage(a=None, b=None): - if 0: - print "none" + pass -lp = env_loadparm() -smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf") +smb_conf_path = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf") class UpgradeProvisionBasicLdbHelpersTestCase(TestCaseInTempDir): """Some simple tests for individual functions in the provisioning code. """ def test_get_ldbs(self): - paths = get_paths(param, None, smbConfPath) + paths = get_paths(param, None, smb_conf_path) creds = Credentials() + lp = env_loadparm() creds.guess(lp) get_ldbs(paths, creds, system_session(), lp) def test_find_key_param(self): - paths = get_paths(param, None, smbConfPath) + paths = get_paths(param, None, smb_conf_path) creds = Credentials() + lp = env_loadparm() creds.guess(lp) rootdn = "dc=samba,dc=example,dc=com" ldbs = get_ldbs(paths, creds, system_session(), lp) names = find_provision_key_parameters(ldbs.sam, ldbs.secrets, ldbs.idmap, - paths, smbConfPath, lp) + paths, smb_conf_path, lp) self.assertEquals(names.realm, "SAMBA.EXAMPLE.COM") - self.assertTrue(str(names.rootdn).lower() == rootdn.lower()) + self.assertEquals(str(names.rootdn).lower(), rootdn.lower()) self.assertTrue(names.policyid_dc != None) self.assertTrue(names.ntdsguid != "") class UpgradeProvisionWithLdbTestCase(TestCaseInTempDir): + def _getEmptyDbName(self): return os.path.join(self.tempdir, "sam.ldb") def setUp(self): super(UpgradeProvisionWithLdbTestCase, self).setUp() - paths = get_paths(param, None, smbConfPath) + paths = get_paths(param, None, 
smb_conf_path) self.creds = Credentials() - self.creds.guess(lp) + self.lp = env_loadparm() + self.creds.guess(self.lp) self.paths = paths - self.ldbs = get_ldbs(paths, self.creds, system_session(), lp) - self.lp = lp + self.ldbs = get_ldbs(paths, self.creds, system_session(), self.lp) self.names = find_provision_key_parameters(self.ldbs.sam, self.ldbs.secrets, - self.ldbs.idmap, paths, smbConfPath, lp) + self.ldbs.idmap, paths, smb_conf_path, self.lp) self.referencedb = create_dummy_secretsdb( os.path.join(self.tempdir, "ref.ldb")) - def test_search_constructed_attrs_stored(self): hashAtt = search_constructed_attrs_stored(self.ldbs.sam, self.names.rootdn, ["msds-KeyVersionNumber"]) self.assertFalse(hashAtt.has_key("msds-KeyVersionNumber")) + def test_identic_rename(self): rootdn = "DC=samba,DC=example,DC=com" diff --git a/source4/scripting/python/samba/upgradehelpers.py b/source4/scripting/python/samba/upgradehelpers.py index db6ea560a2..5a37dab108 100755 --- a/source4/scripting/python/samba/upgradehelpers.py +++ b/source4/scripting/python/samba/upgradehelpers.py @@ -166,6 +166,7 @@ def get_ldbs(paths, creds, session, lp): return ldbs + def usn_in_range(usn, range): """Check if the usn is in one of the range provided. To do so, the value is checked to be between the lower bound and 
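Review bot: `smb_conf_path` is still built from `os.environ["SELFTEST_PREFIX"]` at import time in the upgradeprovisionneeddc.py hunk that ends on this line. The module therefore raises KeyError on import whenever that variable is unset, even though the same hunk moves `env_loadparm()` out of module scope and into the tests. Computing the path where it is used, for example in `setUp` and at the top of the two standalone tests, would finish that move and make the failure a per-test error rather than a collection error. Separately, `self.assertTrue(names.policyid_dc != None)` was left as is while its neighbour became `assertEquals`; `self.assertTrue(names.policyid_dc is not None)` is the identity form.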
@@ -174,25 +175,27 @@ def usn_in_range(usn, range): :param usn: A integer value corresponding to the usn that we want to update :param range: A list of integer representing ranges, lower bounds are in the even indices, higher in odd indices - :return: 1 if the usn is in one of the range, 0 otherwise""" + :return: True if the usn is in one of the range, False otherwise + """ idx = 0 - cont = 1 - ok = 0 - while (cont == 1): + cont = True + ok = False + while cont: if idx == len(range): - cont = 0 + cont = False continue if usn < int(range[idx]): if idx %2 == 1: - ok = 1 - cont = 0 + ok = True + cont = False if usn == int(range[idx]): - cont = 0 - ok = 1 + cont = False + ok = True idx = idx + 1 return ok + def get_paths(param, targetdir=None, smbconf=None): """Get paths to important provision objects (smb.conf, ldb files, ...) @@ -237,6 +240,7 @@ def update_policyids(names, samdb): else: names.policyid_dc = None + def find_provision_key_parameters(samdb, secretsdb, idmapdb, paths, smbconf, lp): """Get key provision parameters (realm, domain, ...) from a given provision @@ -246,8 +250,8 @@ def find_provision_key_parameters(samdb, secretsdb, idmapdb, paths, smbconf, lp) :param paths: A list of path to provision object :param smbconf: Path to the smb.conf file :param lp: A LoadParm object - :return: A list of key provision parameters""" - + :return: A list of key provision parameters + """ names = ProvisionNames() names.adminpass = None 
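Review bot: With `ok` and `cont` now booleans, the flag-driven `while cont:` loop in usn_in_range could return directly: `return True` where `ok = True` is set and `return False` after the loop. That removes both flags and the `continue` that exists only to leave the loop. Indentation is lost in this view, so check in the file which `if` each `cont = False` belongs to before restructuring. The docstring describes `range` as a list of integers while the code converts every item with `int(...)`, so a sentence saying string values are accepted would match the code; the parameter name also shadows the builtin inside the function. The `def` line sits in the preceding hunk.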
@@ -408,16 +412,19 @@ def dn_sort(x, y): return -1 return ret + def identic_rename(ldbobj, dn): """Perform a back and forth rename to trigger renaming on attribute that - can't be directly modified. + can't be directly modified. :param lbdobj: An Ldb Object - :param dn: DN of the object to manipulate """ + :param dn: DN of the object to manipulate + """ (before, sep, after)=str(dn).partition('=') ldbobj.rename(dn, ldb.Dn(ldbobj, "%s=foo%s" % (before, after))) ldbobj.rename(ldb.Dn(ldbobj, "%s=foo%s" % (before, after)), dn) + def chunck_acl(acl): """Return separate ACE of an ACL @@ -659,7 +666,7 @@ def update_gpo(paths, samdb, names, lp, message, force=0): Set ACL correctly also. Check ACLs for sysvol/netlogon dirs also """ - resetacls = 0 + resetacls = False try: ntacls.checkset_backend(lp, None, None) eadbname = lp.get("posix:eadb") @@ -674,10 +681,10 @@ def update_gpo(paths, samdb, names, lp, message, force=0): attribute = samba.xattr_native.wrap_getxattr(paths.sysvol, xattr.XATTR_NTACL_NAME) except: - resetacls = 1 + resetacls = True if force: - resetacls = 1 + resetacls = True dir = getpolicypath(paths.sysvol, names.dnsdomain, names.policyid) if not os.path.isdir(dir): |
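Review bot: The bare `except:` kept as context in the last update_gpo hunk turns any failure of the xattr read into `resetacls = True`, including KeyboardInterrupt or a plain programming error. An unrelated fault would therefore presumably end in the sysvol ACLs being rewritten without any trace of why. Narrowing it to `except Exception:` at least, and reporting the caught error through the existing `message` argument (its signature is not visible here, so confirm), would make an ACL reset traceable to its cause. The `force=0` default in the signature is also still an integer while `resetacls` becomes a boolean; `force=False` would match. The `try:` opens in the preceding hunk and the function continues past this one.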