diff options
author | Matthieu Patou <mat@matws.net> | 2010-06-22 20:03:15 +0400 |
---|---|---|
committer | Andrew Bartlett <abartlet@samba.org> | 2010-07-15 22:08:20 +1000 |
commit | f97c90c9cd124314b4a0862e702dd021bd2df9a0 (patch) | |
tree | 42b8c04f5eb77888f2d16037608be569fcd32c5f /source4/scripting/python | |
parent | 6a0856da9cc075acaa7fcb6bad614f8f403df9c7 (diff) | |
download | samba-f97c90c9cd124314b4a0862e702dd021bd2df9a0.tar.gz samba-f97c90c9cd124314b4a0862e702dd021bd2df9a0.tar.bz2 samba-f97c90c9cd124314b4a0862e702dd021bd2df9a0.zip |
s4 python: Add functions to samdb to manipulate version of replPropertyMetaData attribute
This change contains also helpers for attribute id to attribute oid
conversion and from attribute id to attribute name.
It brings also unit tests
Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'source4/scripting/python')
-rw-r--r-- | source4/scripting/python/samba/samdb.py | 103 | ||||
-rw-r--r-- | source4/scripting/python/samba/tests/dsdb.py | 26 |
2 files changed, 128 insertions, 1 deletions
diff --git a/source4/scripting/python/samba/samdb.py b/source4/scripting/python/samba/samdb.py index f810926710..f358747b51 100644 --- a/source4/scripting/python/samba/samdb.py +++ b/source4/scripting/python/samba/samdb.py @@ -28,12 +28,16 @@ import samba import ldb import time import base64 +from samba.ndr import ndr_unpack, ndr_pack +from samba.dcerpc import drsblobs, misc __docformat__ = "restructuredText" class SamDB(samba.Ldb): """The SAM database.""" + hash_oid_name = {} + def __init__(self, url=None, lp=None, modules_dir=None, session_info=None, credentials=None, flags=0, options=None, global_schema=True, auto_connect=True, am_rodc=False): @@ -470,5 +474,104 @@ accountExpires: %u def set_schema_from_ldb(self, ldb): dsdb._dsdb_set_schema_from_ldb(self, ldb) + def get_attribute_from_attid(self, attid): + """ Get from an attid the associated attribute + + :param attid: The attribute id for searched attribute + :return: The name of the attribute associated with this id + """ + if len(self.hash_oid_name.keys()) == 0: + self._populate_oid_attid() + if self.hash_oid_name.has_key(self.get_oid_from_attid(attid)): + return self.hash_oid_name[self.get_oid_from_attid(attid)] + else: + return None + + + def _populate_oid_attid(self): + """Populate the hash hash_oid_name + + This hash contains the oid of the attribute as a key and + its display name as a value + """ + self.hash_oid_name = {} + res = self.search(expression="objectClass=attributeSchema", + controls=["search_options:1:2"], + attrs=["attributeID", + "lDAPDisplayName"]) + if len(res) > 0: + for e in res: + strDisplay = str(e.get("lDAPDisplayName")) + self.hash_oid_name[str(e.get("attributeID"))] = strDisplay + + + def get_attribute_replmetadata_version(self, dn, att): + """ Get the version field trom the replPropertyMetaData for + the given field + + :param dn: The on which we want to get the version + :param att: The name of the attribute + :return: The value of the version field in the replPropertyMetaData + for the given attribute. None if the attribute is not replicated + """ + + res = self.search(expression="dn=%s" % dn, + scope=ldb.SCOPE_SUBTREE, + controls=["search_options:1:2"], + attrs=["replPropertyMetaData"]) + if len(res) == 0: + return None + + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + str(res[0]["replPropertyMetaData"])) + ctr = repl.ctr + if len(self.hash_oid_name.keys()) == 0: + self._populate_oid_attid() + for o in ctr.array: + # Search for Description + att_oid = self.get_oid_from_attid(o.attid) + if self.hash_oid_name.has_key(att_oid) and\ + att.lower() == self.hash_oid_name[att_oid].lower(): + return o.version + return None + + + def set_attribute_replmetadata_version(self, dn, att, value): + res = self.search(expression="dn=%s" % dn, + scope=ldb.SCOPE_SUBTREE, + controls=["search_options:1:2"], + attrs=["replPropertyMetaData"]) + if len(res) == 0: + return None + + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + str(res[0]["replPropertyMetaData"])) + ctr = repl.ctr + now = samba.unix2nttime(int(time.time())) + found = False + if len(self.hash_oid_name.keys()) == 0: + self._populate_oid_attid() + for o in ctr.array: + # Search for Description + att_oid = self.get_oid_from_attid(o.attid) + if self.hash_oid_name.has_key(att_oid) and\ + att.lower() == self.hash_oid_name[att_oid].lower(): + found = True + seq = self.sequence_number(ldb.SEQ_NEXT) + o.version = value + o.originating_change_time = now + o.originating_invocation_id = misc.GUID(self.get_invocation_id()) + o.originating_usn = seq + o.local_usn = seq + if found : + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, + ldb.FLAG_MOD_REPLACE, + "replPropertyMetaData") + self.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def write_prefixes_from_schema(self): dsdb._dsdb_write_prefixes_from_schema_to_ldb(self) diff --git a/source4/scripting/python/samba/tests/dsdb.py b/source4/scripting/python/samba/tests/dsdb.py index d28be82984..4a50c96e25 100644 --- a/source4/scripting/python/samba/tests/dsdb.py +++ b/source4/scripting/python/samba/tests/dsdb.py @@ -25,7 +25,7 @@ from samba.ndr import ndr_unpack, ndr_pack from samba.dcerpc import drsblobs import ldb import os -import samba.dsdb +import samba class DsdbTests(TestCase): @@ -107,3 +107,27 @@ class DsdbTests(TestCase): msg.dn = res[0].dn msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def test_ok_get_attribute_from_attid(self): + self.assertEquals(self.samdb.get_attribute_from_attid(13), "description") + + def test_ko_get_attribute_from_attid(self): + self.assertEquals(self.samdb.get_attribute_from_attid(11979), None) + + def test_get_attribute_replmetadata_version(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["dn"]) + self.assertEquals(len(res), 1) + dn = str(res[0].dn) + self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 1) + + def test_set_attribute_replmetadata_version(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["dn"]) + self.assertEquals(len(res), 1) + dn = str(res[0].dn) + version = self.samdb.get_attribute_replmetadata_version(dn, "description") + self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2) + self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2) |