diff options
author | Jelmer Vernooij <jelmer@samba.org> | 2007-11-21 11:47:55 +0100 |
---|---|---|
committer | Stefan Metzmacher <metze@samba.org> | 2007-12-21 05:45:51 +0100 |
commit | ea851658411e3ff03a906f7ae6afc7e9319d6f90 (patch) | |
tree | 76cbc3b9417c3668b9f33511272cc79d5b2bb8a3 /source4/lib/ldb | |
parent | 5256d93a458d0f653afa788bb8f8d894dd1a25b2 (diff) | |
download | samba-ea851658411e3ff03a906f7ae6afc7e9319d6f90.tar.gz samba-ea851658411e3ff03a906f7ae6afc7e9319d6f90.tar.bz2 samba-ea851658411e3ff03a906f7ae6afc7e9319d6f90.zip |
r26068: Import improved Python bindings for LDB, including tests.
(This used to be commit fc3a8caef749ddac56a4f035dde8b6ceeaa95c48)
Diffstat (limited to 'source4/lib/ldb')
-rw-r--r-- | source4/lib/ldb/config.mk | 7 | ||||
-rw-r--r-- | source4/lib/ldb/ldb.i (renamed from source4/lib/ldb/swig/ldb.i) | 0 | ||||
-rw-r--r-- | source4/lib/ldb/libldb.m4 | 32 | ||||
-rwxr-xr-x | source4/lib/ldb/setup.py | 2 | ||||
-rw-r--r-- | source4/lib/ldb/swig/Ldb.py | 178 | ||||
-rwxr-xr-x | source4/lib/ldb/tests/python/api.py | 353 |
6 files changed, 357 insertions, 215 deletions
diff --git a/source4/lib/ldb/config.mk b/source4/lib/ldb/config.mk index 6472612837..38f6c371a2 100644 --- a/source4/lib/ldb/config.mk +++ b/source4/lib/ldb/config.mk @@ -194,10 +194,9 @@ PRIVATE_DEPENDENCIES = \ ####################### # Start LIBRARY swig_ldb -[LIBRARY::swig_ldb] -PUBLIC_DEPENDENCIES = LIBLDB DYNCONFIG -LIBRARY_REALNAME = swig/_ldb.$(SHLIBEXT) -OBJ_FILES = swig/ldb_wrap.o +[PYTHON::swig_ldb] +PUBLIC_DEPENDENCIES = LIBLDB LIBPYTHON +SWIG_FILE = ldb.i # End LIBRARY swig_ldb ####################### diff --git a/source4/lib/ldb/swig/ldb.i b/source4/lib/ldb/ldb.i index cdf1d66de1..cdf1d66de1 100644 --- a/source4/lib/ldb/swig/ldb.i +++ b/source4/lib/ldb/ldb.i diff --git a/source4/lib/ldb/libldb.m4 b/source4/lib/ldb/libldb.m4 index fbef076f81..77ebcc5ff4 100644 --- a/source4/lib/ldb/libldb.m4 +++ b/source4/lib/ldb/libldb.m4 @@ -5,35 +5,3 @@ SMB_ENABLE(ldb_sqlite3, NO) if test x"$with_sqlite3_support" = x"yes"; then SMB_ENABLE(ldb_sqlite3, YES) fi - -AC_MSG_CHECKING([for Python]) - -PYTHON= - -AC_ARG_WITH(python, -[ --with-python=PYTHONNAME build Python libraries], -[ case "${withval-python}" in - yes) - PYTHON=python - ;; - no) - PYTHON= - ;; - *) - PYTHON=${withval-python} - ;; - esac ]) - -if test x"$PYTHON" != "x"; then - incdir=`python -c 'import sys; print "%s/include/python%d.%d" % (sys.prefix, sys.version_info[[0]], sys.version_info[[1]])'` - CPPFLAGS="$CPPFLAGS -I $incdir" -fi - -if test x"$PYTHON" != "x"; then - AC_MSG_RESULT([${withval-python}]) -else - AC_MSG_RESULT(no) - SMB_ENABLE(swig_ldb, NO) -fi - -AC_SUBST(PYTHON) diff --git a/source4/lib/ldb/setup.py b/source4/lib/ldb/setup.py index e5fcddf0db..da3623315e 100755 --- a/source4/lib/ldb/setup.py +++ b/source4/lib/ldb/setup.py @@ -3,6 +3,6 @@ from distutils.core import setup from distutils.extension import Extension setup(name='ldb', version='1.0', - ext_modules=[Extension('_ldb', ['swig/ldb.i'], include_dirs=['include'], + ext_modules=[Extension('_ldb', ['ldb.i'], include_dirs=['include'], libraries=['ldb','ldap'])], ) diff --git a/source4/lib/ldb/swig/Ldb.py b/source4/lib/ldb/swig/Ldb.py deleted file mode 100644 index 4be3eec704..0000000000 --- a/source4/lib/ldb/swig/Ldb.py +++ /dev/null @@ -1,178 +0,0 @@ -"""Provide a more Pythonic and object-oriented interface to ldb.""" - -# -# Swig interface to Samba -# -# Copyright (C) Tim Potter 2006 -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, see <http://www.gnu.org/licenses/>. -# - -# -# Interface notes: -# -# - should an empty dn be represented as None, or an empty string? -# -# - should single-valued attributes be a string, or a list with one -# element? -# - -from ldb import * - -# Global initialisation - -result = ldb_global_init() - -if result != 0: - raise LdbError, (result, 'ldb_global_init failed') - -# Ldb exceptions - -class LdbError(Exception): - """An exception raised when a ldb error occurs. - The exception data is a tuple consisting of the ldb number and a - string description of the error.""" - pass - -# Ldb classes - -class LdbMessage: - """A class representing a ldb message as a Python dictionary.""" - - def __init__(self): - self.mem_ctx = talloc_init(None) - self.msg = ldb_msg_new(self.mem_ctx) - - def __del__(self): - if self.mem_ctx is not None: - talloc_free(self.mem_ctx) - self.mem_ctx = None - self.msg = None - - # Make the dn attribute of the object dynamic - - def __getattr__(self, attr): - if attr == 'dn': - return ldb_dn_linearize(None, self.msg.dn) - return self.__dict__[attr] - - def __setattr__(self, attr, value): - if attr == 'dn': - self.msg.dn = ldb_dn_explode(self.msg, value) - if self.msg.dn == None: - err = LDB_ERR_INVALID_DN_SYNTAX - raise LdbError(err, ldb_strerror(err)) - return - self.__dict__[attr] = value - - # Get and set individual elements - - def __getitem__(self, key): - - elt = ldb_msg_find_element(self.msg, key) - - if elt is None: - raise KeyError, "No such attribute '%s'" % key - - return [ldb_val_array_getitem(elt.values, i) - for i in range(elt.num_values)] - - def __setitem__(self, key, value): - ldb_msg_remove_attr(self.msg, key) - if type(value) in (list, tuple): - [ldb_msg_add_value(self.msg, key, v) for v in value] - else: - ldb_msg_add_value(self.msg, key, value) - - # Dictionary interface - # TODO: move to iterator based interface - - def len(self): - return self.msg.num_elements - - def keys(self): - return [ldb_message_element_array_getitem(self.msg.elements, i).name - for i in range(self.msg.num_elements)] - - def values(self): - return [self[k] for k in self.keys()] - - def items(self): - return [(k, self[k]) for k in self.keys()] - - # Misc stuff - - def sanity_check(self): - return ldb_msg_sanity_check(self.msg) - -class Ldb: - """A class representing a binding to a ldb file.""" - - def __init__(self, url, flags = 0): - """Initialise underlying ldb.""" - - self.mem_ctx = talloc_init('mem_ctx for ldb 0x%x' % id(self)) - self.ldb_ctx = ldb_init(self.mem_ctx) - - result = ldb_connect(self.ldb_ctx, url, flags, None) - - if result != LDB_SUCCESS: - raise LdbError, (result, ldb_strerror(result)) - - def __del__(self): - """Called when the object is to be garbage collected.""" - self.close() - - def close(self): - """Close down a ldb.""" - if self.mem_ctx is not None: - talloc_free(self.mem_ctx) - self.mem_ctx = None - self.ldb_ctx = None - - def _ldb_call(self, fn, *args): - """Call a ldb function with args. Raise a LdbError exception - if the function returns a non-zero return value.""" - - result = fn(*args) - - if result != LDB_SUCCESS: - raise LdbError, (result, ldb_strerror(result)) - - def search(self, expression): - """Search a ldb for a given expression.""" - - self._ldb_call(ldb_search, self.ldb_ctx, None, LDB_SCOPE_DEFAULT, - expression, None); - - return [LdbMessage(ldb_message_ptr_array_getitem(result.msgs, ndx)) - for ndx in range(result.count)] - - def delete(self, dn): - """Delete a dn.""" - - _dn = ldb_dn_explode(self.ldb_ctx, dn) - - self._ldb_call(ldb_delete, self.ldb_ctx, _dn) - - def rename(self, olddn, newdn): - """Rename a dn.""" - - _olddn = ldb_dn_explode(self.ldb_ctx, olddn) - _newdn = ldb_dn_explode(self.ldb_ctx, newdn) - - self._ldb_call(ldb_rename, self.ldb_ctx, _olddn, _newdn) - - def add(self, m): - self._ldb_call(ldb_add, self.ldb_ctx, m.msg) diff --git a/source4/lib/ldb/tests/python/api.py b/source4/lib/ldb/tests/python/api.py new file mode 100755 index 0000000000..b140bfc599 --- /dev/null +++ b/source4/lib/ldb/tests/python/api.py @@ -0,0 +1,353 @@ +#!/usr/bin/python +# Simple tests for the ldb python API +# Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org> +import sys +import unittest +sys.path.append("swig") +sys.path.append("build/lib.linux-i686-2.4") + +import ldb + +class NoContextTests(unittest.TestCase): + def test_valid_attr_name(self): + self.assertTrue(ldb.valid_attr_name("foo")) + self.assertFalse(ldb.valid_attr_name("24foo")) + + def test_timestring(self): + self.assertEquals("19700101000000.0Z", ldb.timestring(0)) + self.assertEquals("20071119191012.0Z", ldb.timestring(1195499412)) + + def test_string_to_time(self): + self.assertEquals(0, ldb.string_to_time("19700101000000.0Z")) + self.assertEquals(1195499412, ldb.string_to_time("20071119191012.0Z")) + + +class SimpleLdb(unittest.TestCase): + def test_connect(self): + ldb.Ldb("foo.tdb") + + def test_connect_none(self): + ldb.Ldb() + + def test_connect_later(self): + x = ldb.Ldb() + x.connect("foo.tdb") + + def test_set_create_perms(self): + x = ldb.Ldb() + x.set_create_perms(0600) + + def test_set_modules_dir(self): + x = ldb.Ldb() + x.set_modules_dir("/tmp") + + def test_search(self): + l = ldb.Ldb("foo.tdb") + self.assertEquals(len(l.search()), 1) + + def test_search_attrs(self): + l = ldb.Ldb("foo.tdb") + self.assertEquals(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0) + + def test_opaque(self): + l = ldb.Ldb("foo.tdb") + l.set_opaque("my_opaque", l) + self.assertTrue(l.get_opaque("my_opaque") is not None) + self.assertEquals(None, l.get_opaque("unknown")) + + def test_search_scope_base(self): + l = ldb.Ldb("foo.tdb") + self.assertEquals(len(l.search(ldb.Dn(l, "dc=foo"), + ldb.SCOPE_ONELEVEL)), 0) + + def test_delete(self): + l = ldb.Ldb("foo.tdb") + self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo"))) + + def test_contains(self): + l = ldb.Ldb("foo.tdb") + self.assertFalse(ldb.Dn(l, "dc=foo") in l) + l = ldb.Ldb("foo.tdb") + m = ldb.Message() + m.dn = ldb.Dn(l, "dc=foo") + m["b"] = ["a"] + l.add(m) + try: + self.assertTrue(ldb.Dn(l, "dc=foo") in l) + finally: + l.delete(m.dn) + + def test_get_config_basedn(self): + l = ldb.Ldb("foo.tdb") + self.assertEquals(None, l.get_config_basedn()) + + def test_get_root_basedn(self): + l = ldb.Ldb("foo.tdb") + self.assertEquals(None, l.get_root_basedn()) + + def test_get_schema_basedn(self): + l = ldb.Ldb("foo.tdb") + self.assertEquals(None, l.get_schema_basedn()) + + def test_get_default_basedn(self): + l = ldb.Ldb("foo.tdb") + self.assertEquals(None, l.get_default_basedn()) + + def test_add(self): + l = ldb.Ldb("foo.tdb") + m = ldb.Message() + m.dn = ldb.Dn(l, "dc=foo") + m["bla"] = "bla" + self.assertEquals(len(l.search()), 1) + l.add(m) + try: + self.assertEquals(len(l.search()), 2) + finally: + l.delete(ldb.Dn(l, "dc=foo")) + + def test_add_dict(self): + l = ldb.Ldb("foo.tdb") + m = {"dn": ldb.Dn(l, "dc=foo"), + "bla": "bla"} + self.assertEquals(len(l.search()), 1) + l.add(m) + try: + self.assertEquals(len(l.search()), 2) + finally: + l.delete(ldb.Dn(l, "dc=foo")) + + def test_rename(self): + l = ldb.Ldb("foo.tdb") + m = ldb.Message() + m.dn = ldb.Dn(l, "dc=foo") + m["bla"] = "bla" + self.assertEquals(len(l.search()), 1) + l.add(m) + try: + l.rename(ldb.Dn(l, "dc=foo"), ldb.Dn(l, "dc=bar")) + self.assertEquals(len(l.search()), 2) + finally: + l.delete(ldb.Dn(l, "dc=bar")) + + def test_modify_delete(self): + l = ldb.Ldb("foo.tdb") + m = ldb.Message() + m.dn = ldb.Dn(l, "dc=modify") + m["bla"] = ["1234"] + l.add(m) + rm = l.search(m.dn)[0] + self.assertEquals(["1234"], list(rm["bla"])) + try: + m = ldb.Message() + m.dn = ldb.Dn(l, "dc=modify") + m["bla"] = ldb.MessageElement([], ldb.CHANGETYPE_DELETE, "bla") + l.modify(m) + rm = l.search(m.dn)[0] + self.assertEquals(1, len(rm)) + finally: + l.delete(ldb.Dn(l, "dc=modify")) + + def test_modify_add(self): + l = ldb.Ldb("foo.tdb") + m = ldb.Message() + m.dn = ldb.Dn(l, "dc=modify") + m["bla"] = ["1234"] + l.add(m) + try: + m = ldb.Message() + m.dn = ldb.Dn(l, "dc=modify") + m["bla"] = ldb.MessageElement(["456"], ldb.CHANGETYPE_ADD, "bla") + l.modify(m) + rm = l.search(m.dn)[0] + self.assertEquals(2, len(rm)) + self.assertEquals(["1234", "456"], list(rm["bla"])) + finally: + l.delete(ldb.Dn(l, "dc=modify")) + + def test_modify_modify(self): + l = ldb.Ldb("foo.tdb") + m = ldb.Message() + m.dn = ldb.Dn(l, "dc=modify") + m["bla"] = ["1234", "456"] + l.add(m) + try: + m = ldb.Message() + m.dn = ldb.Dn(l, "dc=modify") + m["bla"] = ldb.MessageElement(["456"], ldb.CHANGETYPE_MODIFY, "bla") + l.modify(m) + rm = l.search(m.dn)[0] + self.assertEquals(2, len(rm)) + self.assertEquals(["1234"], list(rm["bla"])) + finally: + l.delete(ldb.Dn(l, "dc=modify")) + + def test_transaction_commit(self): + l = ldb.Ldb("foo.tdb") + l.transaction_start() + m = ldb.Message(ldb.Dn(l, "dc=foo")) + m["foo"] = ["bar"] + l.add(m) + l.transaction_commit() + l.delete(m.dn) + + def test_transaction_cancel(self): + l = ldb.Ldb("foo.tdb") + l.transaction_start() + m = ldb.Message(ldb.Dn(l, "dc=foo")) + m["foo"] = ["bar"] + l.add(m) + l.transaction_cancel() + self.assertEquals(0, len(l.search(ldb.Dn(l, "dc=foo")))) + + def test_set_debug(self): + def my_report_fn(level, text): + pass + l = ldb.Ldb("foo.tdb") + l.set_debug(my_report_fn) + + +class DnTests(unittest.TestCase): + def setUp(self): + self.ldb = ldb.Ldb("foo.tdb") + + def test_str(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertEquals(x.__str__(), "dc=foo,bar=bloe") + + def test_get_casefold(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertEquals(x.get_casefold(), "DC=FOO,BAR=bloe") + + def test_validate(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertTrue(x.validate()) + + def test_parent(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertEquals("bar=bloe", x.parent().__str__()) + + def test_compare(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + y = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertEquals(x, y) + z = ldb.Dn(self.ldb, "dc=foo,bar=blie") + self.assertNotEquals(z, y) + + def test_is_valid(self): + x = ldb.Dn(self.ldb, "dc=foo,dc=bloe") + self.assertTrue(x.is_valid()) + x = ldb.Dn(self.ldb, "") + # is_valid()'s return values appears to be a side effect of + # some other ldb functions. yuck. + # self.assertFalse(x.is_valid()) + + def test_is_special(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertFalse(x.is_special()) + x = ldb.Dn(self.ldb, "@FOOBAR") + self.assertTrue(x.is_special()) + + def test_check_special(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertFalse(x.check_special("FOOBAR")) + x = ldb.Dn(self.ldb, "@FOOBAR") + self.assertTrue(x.check_special("@FOOBAR")) + + def test_len(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertEquals(2, len(x)) + x = ldb.Dn(self.ldb, "dc=foo") + self.assertEquals(1, len(x)) + + def test_add_child(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe"))) + self.assertEquals("bla=bloe,dc=foo,bar=bloe", x.__str__()) + + def test_add_base(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertTrue(x.add_base(ldb.Dn(self.ldb, "bla=bloe"))) + self.assertEquals("dc=foo,bar=bloe,bla=bloe", x.__str__()) + + def test_add(self): + x = ldb.Dn(self.ldb, "dc=foo") + y = ldb.Dn(self.ldb, "bar=bla") + self.assertEquals("dc=foo,bar=bla", str(y + x)) + + def test_parse_ldif(self): + msgs = self.ldb.parse_ldif("dn: foo=bar\n") + msg = msgs.next() + self.assertEquals("foo=bar", str(msg[1].dn)) + self.assertTrue(isinstance(msg[1], ldb.Message)) + + def test_parse_ldif_more(self): + msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar") + msg = msgs.next() + self.assertEquals("foo=bar", str(msg[1].dn)) + msg = msgs.next() + self.assertEquals("bar=bar", str(msg[1].dn)) + + def test_canonical_string(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertEquals("/bloe/foo", x.canonical_str()) + + def test_canonical_ex_string(self): + x = ldb.Dn(self.ldb, "dc=foo,bar=bloe") + self.assertEquals("/bloe\nfoo", x.canonical_ex_str()) + + +class LdbMsgTests(unittest.TestCase): + def setUp(self): + self.msg = ldb.Message() + + def test_init_dn(self): + self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo")) + self.assertEquals("dc=foo", str(self.msg.dn)) + + def test_len(self): + self.assertEquals(0, len(self.msg)) + + def test_notpresent(self): + self.assertRaises(KeyError, lambda: self.msg["foo"]) + + def test_del(self): + del self.msg["foo"] + + def test_add_value(self): + self.assertEquals(0, len(self.msg)) + self.msg["foo"] = ["foo"] + self.assertEquals(1, len(self.msg)) + + def test_add_value_multiple(self): + self.assertEquals(0, len(self.msg)) + self.msg["foo"] = ["foo", "bla"] + self.assertEquals(1, len(self.msg)) + self.assertEquals(["foo", "bla"], list(self.msg["foo"])) + + def test_set_value(self): + self.msg["foo"] = ["fool"] + self.assertEquals(["fool"], list(self.msg["foo"])) + self.msg["foo"] = ["bar"] + self.assertEquals(["bar"], list(self.msg["foo"])) + + def test_keys(self): + self.msg["foo"] = ["bla"] + self.msg["bar"] = ["bla"] + self.assertEquals(["foo", "bar"], self.msg.keys()) + + def test_dn(self): + self.msg.dn = ldb.Dn(ldb.Ldb("foo.tdb"), "@BASEINFO") + self.assertEquals("@BASEINFO", self.msg.dn.__str__()) + + +class MessageElementTests(unittest.TestCase): + def test_cmp_element(self): + x = ldb.MessageElement(["foo"]) + y = ldb.MessageElement(["foo"]) + z = ldb.MessageElement(["bzr"]) + self.assertEquals(x, y) + self.assertNotEquals(x, z) + + def test_create_iterable(self): + x = ldb.MessageElement(["foo"]) + self.assertEquals(["foo"], list(x)) |