summaryrefslogtreecommitdiff
path: root/source4/lib/ldb
diff options
context:
space:
mode:
Diffstat (limited to 'source4/lib/ldb')
-rw-r--r--source4/lib/ldb/config.mk7
-rw-r--r--source4/lib/ldb/ldb.i (renamed from source4/lib/ldb/swig/ldb.i)0
-rw-r--r--source4/lib/ldb/libldb.m432
-rwxr-xr-xsource4/lib/ldb/setup.py2
-rw-r--r--source4/lib/ldb/swig/Ldb.py178
-rwxr-xr-xsource4/lib/ldb/tests/python/api.py353
6 files changed, 357 insertions, 215 deletions
diff --git a/source4/lib/ldb/config.mk b/source4/lib/ldb/config.mk
index 6472612837..38f6c371a2 100644
--- a/source4/lib/ldb/config.mk
+++ b/source4/lib/ldb/config.mk
@@ -194,10 +194,9 @@ PRIVATE_DEPENDENCIES = \
#######################
# Start LIBRARY swig_ldb
-[LIBRARY::swig_ldb]
-PUBLIC_DEPENDENCIES = LIBLDB DYNCONFIG
-LIBRARY_REALNAME = swig/_ldb.$(SHLIBEXT)
-OBJ_FILES = swig/ldb_wrap.o
+[PYTHON::swig_ldb]
+PUBLIC_DEPENDENCIES = LIBLDB LIBPYTHON
+SWIG_FILE = ldb.i
# End LIBRARY swig_ldb
#######################
diff --git a/source4/lib/ldb/swig/ldb.i b/source4/lib/ldb/ldb.i
index cdf1d66de1..cdf1d66de1 100644
--- a/source4/lib/ldb/swig/ldb.i
+++ b/source4/lib/ldb/ldb.i
diff --git a/source4/lib/ldb/libldb.m4 b/source4/lib/ldb/libldb.m4
index fbef076f81..77ebcc5ff4 100644
--- a/source4/lib/ldb/libldb.m4
+++ b/source4/lib/ldb/libldb.m4
@@ -5,35 +5,3 @@ SMB_ENABLE(ldb_sqlite3, NO)
if test x"$with_sqlite3_support" = x"yes"; then
SMB_ENABLE(ldb_sqlite3, YES)
fi
-
-AC_MSG_CHECKING([for Python])
-
-PYTHON=
-
-AC_ARG_WITH(python,
-[ --with-python=PYTHONNAME build Python libraries],
-[ case "${withval-python}" in
- yes)
- PYTHON=python
- ;;
- no)
- PYTHON=
- ;;
- *)
- PYTHON=${withval-python}
- ;;
- esac ])
-
-if test x"$PYTHON" != "x"; then
- incdir=`python -c 'import sys; print "%s/include/python%d.%d" % (sys.prefix, sys.version_info[[0]], sys.version_info[[1]])'`
- CPPFLAGS="$CPPFLAGS -I $incdir"
-fi
-
-if test x"$PYTHON" != "x"; then
- AC_MSG_RESULT([${withval-python}])
-else
- AC_MSG_RESULT(no)
- SMB_ENABLE(swig_ldb, NO)
-fi
-
-AC_SUBST(PYTHON)
diff --git a/source4/lib/ldb/setup.py b/source4/lib/ldb/setup.py
index e5fcddf0db..da3623315e 100755
--- a/source4/lib/ldb/setup.py
+++ b/source4/lib/ldb/setup.py
@@ -3,6 +3,6 @@ from distutils.core import setup
from distutils.extension import Extension
setup(name='ldb',
version='1.0',
- ext_modules=[Extension('_ldb', ['swig/ldb.i'], include_dirs=['include'],
+ ext_modules=[Extension('_ldb', ['ldb.i'], include_dirs=['include'],
libraries=['ldb','ldap'])],
)
diff --git a/source4/lib/ldb/swig/Ldb.py b/source4/lib/ldb/swig/Ldb.py
deleted file mode 100644
index 4be3eec704..0000000000
--- a/source4/lib/ldb/swig/Ldb.py
+++ /dev/null
@@ -1,178 +0,0 @@
-"""Provide a more Pythonic and object-oriented interface to ldb."""
-
-#
-# Swig interface to Samba
-#
-# Copyright (C) Tim Potter 2006
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, see <http://www.gnu.org/licenses/>.
-#
-
-#
-# Interface notes:
-#
-# - should an empty dn be represented as None, or an empty string?
-#
-# - should single-valued attributes be a string, or a list with one
-# element?
-#
-
-from ldb import *
-
-# Global initialisation
-
-result = ldb_global_init()
-
-if result != 0:
- raise LdbError, (result, 'ldb_global_init failed')
-
-# Ldb exceptions
-
-class LdbError(Exception):
- """An exception raised when a ldb error occurs.
- The exception data is a tuple consisting of the ldb number and a
- string description of the error."""
- pass
-
-# Ldb classes
-
-class LdbMessage:
- """A class representing a ldb message as a Python dictionary."""
-
- def __init__(self):
- self.mem_ctx = talloc_init(None)
- self.msg = ldb_msg_new(self.mem_ctx)
-
- def __del__(self):
- if self.mem_ctx is not None:
- talloc_free(self.mem_ctx)
- self.mem_ctx = None
- self.msg = None
-
- # Make the dn attribute of the object dynamic
-
- def __getattr__(self, attr):
- if attr == 'dn':
- return ldb_dn_linearize(None, self.msg.dn)
- return self.__dict__[attr]
-
- def __setattr__(self, attr, value):
- if attr == 'dn':
- self.msg.dn = ldb_dn_explode(self.msg, value)
- if self.msg.dn == None:
- err = LDB_ERR_INVALID_DN_SYNTAX
- raise LdbError(err, ldb_strerror(err))
- return
- self.__dict__[attr] = value
-
- # Get and set individual elements
-
- def __getitem__(self, key):
-
- elt = ldb_msg_find_element(self.msg, key)
-
- if elt is None:
- raise KeyError, "No such attribute '%s'" % key
-
- return [ldb_val_array_getitem(elt.values, i)
- for i in range(elt.num_values)]
-
- def __setitem__(self, key, value):
- ldb_msg_remove_attr(self.msg, key)
- if type(value) in (list, tuple):
- [ldb_msg_add_value(self.msg, key, v) for v in value]
- else:
- ldb_msg_add_value(self.msg, key, value)
-
- # Dictionary interface
- # TODO: move to iterator based interface
-
- def len(self):
- return self.msg.num_elements
-
- def keys(self):
- return [ldb_message_element_array_getitem(self.msg.elements, i).name
- for i in range(self.msg.num_elements)]
-
- def values(self):
- return [self[k] for k in self.keys()]
-
- def items(self):
- return [(k, self[k]) for k in self.keys()]
-
- # Misc stuff
-
- def sanity_check(self):
- return ldb_msg_sanity_check(self.msg)
-
-class Ldb:
- """A class representing a binding to a ldb file."""
-
- def __init__(self, url, flags = 0):
- """Initialise underlying ldb."""
-
- self.mem_ctx = talloc_init('mem_ctx for ldb 0x%x' % id(self))
- self.ldb_ctx = ldb_init(self.mem_ctx)
-
- result = ldb_connect(self.ldb_ctx, url, flags, None)
-
- if result != LDB_SUCCESS:
- raise LdbError, (result, ldb_strerror(result))
-
- def __del__(self):
- """Called when the object is to be garbage collected."""
- self.close()
-
- def close(self):
- """Close down a ldb."""
- if self.mem_ctx is not None:
- talloc_free(self.mem_ctx)
- self.mem_ctx = None
- self.ldb_ctx = None
-
- def _ldb_call(self, fn, *args):
- """Call a ldb function with args. Raise a LdbError exception
- if the function returns a non-zero return value."""
-
- result = fn(*args)
-
- if result != LDB_SUCCESS:
- raise LdbError, (result, ldb_strerror(result))
-
- def search(self, expression):
- """Search a ldb for a given expression."""
-
- self._ldb_call(ldb_search, self.ldb_ctx, None, LDB_SCOPE_DEFAULT,
- expression, None);
-
- return [LdbMessage(ldb_message_ptr_array_getitem(result.msgs, ndx))
- for ndx in range(result.count)]
-
- def delete(self, dn):
- """Delete a dn."""
-
- _dn = ldb_dn_explode(self.ldb_ctx, dn)
-
- self._ldb_call(ldb_delete, self.ldb_ctx, _dn)
-
- def rename(self, olddn, newdn):
- """Rename a dn."""
-
- _olddn = ldb_dn_explode(self.ldb_ctx, olddn)
- _newdn = ldb_dn_explode(self.ldb_ctx, newdn)
-
- self._ldb_call(ldb_rename, self.ldb_ctx, _olddn, _newdn)
-
- def add(self, m):
- self._ldb_call(ldb_add, self.ldb_ctx, m.msg)
diff --git a/source4/lib/ldb/tests/python/api.py b/source4/lib/ldb/tests/python/api.py
new file mode 100755
index 0000000000..b140bfc599
--- /dev/null
+++ b/source4/lib/ldb/tests/python/api.py
@@ -0,0 +1,353 @@
+#!/usr/bin/python
+# Simple tests for the ldb python API
+# Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
+import sys
+import unittest
+sys.path.append("swig")
+sys.path.append("build/lib.linux-i686-2.4")
+
+import ldb
+
+class NoContextTests(unittest.TestCase):
+ def test_valid_attr_name(self):
+ self.assertTrue(ldb.valid_attr_name("foo"))
+ self.assertFalse(ldb.valid_attr_name("24foo"))
+
+ def test_timestring(self):
+ self.assertEquals("19700101000000.0Z", ldb.timestring(0))
+ self.assertEquals("20071119191012.0Z", ldb.timestring(1195499412))
+
+ def test_string_to_time(self):
+ self.assertEquals(0, ldb.string_to_time("19700101000000.0Z"))
+ self.assertEquals(1195499412, ldb.string_to_time("20071119191012.0Z"))
+
+
+class SimpleLdb(unittest.TestCase):
+ def test_connect(self):
+ ldb.Ldb("foo.tdb")
+
+ def test_connect_none(self):
+ ldb.Ldb()
+
+ def test_connect_later(self):
+ x = ldb.Ldb()
+ x.connect("foo.tdb")
+
+ def test_set_create_perms(self):
+ x = ldb.Ldb()
+ x.set_create_perms(0600)
+
+ def test_set_modules_dir(self):
+ x = ldb.Ldb()
+ x.set_modules_dir("/tmp")
+
+ def test_search(self):
+ l = ldb.Ldb("foo.tdb")
+ self.assertEquals(len(l.search()), 1)
+
+ def test_search_attrs(self):
+ l = ldb.Ldb("foo.tdb")
+ self.assertEquals(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
+
+ def test_opaque(self):
+ l = ldb.Ldb("foo.tdb")
+ l.set_opaque("my_opaque", l)
+ self.assertTrue(l.get_opaque("my_opaque") is not None)
+ self.assertEquals(None, l.get_opaque("unknown"))
+
+ def test_search_scope_base(self):
+ l = ldb.Ldb("foo.tdb")
+ self.assertEquals(len(l.search(ldb.Dn(l, "dc=foo"),
+ ldb.SCOPE_ONELEVEL)), 0)
+
+ def test_delete(self):
+ l = ldb.Ldb("foo.tdb")
+ self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo")))
+
+ def test_contains(self):
+ l = ldb.Ldb("foo.tdb")
+ self.assertFalse(ldb.Dn(l, "dc=foo") in l)
+ l = ldb.Ldb("foo.tdb")
+ m = ldb.Message()
+ m.dn = ldb.Dn(l, "dc=foo")
+ m["b"] = ["a"]
+ l.add(m)
+ try:
+ self.assertTrue(ldb.Dn(l, "dc=foo") in l)
+ finally:
+ l.delete(m.dn)
+
+ def test_get_config_basedn(self):
+ l = ldb.Ldb("foo.tdb")
+ self.assertEquals(None, l.get_config_basedn())
+
+ def test_get_root_basedn(self):
+ l = ldb.Ldb("foo.tdb")
+ self.assertEquals(None, l.get_root_basedn())
+
+ def test_get_schema_basedn(self):
+ l = ldb.Ldb("foo.tdb")
+ self.assertEquals(None, l.get_schema_basedn())
+
+ def test_get_default_basedn(self):
+ l = ldb.Ldb("foo.tdb")
+ self.assertEquals(None, l.get_default_basedn())
+
+ def test_add(self):
+ l = ldb.Ldb("foo.tdb")
+ m = ldb.Message()
+ m.dn = ldb.Dn(l, "dc=foo")
+ m["bla"] = "bla"
+ self.assertEquals(len(l.search()), 1)
+ l.add(m)
+ try:
+ self.assertEquals(len(l.search()), 2)
+ finally:
+ l.delete(ldb.Dn(l, "dc=foo"))
+
+ def test_add_dict(self):
+ l = ldb.Ldb("foo.tdb")
+ m = {"dn": ldb.Dn(l, "dc=foo"),
+ "bla": "bla"}
+ self.assertEquals(len(l.search()), 1)
+ l.add(m)
+ try:
+ self.assertEquals(len(l.search()), 2)
+ finally:
+ l.delete(ldb.Dn(l, "dc=foo"))
+
+ def test_rename(self):
+ l = ldb.Ldb("foo.tdb")
+ m = ldb.Message()
+ m.dn = ldb.Dn(l, "dc=foo")
+ m["bla"] = "bla"
+ self.assertEquals(len(l.search()), 1)
+ l.add(m)
+ try:
+ l.rename(ldb.Dn(l, "dc=foo"), ldb.Dn(l, "dc=bar"))
+ self.assertEquals(len(l.search()), 2)
+ finally:
+ l.delete(ldb.Dn(l, "dc=bar"))
+
+ def test_modify_delete(self):
+ l = ldb.Ldb("foo.tdb")
+ m = ldb.Message()
+ m.dn = ldb.Dn(l, "dc=modify")
+ m["bla"] = ["1234"]
+ l.add(m)
+ rm = l.search(m.dn)[0]
+ self.assertEquals(["1234"], list(rm["bla"]))
+ try:
+ m = ldb.Message()
+ m.dn = ldb.Dn(l, "dc=modify")
+ m["bla"] = ldb.MessageElement([], ldb.CHANGETYPE_DELETE, "bla")
+ l.modify(m)
+ rm = l.search(m.dn)[0]
+ self.assertEquals(1, len(rm))
+ finally:
+ l.delete(ldb.Dn(l, "dc=modify"))
+
+ def test_modify_add(self):
+ l = ldb.Ldb("foo.tdb")
+ m = ldb.Message()
+ m.dn = ldb.Dn(l, "dc=modify")
+ m["bla"] = ["1234"]
+ l.add(m)
+ try:
+ m = ldb.Message()
+ m.dn = ldb.Dn(l, "dc=modify")
+ m["bla"] = ldb.MessageElement(["456"], ldb.CHANGETYPE_ADD, "bla")
+ l.modify(m)
+ rm = l.search(m.dn)[0]
+ self.assertEquals(2, len(rm))
+ self.assertEquals(["1234", "456"], list(rm["bla"]))
+ finally:
+ l.delete(ldb.Dn(l, "dc=modify"))
+
+ def test_modify_modify(self):
+ l = ldb.Ldb("foo.tdb")
+ m = ldb.Message()
+ m.dn = ldb.Dn(l, "dc=modify")
+ m["bla"] = ["1234", "456"]
+ l.add(m)
+ try:
+ m = ldb.Message()
+ m.dn = ldb.Dn(l, "dc=modify")
+ m["bla"] = ldb.MessageElement(["456"], ldb.CHANGETYPE_MODIFY, "bla")
+ l.modify(m)
+ rm = l.search(m.dn)[0]
+ self.assertEquals(2, len(rm))
+ self.assertEquals(["1234"], list(rm["bla"]))
+ finally:
+ l.delete(ldb.Dn(l, "dc=modify"))
+
+ def test_transaction_commit(self):
+ l = ldb.Ldb("foo.tdb")
+ l.transaction_start()
+ m = ldb.Message(ldb.Dn(l, "dc=foo"))
+ m["foo"] = ["bar"]
+ l.add(m)
+ l.transaction_commit()
+ l.delete(m.dn)
+
+ def test_transaction_cancel(self):
+ l = ldb.Ldb("foo.tdb")
+ l.transaction_start()
+ m = ldb.Message(ldb.Dn(l, "dc=foo"))
+ m["foo"] = ["bar"]
+ l.add(m)
+ l.transaction_cancel()
+ self.assertEquals(0, len(l.search(ldb.Dn(l, "dc=foo"))))
+
+ def test_set_debug(self):
+ def my_report_fn(level, text):
+ pass
+ l = ldb.Ldb("foo.tdb")
+ l.set_debug(my_report_fn)
+
+
+class DnTests(unittest.TestCase):
+ def setUp(self):
+ self.ldb = ldb.Ldb("foo.tdb")
+
+ def test_str(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertEquals(x.__str__(), "dc=foo,bar=bloe")
+
+ def test_get_casefold(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertEquals(x.get_casefold(), "DC=FOO,BAR=bloe")
+
+ def test_validate(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertTrue(x.validate())
+
+ def test_parent(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertEquals("bar=bloe", x.parent().__str__())
+
+ def test_compare(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ y = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertEquals(x, y)
+ z = ldb.Dn(self.ldb, "dc=foo,bar=blie")
+ self.assertNotEquals(z, y)
+
+ def test_is_valid(self):
+ x = ldb.Dn(self.ldb, "dc=foo,dc=bloe")
+ self.assertTrue(x.is_valid())
+ x = ldb.Dn(self.ldb, "")
+ # is_valid()'s return values appears to be a side effect of
+ # some other ldb functions. yuck.
+ # self.assertFalse(x.is_valid())
+
+ def test_is_special(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertFalse(x.is_special())
+ x = ldb.Dn(self.ldb, "@FOOBAR")
+ self.assertTrue(x.is_special())
+
+ def test_check_special(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertFalse(x.check_special("FOOBAR"))
+ x = ldb.Dn(self.ldb, "@FOOBAR")
+ self.assertTrue(x.check_special("@FOOBAR"))
+
+ def test_len(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertEquals(2, len(x))
+ x = ldb.Dn(self.ldb, "dc=foo")
+ self.assertEquals(1, len(x))
+
+ def test_add_child(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
+ self.assertEquals("bla=bloe,dc=foo,bar=bloe", x.__str__())
+
+ def test_add_base(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertTrue(x.add_base(ldb.Dn(self.ldb, "bla=bloe")))
+ self.assertEquals("dc=foo,bar=bloe,bla=bloe", x.__str__())
+
+ def test_add(self):
+ x = ldb.Dn(self.ldb, "dc=foo")
+ y = ldb.Dn(self.ldb, "bar=bla")
+ self.assertEquals("dc=foo,bar=bla", str(y + x))
+
+ def test_parse_ldif(self):
+ msgs = self.ldb.parse_ldif("dn: foo=bar\n")
+ msg = msgs.next()
+ self.assertEquals("foo=bar", str(msg[1].dn))
+ self.assertTrue(isinstance(msg[1], ldb.Message))
+
+ def test_parse_ldif_more(self):
+ msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
+ msg = msgs.next()
+ self.assertEquals("foo=bar", str(msg[1].dn))
+ msg = msgs.next()
+ self.assertEquals("bar=bar", str(msg[1].dn))
+
+ def test_canonical_string(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertEquals("/bloe/foo", x.canonical_str())
+
+ def test_canonical_ex_string(self):
+ x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
+ self.assertEquals("/bloe\nfoo", x.canonical_ex_str())
+
+
+class LdbMsgTests(unittest.TestCase):
+ def setUp(self):
+ self.msg = ldb.Message()
+
+ def test_init_dn(self):
+ self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo"))
+ self.assertEquals("dc=foo", str(self.msg.dn))
+
+ def test_len(self):
+ self.assertEquals(0, len(self.msg))
+
+ def test_notpresent(self):
+ self.assertRaises(KeyError, lambda: self.msg["foo"])
+
+ def test_del(self):
+ del self.msg["foo"]
+
+ def test_add_value(self):
+ self.assertEquals(0, len(self.msg))
+ self.msg["foo"] = ["foo"]
+ self.assertEquals(1, len(self.msg))
+
+ def test_add_value_multiple(self):
+ self.assertEquals(0, len(self.msg))
+ self.msg["foo"] = ["foo", "bla"]
+ self.assertEquals(1, len(self.msg))
+ self.assertEquals(["foo", "bla"], list(self.msg["foo"]))
+
+ def test_set_value(self):
+ self.msg["foo"] = ["fool"]
+ self.assertEquals(["fool"], list(self.msg["foo"]))
+ self.msg["foo"] = ["bar"]
+ self.assertEquals(["bar"], list(self.msg["foo"]))
+
+ def test_keys(self):
+ self.msg["foo"] = ["bla"]
+ self.msg["bar"] = ["bla"]
+ self.assertEquals(["foo", "bar"], self.msg.keys())
+
+ def test_dn(self):
+ self.msg.dn = ldb.Dn(ldb.Ldb("foo.tdb"), "@BASEINFO")
+ self.assertEquals("@BASEINFO", self.msg.dn.__str__())
+
+
+class MessageElementTests(unittest.TestCase):
+ def test_cmp_element(self):
+ x = ldb.MessageElement(["foo"])
+ y = ldb.MessageElement(["foo"])
+ z = ldb.MessageElement(["bzr"])
+ self.assertEquals(x, y)
+ self.assertNotEquals(x, z)
+
+ def test_create_iterable(self):
+ x = ldb.MessageElement(["foo"])
+ self.assertEquals(["foo"], list(x))