From 9040e2684161ce75738e9da0fee191aa34858607 Mon Sep 17 00:00:00 2001 From: Andrew Bartlett Date: Fri, 22 Mar 2013 16:19:27 +1100 Subject: scripting: Move get_diff_sds from samba.upgradehelpers to samba.descriptor This helps avoid a dependency loop when we use get_diff_sds in dbcheck. Andrew Bartlett Reviewed-by: Stefan Metzmacher --- python/samba/descriptor.py | 154 +++++++++++++++++++++++++++++++++ python/samba/tests/upgradeprovision.py | 4 +- python/samba/upgradehelpers.py | 153 -------------------------------- 3 files changed, 156 insertions(+), 155 deletions(-) (limited to 'python') diff --git a/python/samba/descriptor.py b/python/samba/descriptor.py index f9fb3c6643..4137bc3a12 100644 --- a/python/samba/descriptor.py +++ b/python/samba/descriptor.py @@ -30,6 +30,7 @@ from samba.dcerpc import security from samba.ndr import ndr_pack from samba.schema import get_schema_descriptor import ldb +import re # Descriptors of naming contexts and other important objects @@ -425,3 +426,156 @@ def get_wellknown_sds(samdb): subcontainers.append(c) return subcontainers + + +def chunck_acl(acl): + """Return separate ACE of an ACL + + :param acl: A string representing the ACL + :return: A hash with different parts + """ + + p = re.compile(r'(\w+)?(\(.*?\))') + tab = p.findall(acl) + + hash = {} + hash["aces"] = [] + for e in tab: + if len(e[0]) > 0: + hash["flags"] = e[0] + hash["aces"].append(e[1]) + + return hash + + +def chunck_sddl(sddl): + """ Return separate parts of the SDDL (owner, group, ...) + + :param sddl: An string containing the SDDL to chunk + :return: A hash with the different chunk + """ + + p = re.compile(r'([OGDS]:)(.*?)(?=(?:[GDS]:|$))') + tab = p.findall(sddl) + + hash = {} + for e in tab: + if e[0] == "O:": + hash["owner"] = e[1] + if e[0] == "G:": + hash["group"] = e[1] + if e[0] == "D:": + hash["dacl"] = e[1] + if e[0] == "S:": + hash["sacl"] = e[1] + + return hash + + +def get_clean_sd(sd): + """Get the SD without any inherited ACEs + + :param sd: SD to strip + :return: An SD with inherited ACEs stripped + """ + + sd_clean = security.descriptor() + sd_clean.owner_sid = sd.owner_sid + sd_clean.group_sid = sd.group_sid + sd_clean.type = sd.type + sd_clean.revision = sd.revision + + aces = [] + if sd.sacl is not None: + aces = sd.sacl.aces + for i in range(0, len(aces)): + ace = aces[i] + + if not ace.flags & security.SEC_ACE_FLAG_INHERITED_ACE: + sd_clean.sacl_add(ace) + continue + + aces = [] + if sd.dacl is not None: + aces = sd.dacl.aces + for i in range(0, len(aces)): + ace = aces[i] + + if not ace.flags & security.SEC_ACE_FLAG_INHERITED_ACE: + sd_clean.dacl_add(ace) + continue + return sd_clean + + +def get_diff_sds(refsd, cursd, domainsid, checkSacl = True): + """Get the difference between 2 sd + + This function split the textual representation of ACL into smaller + chunck in order to not to report a simple permutation as a difference + + :param refsddl: First sddl to compare + :param cursddl: Second sddl to compare + :param checkSacl: If false we skip the sacl checks + :return: A string that explain difference between sddls + """ + + cursddl = get_clean_sd(cursd).as_sddl(domainsid) + refsddl = get_clean_sd(refsd).as_sddl(domainsid) + + txt = "" + hash_cur = chunck_sddl(cursddl) + hash_ref = chunck_sddl(refsddl) + + if not hash_cur.has_key("owner"): + txt = "\tNo owner in current SD" + elif hash_cur["owner"] != hash_ref["owner"]: + txt = "\tOwner mismatch: %s (in ref) %s" \ + "(in current)\n" % (hash_ref["owner"], hash_cur["owner"]) + + if not hash_cur.has_key("group"): + txt = "%s\tNo group in current SD" % txt + elif hash_cur["group"] != hash_ref["group"]: + txt = "%s\tGroup mismatch: %s (in ref) %s" \ + "(in current)\n" % (txt, hash_ref["group"], hash_cur["group"]) + + parts = [ "dacl" ] + if checkSacl: + parts.append("sacl") + for part in parts: + if hash_cur.has_key(part) and hash_ref.has_key(part): + + # both are present, check if they contain the same ACE + h_cur = set() + h_ref = set() + c_cur = chunck_acl(hash_cur[part]) + c_ref = chunck_acl(hash_ref[part]) + + for elem in c_cur["aces"]: + h_cur.add(elem) + + for elem in c_ref["aces"]: + h_ref.add(elem) + + for k in set(h_ref): + if k in h_cur: + h_cur.remove(k) + h_ref.remove(k) + + if len(h_cur) + len(h_ref) > 0: + txt = "%s\tPart %s is different between reference" \ + " and current here is the detail:\n" % (txt, part) + + for item in h_cur: + txt = "%s\t\t%s ACE is not present in the" \ + " reference\n" % (txt, item) + + for item in h_ref: + txt = "%s\t\t%s ACE is not present in the" \ + " current\n" % (txt, item) + + elif hash_cur.has_key(part) and not hash_ref.has_key(part): + txt = "%s\tReference ACL hasn't a %s part\n" % (txt, part) + elif not hash_cur.has_key(part) and hash_ref.has_key(part): + txt = "%s\tCurrent ACL hasn't a %s part\n" % (txt, part) + + return txt diff --git a/python/samba/tests/upgradeprovision.py b/python/samba/tests/upgradeprovision.py index bc3509e530..d785bc1cb4 100644 --- a/python/samba/tests/upgradeprovision.py +++ b/python/samba/tests/upgradeprovision.py @@ -19,9 +19,9 @@ import os from samba.upgradehelpers import (usn_in_range, dn_sort, - get_diff_sds, update_secrets, + update_secrets, construct_existor_expr) - +from samba.descriptor import get_diff_sds from samba.tests.provision import create_dummy_secretsdb from samba.tests import TestCaseInTempDir from samba import Ldb diff --git a/python/samba/upgradehelpers.py b/python/samba/upgradehelpers.py index 13a369183a..04f1e82e61 100644 --- a/python/samba/upgradehelpers.py +++ b/python/samba/upgradehelpers.py @@ -302,159 +302,6 @@ def identic_rename(ldbobj, dn): ldbobj.rename(ldb.Dn(ldbobj, "%s=foo%s" % (before, after)), dn, ["relax:0"]) -def chunck_acl(acl): - """Return separate ACE of an ACL - - :param acl: A string representing the ACL - :return: A hash with different parts - """ - - p = re.compile(r'(\w+)?(\(.*?\))') - tab = p.findall(acl) - - hash = {} - hash["aces"] = [] - for e in tab: - if len(e[0]) > 0: - hash["flags"] = e[0] - hash["aces"].append(e[1]) - - return hash - - -def chunck_sddl(sddl): - """ Return separate parts of the SDDL (owner, group, ...) - - :param sddl: An string containing the SDDL to chunk - :return: A hash with the different chunk - """ - - p = re.compile(r'([OGDS]:)(.*?)(?=(?:[GDS]:|$))') - tab = p.findall(sddl) - - hash = {} - for e in tab: - if e[0] == "O:": - hash["owner"] = e[1] - if e[0] == "G:": - hash["group"] = e[1] - if e[0] == "D:": - hash["dacl"] = e[1] - if e[0] == "S:": - hash["sacl"] = e[1] - - return hash - - -def get_clean_sd(sd): - """Get the SD without any inherited ACEs - - :param sd: SD to strip - :return: An SD with inherited ACEs stripped - """ - - sd_clean = security.descriptor() - sd_clean.owner_sid = sd.owner_sid - sd_clean.group_sid = sd.group_sid - sd_clean.type = sd.type - sd_clean.revision = sd.revision - - aces = [] - if sd.sacl is not None: - aces = sd.sacl.aces - for i in range(0, len(aces)): - ace = aces[i] - - if not ace.flags & security.SEC_ACE_FLAG_INHERITED_ACE: - sd_clean.sacl_add(ace) - continue - - aces = [] - if sd.dacl is not None: - aces = sd.dacl.aces - for i in range(0, len(aces)): - ace = aces[i] - - if not ace.flags & security.SEC_ACE_FLAG_INHERITED_ACE: - sd_clean.dacl_add(ace) - continue - return sd_clean - - -def get_diff_sds(refsd, cursd, domainsid, checkSacl = True): - """Get the difference between 2 sd - - This function split the textual representation of ACL into smaller - chunck in order to not to report a simple permutation as a difference - - :param refsddl: First sddl to compare - :param cursddl: Second sddl to compare - :param checkSacl: If false we skip the sacl checks - :return: A string that explain difference between sddls - """ - - cursddl = get_clean_sd(cursd).as_sddl(domainsid) - refsddl = get_clean_sd(refsd).as_sddl(domainsid) - - txt = "" - hash_cur = chunck_sddl(cursddl) - hash_ref = chunck_sddl(refsddl) - - if not hash_cur.has_key("owner"): - txt = "\tNo owner in current SD" - elif hash_cur["owner"] != hash_ref["owner"]: - txt = "\tOwner mismatch: %s (in ref) %s" \ - "(in current)\n" % (hash_ref["owner"], hash_cur["owner"]) - - if not hash_cur.has_key("group"): - txt = "%s\tNo group in current SD" % txt - elif hash_cur["group"] != hash_ref["group"]: - txt = "%s\tGroup mismatch: %s (in ref) %s" \ - "(in current)\n" % (txt, hash_ref["group"], hash_cur["group"]) - - parts = [ "dacl" ] - if checkSacl: - parts.append("sacl") - for part in parts: - if hash_cur.has_key(part) and hash_ref.has_key(part): - - # both are present, check if they contain the same ACE - h_cur = set() - h_ref = set() - c_cur = chunck_acl(hash_cur[part]) - c_ref = chunck_acl(hash_ref[part]) - - for elem in c_cur["aces"]: - h_cur.add(elem) - - for elem in c_ref["aces"]: - h_ref.add(elem) - - for k in set(h_ref): - if k in h_cur: - h_cur.remove(k) - h_ref.remove(k) - - if len(h_cur) + len(h_ref) > 0: - txt = "%s\tPart %s is different between reference" \ - " and current here is the detail:\n" % (txt, part) - - for item in h_cur: - txt = "%s\t\t%s ACE is not present in the" \ - " reference\n" % (txt, item) - - for item in h_ref: - txt = "%s\t\t%s ACE is not present in the" \ - " current\n" % (txt, item) - - elif hash_cur.has_key(part) and not hash_ref.has_key(part): - txt = "%s\tReference ACL hasn't a %s part\n" % (txt, part) - elif not hash_cur.has_key(part) and hash_ref.has_key(part): - txt = "%s\tCurrent ACL hasn't a %s part\n" % (txt, part) - - return txt - - def update_secrets(newsecrets_ldb, secrets_ldb, messagefunc): """Update secrets.ldb -- cgit