summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthieu Patou <mat@matws.net>2010-06-22 20:03:15 +0400
committerAndrew Bartlett <abartlet@samba.org>2010-07-15 22:08:20 +1000
commitf97c90c9cd124314b4a0862e702dd021bd2df9a0 (patch)
tree42b8c04f5eb77888f2d16037608be569fcd32c5f
parent6a0856da9cc075acaa7fcb6bad614f8f403df9c7 (diff)
downloadsamba-f97c90c9cd124314b4a0862e702dd021bd2df9a0.tar.gz
samba-f97c90c9cd124314b4a0862e702dd021bd2df9a0.tar.bz2
samba-f97c90c9cd124314b4a0862e702dd021bd2df9a0.zip
s4 python: Add functions to samdb to manipulate version of replPropertyMetaData attribute
This change contains also helpers for attribute id to attribute oid conversion and from attribute id to attribute name. It brings also unit tests Signed-off-by: Andrew Bartlett <abartlet@samba.org>
-rw-r--r--source4/scripting/python/samba/samdb.py103
-rw-r--r--source4/scripting/python/samba/tests/dsdb.py26
2 files changed, 128 insertions, 1 deletions
diff --git a/source4/scripting/python/samba/samdb.py b/source4/scripting/python/samba/samdb.py
index f810926710..f358747b51 100644
--- a/source4/scripting/python/samba/samdb.py
+++ b/source4/scripting/python/samba/samdb.py
@@ -28,12 +28,16 @@ import samba
import ldb
import time
import base64
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.dcerpc import drsblobs, misc
__docformat__ = "restructuredText"
class SamDB(samba.Ldb):
"""The SAM database."""
+ hash_oid_name = {}
+
def __init__(self, url=None, lp=None, modules_dir=None, session_info=None,
credentials=None, flags=0, options=None, global_schema=True,
auto_connect=True, am_rodc=False):
@@ -470,5 +474,104 @@ accountExpires: %u
def set_schema_from_ldb(self, ldb):
dsdb._dsdb_set_schema_from_ldb(self, ldb)
+ def get_attribute_from_attid(self, attid):
+ """ Get from an attid the associated attribute
+
+ :param attid: The attribute id for searched attribute
+ :return: The name of the attribute associated with this id
+ """
+ if len(self.hash_oid_name.keys()) == 0:
+ self._populate_oid_attid()
+ if self.hash_oid_name.has_key(self.get_oid_from_attid(attid)):
+ return self.hash_oid_name[self.get_oid_from_attid(attid)]
+ else:
+ return None
+
+
+ def _populate_oid_attid(self):
+ """Populate the hash hash_oid_name
+
+ This hash contains the oid of the attribute as a key and
+ its display name as a value
+ """
+ self.hash_oid_name = {}
+ res = self.search(expression="objectClass=attributeSchema",
+ controls=["search_options:1:2"],
+ attrs=["attributeID",
+ "lDAPDisplayName"])
+ if len(res) > 0:
+ for e in res:
+ strDisplay = str(e.get("lDAPDisplayName"))
+ self.hash_oid_name[str(e.get("attributeID"))] = strDisplay
+
+
+ def get_attribute_replmetadata_version(self, dn, att):
+ """ Get the version field trom the replPropertyMetaData for
+ the given field
+
+ :param dn: The on which we want to get the version
+ :param att: The name of the attribute
+ :return: The value of the version field in the replPropertyMetaData
+ for the given attribute. None if the attribute is not replicated
+ """
+
+ res = self.search(expression="dn=%s" % dn,
+ scope=ldb.SCOPE_SUBTREE,
+ controls=["search_options:1:2"],
+ attrs=["replPropertyMetaData"])
+ if len(res) == 0:
+ return None
+
+ repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
+ str(res[0]["replPropertyMetaData"]))
+ ctr = repl.ctr
+ if len(self.hash_oid_name.keys()) == 0:
+ self._populate_oid_attid()
+ for o in ctr.array:
+ # Search for Description
+ att_oid = self.get_oid_from_attid(o.attid)
+ if self.hash_oid_name.has_key(att_oid) and\
+ att.lower() == self.hash_oid_name[att_oid].lower():
+ return o.version
+ return None
+
+
+ def set_attribute_replmetadata_version(self, dn, att, value):
+ res = self.search(expression="dn=%s" % dn,
+ scope=ldb.SCOPE_SUBTREE,
+ controls=["search_options:1:2"],
+ attrs=["replPropertyMetaData"])
+ if len(res) == 0:
+ return None
+
+ repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
+ str(res[0]["replPropertyMetaData"]))
+ ctr = repl.ctr
+ now = samba.unix2nttime(int(time.time()))
+ found = False
+ if len(self.hash_oid_name.keys()) == 0:
+ self._populate_oid_attid()
+ for o in ctr.array:
+ # Search for Description
+ att_oid = self.get_oid_from_attid(o.attid)
+ if self.hash_oid_name.has_key(att_oid) and\
+ att.lower() == self.hash_oid_name[att_oid].lower():
+ found = True
+ seq = self.sequence_number(ldb.SEQ_NEXT)
+ o.version = value
+ o.originating_change_time = now
+ o.originating_invocation_id = misc.GUID(self.get_invocation_id())
+ o.originating_usn = seq
+ o.local_usn = seq
+ if found :
+ replBlob = ndr_pack(repl)
+ msg = ldb.Message()
+ msg.dn = res[0].dn
+ msg["replPropertyMetaData"] = ldb.MessageElement(replBlob,
+ ldb.FLAG_MOD_REPLACE,
+ "replPropertyMetaData")
+ self.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
+
+
def write_prefixes_from_schema(self):
dsdb._dsdb_write_prefixes_from_schema_to_ldb(self)
diff --git a/source4/scripting/python/samba/tests/dsdb.py b/source4/scripting/python/samba/tests/dsdb.py
index d28be82984..4a50c96e25 100644
--- a/source4/scripting/python/samba/tests/dsdb.py
+++ b/source4/scripting/python/samba/tests/dsdb.py
@@ -25,7 +25,7 @@ from samba.ndr import ndr_unpack, ndr_pack
from samba.dcerpc import drsblobs
import ldb
import os
-import samba.dsdb
+import samba
class DsdbTests(TestCase):
@@ -107,3 +107,27 @@ class DsdbTests(TestCase):
msg.dn = res[0].dn
msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
+
+ def test_ok_get_attribute_from_attid(self):
+ self.assertEquals(self.samdb.get_attribute_from_attid(13), "description")
+
+ def test_ko_get_attribute_from_attid(self):
+ self.assertEquals(self.samdb.get_attribute_from_attid(11979), None)
+
+ def test_get_attribute_replmetadata_version(self):
+ res = self.samdb.search(expression="cn=Administrator",
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["dn"])
+ self.assertEquals(len(res), 1)
+ dn = str(res[0].dn)
+ self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 1)
+
+ def test_set_attribute_replmetadata_version(self):
+ res = self.samdb.search(expression="cn=Administrator",
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=["dn"])
+ self.assertEquals(len(res), 1)
+ dn = str(res[0].dn)
+ version = self.samdb.get_attribute_replmetadata_version(dn, "description")
+ self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
+ self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)