diff options
Diffstat (limited to 'source4/scripting/python')
3 files changed, 444 insertions, 99 deletions
diff --git a/source4/scripting/python/samba/tests/upgradeprovision.py b/source4/scripting/python/samba/tests/upgradeprovision.py index 7adb97f298..c5e1094e0d 100644 --- a/source4/scripting/python/samba/tests/upgradeprovision.py +++ b/source4/scripting/python/samba/tests/upgradeprovision.py @@ -18,90 +18,39 @@ # import os -from samba.credentials import Credentials -from samba.auth import system_session -from samba.upgradehelpers import get_paths, usn_in_range, get_ldbs,\ - find_provision_key_parameters, dn_sort,\ - identic_rename, get_diff_sddls -from samba import param +from samba.upgradehelpers import usn_in_range, dn_sort,\ + get_diff_sddls, update_secrets + + +from samba.tests.provision import create_dummy_secretsdb from samba.tests import env_loadparm, TestCaseInTempDir -import ldb +from samba import Ldb +from ldb import SCOPE_SUBTREE +import samba.tests lp = env_loadparm() +def dummymessage(a=None, b=None): + if 0: + print "none" + class UpgradeProvisionTestCase(TestCaseInTempDir): """Some simple tests for individual functions in the provisioning code. """ - def test_get_paths(self): - smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf") - targetdir = os.path.join(os.environ["SELFTEST_PREFIX"], "dc") - privatePath = os.path.join(targetdir, "private") - - paths = get_paths(param, None, smbConfPath) - self.assertEquals(paths.private_dir, privatePath) - - paths2 = get_paths(param, targetdir) - self.assertEquals(paths2.private_dir, privatePath) - def test_usn_in_range(self): + range = [5, 25, 35, 55] - range = [] - range.append(5) - range.append(25) - range.append(35) - range.append(55) - - vals = [] - vals.append(3) - vals.append(26) - vals.append(56) + vals = [3, 26, 56] for v in vals: self.assertFalse(usn_in_range(v, range)) - vals = [] - vals.append(5) - vals.append(20) - vals.append(25) - vals.append(35) - vals.append(36) + vals = [5, 20, 25, 35, 36] for v in vals: self.assertTrue(usn_in_range(v, range)) - - def test_get_ldbs(self): - smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf") - paths = get_paths(param, None, smbConfPath) - creds = Credentials() - creds.guess(lp) - try: - get_ldbs(paths, creds, system_session(), lp) - except: - self.assertTrue(0) - - def test_find_key_param(self): - smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf") - paths = get_paths(param, None, smbConfPath) - creds = Credentials() - creds.guess(lp) - rootdn = "dc=samba,dc=example,dc=com" - ldbs = get_ldbs(paths, creds, system_session(), lp) - find_provision_key_parameters(ldbs.sam, ldbs.secrets, paths, - smbConfPath, lp) - try: - names = find_provision_key_parameters(ldbs.sam, ldbs.secrets, paths, - smbConfPath, lp) - except: - self.assertTrue(0) - - self.assertTrue(names.realm == "SAMBA.EXAMPLE.COM") - self.assertTrue(str(names.rootdn).lower() == rootdn.lower()) - self.assertTrue(names.ntdsguid != "") - - - def test_dn_sort(self): # higher level comes after lower even if lexicographicaly closer # ie dc=tata,dc=toto (2 levels), comes after dc=toto @@ -111,27 +60,7 @@ class UpgradeProvisionTestCase(TestCaseInTempDir): self.assertEquals(dn_sort("dc=toto,dc=tata", "cn=foo,dc=toto,dc=tata"), -1) self.assertEquals(dn_sort("cn=bar, dc=toto,dc=tata", - "cn=foo, dc=toto,dc=tata"), -1) - - def test_identic_rename(self): - smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf") - paths = get_paths(param, None, smbConfPath) - creds = Credentials() - creds.guess(lp) - rootdn = "DC=samba,DC=example,DC=com" - ldbs = get_ldbs(paths, creds, system_session(), lp) - - guestDN = ldb.Dn(ldbs.sam, "CN=Guest,CN=Users,%s" % rootdn) - try: - identic_rename(ldbs.sam, guestDN) - res = ldbs.sam.search(expression="(name=Guest)", base=rootdn, - scope=ldb.SCOPE_SUBTREE, attrs=["dn"]) - except: - self.assertTrue(0) - - self.assertEquals(len(res), 1) - self.assertEquals(str(res[0]["dn"]), "CN=Guest,CN=Users,%s" % rootdn) - + "cn=foo, dc=toto,dc=tata"),-1) def test_get_diff_sddl(self): sddl = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\ (A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)S:AI(AU;CIIDSA;WP;;;WD)" @@ -148,9 +77,9 @@ class UpgradeProvisionTestCase(TestCaseInTempDir): self.assertEquals(get_diff_sddls(sddl, sddl1) ,"") txt = get_diff_sddls(sddl, sddl2) - self.assertEquals(txt ,"\tOwner mismatch: SA (in ref) BA (in current)\n") + self.assertEquals(txt ,"\tOwner mismatch: SA (in ref) BA(in current)\n") txt = get_diff_sddls(sddl, sddl3) - self.assertEquals(txt ,"\tGroup mismatch: DU (in ref) BA (in current)\n") + self.assertEquals(txt ,"\tGroup mismatch: DU (in ref) BA(in current)\n") txt = get_diff_sddls(sddl, sddl4) txtmsg = "\tPart dacl is different between reference and current here\ is the detail:\n\t\t(A;CIID;RPWPCRCCLCLORCWOWDSW;;;BA) ACE is not present in\ @@ -159,3 +88,41 @@ class UpgradeProvisionTestCase(TestCaseInTempDir): self.assertEquals(txt , txtmsg) txt = get_diff_sddls(sddl, sddl5) self.assertEquals(txt ,"\tCurrent ACL hasn't a sacl part\n") + + +class UpdateSecretsTests(samba.tests.TestCaseInTempDir): + def setUp(self): + super(UpdateSecretsTests, self).setUp() + self.referencedb = create_dummy_secretsdb( + os.path.join(self.tempdir, "ref.ldb")) + + def _getEmptyDb(self): + return Ldb(os.path.join(self.tempdir, "secrets.ldb")) + + def _getCurrentFormatDb(self): + return create_dummy_secretsdb( + os.path.join(self.tempdir, "secrets.ldb")) + + def test_trivial(self): + # Test that updating an already up-to-date secretsdb works fine + self.secretsdb = self._getCurrentFormatDb() + self.assertEquals(None, + update_secrets(self.referencedb, self.secretsdb, dummymessage)) + + def test_update_modules(self): + empty_db = self._getEmptyDb() + update_secrets(self.referencedb, empty_db, dummymessage) + newmodules = empty_db.search( + expression="dn=@MODULES", base="", scope=SCOPE_SUBTREE) + refmodules = self.referencedb.search( + expression="dn=@MODULES", base="", scope=SCOPE_SUBTREE) + self.assertEquals(newmodules, refmodules) + + def tearDown(self): + for name in ["ref.ldb", "secrets.ldb"]: + path = os.path.join(self.tempdir, name) + if os.path.exists(path): + os.unlink(path) + super(UpdateSecretsTests, self).tearDown() + + diff --git a/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py b/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py new file mode 100644 index 0000000000..32fad14765 --- /dev/null +++ b/source4/scripting/python/samba/tests/upgradeprovisionneeddc.py @@ -0,0 +1,138 @@ +#!/usr/bin/python + +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +import re +import shutil +from samba.credentials import Credentials +from samba.auth import system_session +from samba.provision import getpolicypath +from samba.upgradehelpers import (get_paths, get_ldbs, + find_provision_key_parameters, identic_rename, + updateOEMInfo, getOEMInfo, update_gpo, + delta_update_basesamdb) + +from samba.tests.provision import create_dummy_secretsdb +from samba import param +from samba.tests import env_loadparm, TestCaseInTempDir +import ldb + + +def dummymessage(a=None, b=None): + if 0: + print "none" + +lp = env_loadparm() +smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf") + +class UpgradeProvisionBasicLdbHelpersTestCase(TestCaseInTempDir): + """Some simple tests for individual functions in the provisioning code. + """ + + def test_get_ldbs(self): + paths = get_paths(param, None, smbConfPath) + creds = Credentials() + creds.guess(lp) + get_ldbs(paths, creds, system_session(), lp) + + def test_find_key_param(self): + paths = get_paths(param, None, smbConfPath) + creds = Credentials() + creds.guess(lp) + rootdn = "dc=samba,dc=example,dc=com" + ldbs = get_ldbs(paths, creds, system_session(), lp) + names = find_provision_key_parameters(ldbs.sam, ldbs.secrets, ldbs.idmap, + paths, smbConfPath, lp) + self.assertEquals(names.realm, "SAMBA.EXAMPLE.COM") + self.assertTrue(str(names.rootdn).lower() == rootdn.lower()) + self.assertTrue(names.policyid_dc != None) + self.assertTrue(names.ntdsguid != "") + + +class UpgradeProvisionWithLdbTestCase(TestCaseInTempDir): + def _getEmptyDbName(self): + return os.path.join(self.tempdir, "sam.ldb") + + def setUp(self): + super(UpgradeProvisionWithLdbTestCase, self).setUp() + paths = get_paths(param, None, smbConfPath) + self.creds = Credentials() + self.creds.guess(lp) + self.paths = paths + self.ldbs = get_ldbs(paths, self.creds, system_session(), lp) + self.lp = lp + self.names = find_provision_key_parameters(self.ldbs.sam, self.ldbs.secrets, + self.ldbs.idmap, paths, smbConfPath, lp) + self.referencedb = create_dummy_secretsdb( + os.path.join(self.tempdir, "ref.ldb")) + + def test_identic_rename(self): + rootdn = "DC=samba,DC=example,DC=com" + + guestDN = ldb.Dn(self.ldbs.sam, "CN=Guest,CN=Users,%s" % rootdn) + identic_rename(self.ldbs.sam, guestDN) + res = self.ldbs.sam.search(expression="(name=Guest)", base=rootdn, + scope=ldb.SCOPE_SUBTREE, attrs=["dn"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0]["dn"]), "CN=Guest,CN=Users,%s" % rootdn) + + def test_delta_update_basesamdb(self): + dummysampath = self._getEmptyDbName() + delta_update_basesamdb(self.paths.samdb, dummysampath, + self.creds, system_session(), self.lp, dummymessage) + + def test_update_gpo_simple(self): + dir = getpolicypath(self.paths.sysvol, self.names.dnsdomain, self.names.policyid) + shutil.rmtree(dir) + self.assertFalse(os.path.isdir(dir)) + update_gpo(self.paths, self.ldbs.sam, self.names, self.lp, dummymessage) + self.assertTrue(os.path.isdir(dir)) + + def test_update_gpo_acl(self): + path = os.path.join(self.tempdir, "testupdategpo") + save = self.paths.sysvol + self.paths.sysvol = path + os.mkdir(path) + os.mkdir(os.path.join(path, self.names.dnsdomain)) + os.mkdir(os.path.join(os.path.join(path, self.names.dnsdomain), "Policies")) + update_gpo(self.paths, self.ldbs.sam, self.names, self.lp, dummymessage) + shutil.rmtree(path) + self.paths.sysvol = save + + def test_getOEMInfo(self): + realm = self.lp.get("realm") + basedn = "DC=%s" % realm.replace(".", ", DC=") + oem = getOEMInfo(self.ldbs.sam, basedn) + self.assertTrue(oem != "") + + def test_updateOEMInfo(self): + realm = self.lp.get("realm") + basedn = "DC=%s" % realm.replace(".", ", DC=") + oem = getOEMInfo(self.ldbs.sam, basedn) + updateOEMInfo(self.ldbs.sam, basedn) + oem2 = getOEMInfo(self.ldbs.sam, basedn) + self.assertTrue(str(oem) != str(oem2)) + self.assertTrue(re.match(".*upgrade to.*", str(oem2))) + + def tearDown(self): + for name in ["ref.ldb", "secrets.ldb", "sam.ldb"]: + path = os.path.join(self.tempdir, name) + if os.path.exists(path): + os.unlink(path) + super(UpgradeProvisionWithLdbTestCase, self).tearDown() diff --git a/source4/scripting/python/samba/upgradehelpers.py b/source4/scripting/python/samba/upgradehelpers.py index a297035482..be5bdb05d6 100755 --- a/source4/scripting/python/samba/upgradehelpers.py +++ b/source4/scripting/python/samba/upgradehelpers.py @@ -26,21 +26,44 @@ import os import string import re import shutil +import samba +from samba import Ldb, version, ntacls +from samba.dsdb import DS_DOMAIN_FUNCTION_2000 from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE import ldb - -from samba import Ldb -from samba.dcerpc import misc, security -from samba.dsdb import DS_DOMAIN_FUNCTION_2000 -from samba.provision import (ProvisionNames, provision_paths_from_lp, - FILL_FULL, provision, ProvisioningError) +from samba.provision import ProvisionNames, provision_paths_from_lp,\ + getpolicypath, set_gpo_acl, create_gpo_struct,\ + FILL_FULL, provision, ProvisioningError,\ + setsysvolacl +from samba.dcerpc import misc, security, xattr from samba.ndr import ndr_unpack # All the ldb related to registry are commented because the path for them is relative # in the provisionPath object # And so opening them create a file in the current directory which is not what we want # I still keep them commented because I plan soon to make more cleaner +ERROR = -1 +SIMPLE = 0x00 +CHANGE = 0x01 +CHANGESD = 0x02 +GUESS = 0x04 +PROVISION = 0x08 +CHANGEALL = 0xff + +hashAttrNotCopied = { "dn": 1, "whenCreated": 1, "whenChanged": 1, + "objectGUID": 1, "uSNCreated": 1, + "replPropertyMetaData": 1, "uSNChanged": 1, + "parentGUID": 1, "objectCategory": 1, + "distinguishedName": 1, "nTMixedDomain": 1, + "showInAdvancedViewOnly": 1, "instanceType": 1, + "msDS-Behavior-Version":1, "nextRid":1, "cn": 1, + "versionNumber":1, "lmPwdHistory":1, "pwdLastSet": 1, + "ntPwdHistory":1, "unicodePwd":1,"dBCSPwd":1, + "supplementalCredentials":1, "gPCUserExtensionNames":1, + "gPCMachineExtensionNames":1,"maxPwdAge":1, "secret":1, + "possibleInferiors":1, "privilege":1, + "sAMAccountType":1 } class ProvisionLDB(object): def __init__(self): @@ -165,11 +188,12 @@ def get_paths(param, targetdir=None, smbconf=None): return paths -def find_provision_key_parameters(samdb, secretsdb, paths, smbconf, lp): +def find_provision_key_parameters(samdb, secretsdb, idmapdb, paths, smbconf, lp): """Get key provision parameters (realm, domain, ...) from a given provision :param samdb: An LDB object connected to the sam.ldb file :param secretsdb: An LDB object connected to the secrets.ldb file + :param idmapdb: An LDB object connected to the idmap.ldb file :param paths: A list of path to provision object :param smbconf: Path to the smb.conf file :param lp: A LoadParm object @@ -181,8 +205,8 @@ def find_provision_key_parameters(samdb, secretsdb, paths, smbconf, lp): # NT domain, kerberos realm, root dn, domain dn, domain dns name names.domain = string.upper(lp.get("workgroup")) names.realm = lp.get("realm") - basedn = "DC=" + names.realm.replace(".", ",DC=") - names.dnsdomain = names.realm + basedn = "DC=" + names.realm.replace(".",",DC=") + names.dnsdomain = names.realm.lower() names.realm = string.upper(names.realm) # netbiosname # Get the netbiosname first (could be obtained from smb.conf in theory) @@ -252,7 +276,12 @@ def find_provision_key_parameters(samdb, secretsdb, paths, smbconf, lp): names.policyid_dc = str(res8[0]["cn"]).replace("{","").replace("}","") else: names.policyid_dc = None - + res9 = idmapdb.search(expression="(cn=%s)" % (security.SID_BUILTIN_ADMINISTRATORS), + attrs=["xidNumber"]) + if len(res9) == 1: + names.wheel_gid = res9[0]["xidNumber"] + else: + raise ProvisioningError("Unable to find uid/gid for Domain Admins rid") return names @@ -433,3 +462,214 @@ def get_diff_sddls(refsddl, cursddl): txt = "%s\tCurrent ACL hasn't a %s part\n" % (txt, part) return txt + + +def update_secrets(newsecrets_ldb, secrets_ldb, messagefunc): + """Update secrets.ldb + + :param newsecrets_ldb: An LDB object that is connected to the secrets.ldb + of the reference provision + :param secrets_ldb: An LDB object that is connected to the secrets.ldb + of the updated provision + """ + + messagefunc(SIMPLE, "update secrets.ldb") + reference = newsecrets_ldb.search(expression="dn=@MODULES", base="", + scope=SCOPE_SUBTREE) + current = secrets_ldb.search(expression="dn=@MODULES", base="", + scope=SCOPE_SUBTREE) + assert reference, "Reference modules list can not be empty" + if len(current) == 0: + # No modules present + delta = secrets_ldb.msg_diff(ldb.Message(), reference[0]) + delta.dn = reference[0].dn + secrets_ldb.add(reference[0]) + else: + delta = secrets_ldb.msg_diff(current[0], reference[0]) + delta.dn = current[0].dn + secrets_ldb.modify(delta) + + reference = newsecrets_ldb.search(expression="objectClass=top", base="", + scope=SCOPE_SUBTREE, attrs=["dn"]) + current = secrets_ldb.search(expression="objectClass=top", base="", + scope=SCOPE_SUBTREE, attrs=["dn"]) + hash_new = {} + hash = {} + listMissing = [] + listPresent = [] + + empty = ldb.Message() + for i in range(0, len(reference)): + hash_new[str(reference[i]["dn"]).lower()] = reference[i]["dn"] + + # Create a hash for speeding the search of existing object in the + # current provision + for i in range(0, len(current)): + hash[str(current[i]["dn"]).lower()] = current[i]["dn"] + + for k in hash_new.keys(): + if not hash.has_key(k): + listMissing.append(hash_new[k]) + else: + listPresent.append(hash_new[k]) + + for entry in listMissing: + reference = newsecrets_ldb.search(expression="dn=%s" % entry, + base="", scope=SCOPE_SUBTREE) + current = secrets_ldb.search(expression="dn=%s" % entry, + base="", scope=SCOPE_SUBTREE) + delta = secrets_ldb.msg_diff(empty, reference[0]) + for att in hashAttrNotCopied.keys(): + delta.remove(att) + messagefunc(CHANGE, "Entry %s is missing from secrets.ldb" % reference[0].dn) + for att in delta: + messagefunc(CHANGE, " Adding attribute %s" % att) + delta.dn = reference[0].dn + secrets_ldb.add(delta) + + for entry in listPresent: + reference = newsecrets_ldb.search(expression="dn=%s" % entry, + base="", scope=SCOPE_SUBTREE) + current = secrets_ldb.search(expression="dn=%s" % entry, base="", + scope=SCOPE_SUBTREE) + delta = secrets_ldb.msg_diff(current[0], reference[0]) + for att in hashAttrNotCopied.keys(): + delta.remove(att) + for att in delta: + if att == "name": + messagefunc(CHANGE, "Found attribute name on %s," \ + " must rename the DN" % (current[0].dn)) + identic_rename(secrets_ldb, reference[0].dn) + else: + delta.remove(att) + + for entry in listPresent: + reference = newsecrets_ldb.search(expression="dn=%s" % entry, base="", + scope=SCOPE_SUBTREE) + current = secrets_ldb.search(expression="dn=%s" % entry, base="", + scope=SCOPE_SUBTREE) + delta = secrets_ldb.msg_diff(current[0], reference[0]) + for att in hashAttrNotCopied.keys(): + delta.remove(att) + for att in delta: + if att != "dn": + messagefunc(CHANGE, + "Adding/Changing attribute %s to %s" % (att, current[0].dn)) + + delta.dn = current[0].dn + secrets_ldb.modify(delta) + +def getOEMInfo(samdb, rootdn): + """Return OEM Information on the top level + Samba4 use to store version info in this field + + :param samdb: An LDB object connect to sam.ldb + :param rootdn: Root DN of the domain + :return: The content of the field oEMInformation (if any)""" + res = samdb.search(expression="(objectClass=*)", base=str(rootdn), + scope=SCOPE_BASE, attrs=["dn", "oEMInformation"]) + if len(res) > 0: + info = res[0]["oEMInformation"] + return info + else: + return "" + +def updateOEMInfo(samdb, rootdn): + """Update the OEMinfo field to add information about upgrade + :param samdb: an LDB object connected to the sam DB + :param rootdn: The string representation of the root DN of + the provision (ie. DC=...,DC=...) + """ + res = samdb.search(expression="(objectClass=*)", base=rootdn, + scope=SCOPE_BASE, attrs=["dn", "oEMInformation"]) + if len(res) > 0: + info = res[0]["oEMInformation"] + info = "%s, upgrade to %s" % (info, version) + delta = ldb.Message() + delta.dn = ldb.Dn(samdb, str(res[0]["dn"])) + delta["oEMInformation"] = ldb.MessageElement(info, ldb.FLAG_MOD_REPLACE, + "oEMInformation" ) + samdb.modify(delta) + +def update_gpo(paths, samdb, names, lp, message, force=0): + """Create missing GPO file object if needed + + Set ACL correctly also. + Check ACLs for sysvol/netlogon dirs also + """ + resetacls = 0 + try: + ntacls.checkset_backend(lp, None, None) + eadbname = lp.get("posix:eadb") + if eadbname is not None and eadbname != "": + try: + attribute = samba.xattr_tdb.wrap_getxattr(eadbname, paths.sysvol, + xattr.XATTR_NTACL_NAME) + except: + attribute = samba.xattr_native.wrap_getxattr(paths.sysvol, + xattr.XATTR_NTACL_NAME) + else: + attribute = samba.xattr_native.wrap_getxattr(paths.sysvol, + xattr.XATTR_NTACL_NAME) + except: + resetacls = 1 + + if force: + resetacls = 1 + + dir = getpolicypath(paths.sysvol, names.dnsdomain, names.policyid) + if not os.path.isdir(dir): + create_gpo_struct(dir) + + dir = getpolicypath(paths.sysvol, names.dnsdomain, names.policyid_dc) + if not os.path.isdir(dir): + create_gpo_struct(dir) + # We always reinforce acls on GPO folder because they have to be in sync + # with the one in DS + set_gpo_acl(paths.sysvol, names.dnsdomain, names.domainsid, + names.domaindn, samdb, lp) + + if resetacls: + setsysvolacl(samdb, paths.netlogon, paths.sysvol, names.wheel_gid, + names.domainsid, names.dnsdomain, names.domaindn, lp) + +def delta_update_basesamdb(refsam, sam, creds, session, lp, message): + """Update the provision container db: sam.ldb + This function is aimed for alpha9 and newer; + + :param refsam: Path to the samdb in the reference provision + :param sam: Path to the samdb in the upgraded provision + :param creds: Credential used for openning LDB files + :param session: Session to use for openning LDB files + :param lp: A loadparam object""" + + message(SIMPLE, + "Update base samdb by searching difference with reference one") + refsam = Ldb(refsam, session_info=session, credentials=creds, + lp=lp, options=["modules:"]) + sam = Ldb(sam, session_info=session, credentials=creds, lp=lp, + options=["modules:"]) + + empty = ldb.Message() + + reference = refsam.search(expression="") + + for refentry in reference: + entry = sam.search(expression="dn=%s" % refentry["dn"], + scope=SCOPE_SUBTREE) + if not len(entry): + delta = sam.msg_diff(empty, refentry) + message(CHANGE, "Adding %s to sam db" % str(refentry.dn)) + if str(refentry.dn) == "@PROVISION" and\ + delta.get(samba.provision.LAST_PROVISION_USN_ATTRIBUTE): + delta.remove(samba.provision.LAST_PROVISION_USN_ATTRIBUTE) + delta.dn = refentry.dn + sam.add(delta) + else: + delta = sam.msg_diff(entry[0], refentry) + if str(refentry.dn) == "@PROVISION" and\ + delta.get(samba.provision.LAST_PROVISION_USN_ATTRIBUTE): + delta.remove(samba.provision.LAST_PROVISION_USN_ATTRIBUTE) + if len(delta.items()) > 1: + delta.dn = refentry.dn + sam.modify(delta) |