From aa0a06f13c44e0eca0b3f2f0c34f0f7995b87159 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Sun, 23 Dec 2007 19:19:41 -0600 Subject: r26570: - Trim size of the swig-generated Python bindings by removing a bunch of {}'s. - Start working on Python equivalents for various EJS tests. - Fix regression in argument order for reg_diff_apply() in EJS bindings. (This used to be commit c550c03372cb260b78f6a6c132e70571bc4cb852) --- source4/dsdb/samdb/ldb_modules/tests/samba3sam.py | 1067 +++++++++++++++++++++ 1 file changed, 1067 insertions(+) create mode 100644 source4/dsdb/samdb/ldb_modules/tests/samba3sam.py (limited to 'source4/dsdb/samdb/ldb_modules/tests/samba3sam.py') diff --git a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py new file mode 100644 index 0000000000..6a4935bf4d --- /dev/null +++ b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py @@ -0,0 +1,1067 @@ +#!/usr/bin/python + +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij 2005-2007 +# Copyright (C) Martin Kuehl 2006 +# +# This is a Python port of the original in testprogs/ejs/samba3sam.js +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import os +import sys +import samba +import ldb +from samba import Ldb, substitute_var +from samba.tests import LdbTestCase, TestCaseInTempDir + +datadir = sys.argv[2] + +class Samba3SamTestCase(TestCaseInTempDir): + def setup_data(self, obj, ldif): + self.assertTrue(ldif is not None) + obj.db.add_ldif(substitute_var(ldif, obj.substvars)) + + def setup_modules(self, ldb, s3, s4, ldif): + self.assertTrue(ldif is not None) + ldb.add_ldif(substitute_var(ldif, s4.substvars)) + + ldif = """ +dn: @MAP=samba3sam +@FROM: """ + s4.substvars["BASEDN"] + """ +@TO: sambaDomainName=TESTS,""" + s3.substvars["BASEDN"] + """ + +dn: @MODULES +@LIST: rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition + +dn: @PARTITION +partition: """ + s4.substvars["BASEDN"] + ":" + s4.url + """ +partition: """ + s3.substvars["BASEDN"] + ":" + s3.url + """ +replicateEntries: @SUBCLASSES +replicateEntries: @ATTRIBUTES +replicateEntries: @INDEXLIST +""" + ldb.add_ldif(ldif) + + def test_s3sam_search(self, ldb): + print "Looking up by non-mapped attribute" + msg = ldb.search(expression="(cn=Administrator)") + self.assertEquals(len(msg), 1) + self.assertEquals(msg[0]["cn"], "Administrator") + + print "Looking up by mapped attribute" + msg = ldb.search(expression="(name=Backup Operators)") + self.assertEquals(len(msg), 1) + self.assertEquals(msg[0]["name"], "Backup Operators") + + print "Looking up by old name of renamed attribute" + msg = ldb.search(expression="(displayName=Backup Operators)") + self.assertEquals(len(msg), 0) + + print "Looking up mapped entry containing SID" + msg = ldb.search(expression="(cn=Replicator)") + self.assertEquals(len(msg), 1) + print msg[0].dn + self.assertEquals(str(msg[0].dn), "cn=Replicator,ou=Groups,dc=vernstok,dc=nl") + self.assertEquals(msg[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552") + + print "Checking mapping of objectClass" + oc = set(msg[0]["objectClass"]) + self.assertTrue(oc is not None) + for i in oc: + self.assertEquals(oc[i] == "posixGroup" or oc[i], "group") + + print "Looking up by objectClass" + msg = ldb.search(expression="(|(objectClass=user)(cn=Administrator))") + self.assertEquals(len(msg), 2) + for i in range(len(msg)): + self.assertEquals((str(msg[i].dn), "unixName=Administrator,ou=Users,dc=vernstok,dc=nl") or + (str(msg[i].dn) == "unixName=nobody,ou=Users,dc=vernstok,dc=nl")) + + + def test_s3sam_modify(ldb, s3): + print "Adding a record that will be fallbacked" + ldb.add_ldif(""" +dn: cn=Foo +foo: bar +blah: Blie +cn: Foo +showInAdvancedViewOnly: TRUE + """) + + print "Checking for existence of record (local)" + # TODO: This record must be searched in the local database, which is currently only supported for base searches + # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')] + # TODO: Actually, this version should work as well but doesn't... + # + # + attrs = ['foo','blah','cn','showInAdvancedViewOnly'] + msg = ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=ldb.LDB_SCOPE_BASE, attrs=attrs) + self.assertEquals(len(msg), 1) + self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE") + self.assertEquals(msg[0]["foo"], "bar") + self.assertEquals(msg[0]["blah"], "Blie") + + print "Adding record that will be mapped" + ldb.add_ldif(""" +dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl +objectClass: user +unixName: bin +sambaUnicodePwd: geheim +cn: Niemand +""") + + print "Checking for existence of record (remote)" + msg = ldb.search(expression="(unixName=bin)", attrs=['unixName','cn','dn', 'sambaUnicodePwd']) + self.assertEquals(len(msg), 1) + self.assertEquals(msg[0]["cn"], "Niemand") + self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim") + + print "Checking for existence of record (local && remote)" + msg = ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))", + attrs=['unixName','cn','dn', 'sambaUnicodePwd']) + self.assertEquals(len(msg), 1) # TODO: should check with more records + self.assertEquals(msg[0]["cn"], "Niemand") + self.assertEquals(msg[0]["unixName"], "bin") + self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim") + + print "Checking for existence of record (local || remote)" + msg = ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))", + attrs=['unixName','cn','dn', 'sambaUnicodePwd']) + print "got " + len(msg) + " replies" + self.assertEquals(len(msg), 1) # TODO: should check with more records + self.assertEquals(msg[0]["cn"], "Niemand") + self.assertEquals(msg[0]["unixName"] == "bin" or msg[0]["sambaUnicodePwd"], "geheim") + + print "Checking for data in destination database" + msg = s3.db.search("(cn=Niemand)") + self.assertTrue(len(msg) >= 1) + self.assertEquals(msg[0]["sambaSID"], "S-1-5-21-4231626423-2410014848-2360679739-2001") + self.assertEquals(msg[0]["displayName"], "Niemand") + + print "Adding attribute..." + ldb.modify_ldif(""" +dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl +changetype: modify +add: description +description: Blah +""") + + print "Checking whether changes are still there..." + msg = ldb.search(expression="(cn=Niemand)") + self.assertTrue(len(msg) >= 1) + self.assertEquals(msg[0]["cn"], "Niemand") + self.assertEquals(msg[0]["description"], "Blah") + + print "Modifying attribute..." + ldb.modify_ldif(""" +dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl +changetype: modify +replace: description +description: Blie +""") + + print "Checking whether changes are still there..." + msg = ldb.search(expression="(cn=Niemand)") + self.assertTrue(len(msg) >= 1) + self.assertEquals(msg[0]["description"], "Blie") + + print "Deleting attribute..." + ldb.modify_ldif(""" +dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl +changetype: modify +delete: description +""") + + print "Checking whether changes are no longer there..." + msg = ldb.search(expression="(cn=Niemand)") + self.assertTrue(len(msg) >= 1) + self.assertEquals(msg[0]["description"], undefined) + + print "Renaming record..." + ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", "cn=Niemand2,cn=Users,dc=vernstok,dc=nl") + + print "Checking whether DN has changed..." + msg = ldb.search(expression="(cn=Niemand2)") + self.assertEquals(len(msg), 1) + self.assertEquals(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl") + + print "Deleting record..." + ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl") + + print "Checking whether record is gone..." + msg = ldb.search(expression="(cn=Niemand2)") + self.assertEquals(len(msg), 0) + + def test_map_search(ldb, s3, s4): + print "Running search tests on mapped data" + ldif = """ +dn: """ + "sambaDomainName=TESTS,""" + s3.substvars["BASEDN"] + """ +objectclass: sambaDomain +objectclass: top +sambaSID: S-1-5-21-4231626423-2410014848-2360679739 +sambaNextRid: 2000 +sambaDomainName: TESTS""" + self.assertTrue(ldif is not None) + s3.db.add_ldif(substitute_var(ldif, s3.substvars)) + + print "Add a set of split records" + ldif = """ +dn: """ + s4.dn("cn=X") + """ +objectClass: user +cn: X +codePage: x +revision: x +dnsHostName: x +nextRid: y +lastLogon: x +description: x +objectSid: S-1-5-21-4231626423-2410014848-2360679739-552 +primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512 + +dn: """ + s4.dn("cn=Y") + """ +objectClass: top +cn: Y +codePage: x +revision: x +dnsHostName: y +nextRid: y +lastLogon: y +description: x + +dn: """ + s4.dn("cn=Z") + """ +objectClass: top +cn: Z +codePage: x +revision: y +dnsHostName: z +nextRid: y +lastLogon: z +description: y +""" + + self.assertTrue(ldif is not None) + ldb.add_ldif(substitute_var(ldif, s4.substvars)) + + print "Add a set of remote records" + + ldif = """ +dn: """ + s3.dn("cn=A") + """ +objectClass: posixAccount +cn: A +sambaNextRid: x +sambaBadPasswordCount: x +sambaLogonTime: x +description: x +sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552 +sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512 + +dn: """ + s3.dn("cn=B") + """ +objectClass: top +cn:B +sambaNextRid: x +sambaBadPasswordCount: x +sambaLogonTime: y +description: x + +dn: """ + s3.dn("cn=C") + """ +objectClass: top +cn: C +sambaNextRid: x +sambaBadPasswordCount: y +sambaLogonTime: z +description: y +""" + self.assertTrue(ldif is not None) + s3.add_ldif(substitute_var(ldif, s3.substvars)) + + print "Testing search by DN" + + # Search remote record by local DN + dn = s4.dn("cn=A") + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(str(res[0].dn)), dn) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], "x") + + # Search remote record by remote DN + dn = s3.dn("cn=A") + attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"] + res = s3.db.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(str(res[0].dn)), dn) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], undefined) + self.assertEquals(res[0]["sambaLogonTime"], "x") + + # Search split record by local DN + dn = s4.dn("cn=X") + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(str(res[0].dn)), dn) + self.assertEquals(res[0]["dnsHostName"], "x") + self.assertEquals(res[0]["lastLogon"], "x") + + # Search split record by remote DN + dn = s3.dn("cn=X") + attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"] + res = s3.db.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(str(res[0].dn)), dn) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], undefined) + self.assertEquals(res[0]["sambaLogonTime"], "x") + + print "Testing search by attribute" + + # Search by ignored attribute + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(revision=x)", scope=ldb.SCOPE_DEFAULT, attrs=attrs) + self.assertEquals(len(res), 2) + self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Y")) + self.assertEquals(res[0]["dnsHostName"], "y") + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(str(res[1].dn)), s4.dn("cn=X")) + self.assertEquals(res[1]["dnsHostName"], "x") + self.assertEquals(res[1]["lastLogon"], "x") + + # Search by kept attribute + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(description=y)", scope=ldb.SCOPE_DEFAULT, attrs=attrs) + self.assertEquals(len(res), 2) + self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Z")) + self.assertEquals(res[0]["dnsHostName"], "z") + self.assertEquals(res[0]["lastLogon"], "z") + self.assertEquals(str(str(res[1].dn)), s4.dn("cn=C")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "z") + + # Search by renamed attribute + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(badPwdCount=x)", scope=ldb.SCOPE_DEFAULT, attrs=attrs) + self.assertEquals(len(res), 2) + self.assertEquals(str(res[0].dn), s4.dn("cn=B")) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=A")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "x") + + # Search by converted attribute + attrs = ["dnsHostName", "lastLogon", "objectSid"] + # TODO: + # Using the SID directly in the parse tree leads to conversion + # errors, letting the search fail with no results. + #res = ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", NULL, ldb. SCOPE_DEFAULT, attrs) + res = ldb.search(expression="(objectSid=*)", attrs=attrs) + self.assertEquals(len(res), 3) + self.assertEquals(str(res[0].dn), s4.dn("cn=X")) + self.assertEquals(res[0]["dnsHostName"], "x") + self.assertEquals(res[0]["lastLogon"], "x") + self.assertEquals(res[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552") + self.assertEquals(str(res[1].dn), s4.dn("cn=A")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "x") + self.assertEquals(res[1]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552") + + # Search by generated attribute + # In most cases, this even works when the mapping is missing + # a `convert_operator' by enumerating the remote db. + attrs = ["dnsHostName", "lastLogon", "primaryGroupID"] + res = ldb.search(expression="(primaryGroupID=512)", attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), s4.dn("cn=A")) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], "x") + self.assertEquals(res[0]["primaryGroupID"], "512") + + # TODO: There should actually be two results, A and X. The + # primaryGroupID of X seems to get corrupted somewhere, and the + # objectSid isn't available during the generation of remote (!) data, + # which can be observed with the following search. Also note that Xs + # objectSid seems to be fine in the previous search for objectSid... */ + #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs) + #print len(res) + " results found" + #for i in range(len(res)): + # for (obj in res[i]) { + # print obj + ": " + res[i][obj] + # } + # print "---" + # + + # Search by remote name of renamed attribute */ + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(sambaBadPasswordCount=*)", attrs=attrs) + self.assertEquals(len(res), 0) + + # Search by objectClass + attrs = ["dnsHostName", "lastLogon", "objectClass"] + res = ldb.search(expression="(objectClass=user)", attrs=attrs) + self.assertEquals(len(res), 2) + self.assertEquals(str(res[0].dn), s4.dn("cn=X")) + self.assertEquals(res[0]["dnsHostName"], "x") + self.assertEquals(res[0]["lastLogon"], "x") + self.assertTrue(res[0]["objectClass"] is not None) + self.assertEquals(res[0]["objectClass"][0], "user") + self.assertEquals(str(res[1].dn), s4.dn("cn=A")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "x") + self.assertTrue(res[1]["objectClass"] is not None) + self.assertEquals(res[1]["objectClass"][0], "user") + + # Prove that the objectClass is actually used for the search + res = ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs) + self.assertEquals(len(res), 3) + self.assertEquals(str(res[0].dn), s4.dn("cn=B")) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], "y") + self.assertTrue(res[0]["objectClass"] is not None) + for oc in set(res[0]["objectClass"]): + self.assertEquals(oc, "user") + self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(res[1]["dnsHostName"], "x") + self.assertEquals(res[1]["lastLogon"], "x") + self.assertTrue(res[1]["objectClass"] is not None) + self.assertEquals(res[1]["objectClass"][0], "user") + self.assertEquals(str(res[2].dn), s4.dn("cn=A")) + self.assertEquals(res[2]["dnsHostName"], undefined) + self.assertEquals(res[2]["lastLogon"], "x") + self.assertTrue(res[2]["objectClass"] is not None) + self.assertEquals(res[2]["objectClass"][0], "user") + + print "Testing search by parse tree" + + # Search by conjunction of local attributes + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(&(codePage=x)(revision=x))", attrs=attrs) + self.assertEquals(len(res), 2) + self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(res[0]["dnsHostName"], "y") + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(res[1]["dnsHostName"], "x") + self.assertEquals(res[1]["lastLogon"], "x") + + # Search by conjunction of remote attributes + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=attrs) + self.assertEquals(len(res), 2) + self.assertEquals(str(res[0].dn), s4.dn("cn=X")) + self.assertEquals(res[0]["dnsHostName"], "x") + self.assertEquals(res[0]["lastLogon"], "x") + self.assertEquals(str(res[1].dn), s4.dn("cn=A")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "x") + + # Search by conjunction of local and remote attribute + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(&(codePage=x)(description=x))", attrs=attrs) + self.assertEquals(len(res), 2) + self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(res[0]["dnsHostName"], "y") + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(res[1]["dnsHostName"], "x") + self.assertEquals(res[1]["lastLogon"], "x") + + # Search by conjunction of local and remote attribute w/o match + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs) + self.assertEquals(len(res), 0) + res = ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs) + self.assertEquals(len(res), 0) + + # Search by disjunction of local attributes + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=attrs) + self.assertEquals(len(res), 2) + self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(res[0]["dnsHostName"], "y") + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(res[1]["dnsHostName"], "x") + self.assertEquals(res[1]["lastLogon"], "x") + + # Search by disjunction of remote attributes + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=attrs) + self.assertEquals(len(res), 3) + self.assertEquals(str(res[0].dn), s4.dn("cn=B")) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(res[1]["dnsHostName"], "x") + self.assertEquals(res[1]["lastLogon"], "x") + self.assertEquals(str(res[2].dn), s4.dn("cn=A")) + self.assertEquals(res[2]["dnsHostName"], undefined) + self.assertEquals(res[2]["lastLogon"], "x") + + # Search by disjunction of local and remote attribute + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=attrs) + self.assertEquals(len(res), 3) + self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(res[0]["dnsHostName"], "y") + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=B")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "y") + self.assertEquals(str(res[2].dn), s4.dn("cn=X")) + self.assertEquals(res[2]["dnsHostName"], "x") + self.assertEquals(res[2]["lastLogon"], "x") + + # Search by disjunction of local and remote attribute w/o match + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=attrs) + self.assertEquals(len(res), 0) + + # Search by negated local attribute + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(!(revision=x))", attrs=attrs) + self.assertEquals(len(res), 5) + self.assertEquals(str(res[0].dn), s4.dn("cn=B")) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=A")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "x") + self.assertEquals(str(res[2].dn), s4.dn("cn=Z")) + self.assertEquals(res[2]["dnsHostName"], "z") + self.assertEquals(res[2]["lastLogon"], "z") + self.assertEquals(str(res[3].dn), s4.dn("cn=C")) + self.assertEquals(res[3]["dnsHostName"], undefined) + self.assertEquals(res[3]["lastLogon"], "z") + + # Search by negated remote attribute + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(!(description=x))", attrs=attrs) + self.assertEquals(len(res), 3) + self.assertEquals(str(res[0].dn), s4.dn("cn=Z")) + self.assertEquals(res[0]["dnsHostName"], "z") + self.assertEquals(res[0]["lastLogon"], "z") + self.assertEquals(str(res[1].dn), s4.dn("cn=C")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "z") + + # Search by negated conjunction of local attributes + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=attrs) + self.assertEquals(len(res), 5) + self.assertEquals(str(res[0].dn), s4.dn("cn=B")) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=A")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "x") + self.assertEquals(str(res[2].dn), s4.dn("cn=Z")) + self.assertEquals(res[2]["dnsHostName"], "z") + self.assertEquals(res[2]["lastLogon"], "z") + self.assertEquals(str(res[3].dn), s4.dn("cn=C")) + self.assertEquals(res[3]["dnsHostName"], undefined) + self.assertEquals(res[3]["lastLogon"], "z") + + # Search by negated conjunction of remote attributes + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=attrs) + self.assertEquals(len(res), 5) + self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(res[0]["dnsHostName"], "y") + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=B")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "y") + self.assertEquals(str(res[2].dn), s4.dn("cn=Z")) + self.assertEquals(res[2]["dnsHostName"], "z") + self.assertEquals(res[2]["lastLogon"], "z") + self.assertEquals(str(res[3].dn), s4.dn("cn=C")) + self.assertEquals(res[3]["dnsHostName"], undefined) + self.assertEquals(res[3]["lastLogon"], "z") + + # Search by negated conjunction of local and remote attribute + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=attrs) + self.assertEquals(len(res), 5) + self.assertEquals(str(res[0].dn), s4.dn("cn=B")) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=A")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "x") + self.assertEquals(str(res[2].dn), s4.dn("cn=Z")) + self.assertEquals(res[2]["dnsHostName"], "z") + self.assertEquals(res[2]["lastLogon"], "z") + self.assertEquals(str(res[3].dn), s4.dn("cn=C")) + self.assertEquals(res[3]["dnsHostName"], undefined) + self.assertEquals(res[3]["lastLogon"], "z") + + # Search by negated disjunction of local attributes + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=attrs) + self.assertEquals(str(res[0].dn), s4.dn("cn=B")) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=A")) + self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(res[1]["lastLogon"], "x") + self.assertEquals(str(res[2].dn), s4.dn("cn=Z")) + self.assertEquals(res[2]["dnsHostName"], "z") + self.assertEquals(res[2]["lastLogon"], "z") + self.assertEquals(str(res[3].dn), s4.dn("cn=C")) + self.assertEquals(res[3]["dnsHostName"], undefined) + self.assertEquals(res[3]["lastLogon"], "z") + + # Search by negated disjunction of remote attributes + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=attrs) + self.assertEquals(len(res), 4) + self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(res[0]["dnsHostName"], "y") + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=Z")) + self.assertEquals(res[1]["dnsHostName"], "z") + self.assertEquals(res[1]["lastLogon"], "z") + self.assertEquals(str(res[2].dn), s4.dn("cn=C")) + self.assertEquals(res[2]["dnsHostName"], undefined) + self.assertEquals(res[2]["lastLogon"], "z") + + # Search by negated disjunction of local and remote attribute + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", attrs=attrs) + self.assertEquals(len(res), 4) + self.assertEquals(str(res[0].dn), s4.dn("cn=A")) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], "x") + self.assertEquals(str(res[1].dn), s4.dn("cn=Z")) + self.assertEquals(res[1]["dnsHostName"], "z") + self.assertEquals(res[1]["lastLogon"], "z") + self.assertEquals(str(res[2].dn), s4.dn("cn=C")) + self.assertEquals(res[2]["dnsHostName"], undefined) + self.assertEquals(res[2]["lastLogon"], "z") + + print "Search by complex parse tree" + attrs = ["dnsHostName", "lastLogon"] + res = ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=attrs) + self.assertEquals(len(res), 6) + self.assertEquals(str(res[0].dn), s4.dn("cn=B")) + self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(res[0]["lastLogon"], "y") + self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(res[1]["dnsHostName"], "x") + self.assertEquals(res[1]["lastLogon"], "x") + self.assertEquals(str(res[2].dn), s4.dn("cn=A")) + self.assertEquals(res[2]["dnsHostName"], undefined) + self.assertEquals(res[2]["lastLogon"], "x") + self.assertEquals(str(res[3].dn), s4.dn("cn=Z")) + self.assertEquals(res[3]["dnsHostName"], "z") + self.assertEquals(res[3]["lastLogon"], "z") + self.assertEquals(str(res[4].dn), s4.dn("cn=C")) + self.assertEquals(res[4]["dnsHostName"], undefined) + self.assertEquals(res[4]["lastLogon"], "z") + + # Clean up + dns = [s4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]] + for dn in dns: + ldb.delete(dn) + + def test_map_modify(self, ldb, s3, s4): + print "Running modification tests on mapped data" + + print "Testing modification of local records" + + # Add local record + dn = "cn=test,dc=idealx,dc=org" + ldif = """ +dn: """ + dn + """ +cn: test +foo: bar +revision: 1 +description: test +""" + ldb.add_ldif(ldif) + # Check it's there + attrs = ["foo", "revision", "description"] + res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["foo"], "bar") + self.assertEquals(res[0]["revision"], "1") + self.assertEquals(res[0]["description"], "test") + # Check it's not in the local db + res = s4.db.search("(cn=test)", NULL, ldb.SCOPE_DEFAULT, attrs) + self.assertEquals(len(res), 0) + # Check it's not in the remote db + res = s3.db.search("(cn=test)", NULL, ldb.SCOPE_DEFAULT, attrs) + self.assertEquals(len(res), 0) + + # Modify local record + ldif = """ +dn: """ + dn + """ +replace: foo +foo: baz +replace: description +description: foo +""" + ldb.modify_ldif(ldif) + # Check in local db + res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["foo"], "baz") + self.assertEquals(res[0]["revision"], "1") + self.assertEquals(res[0]["description"], "foo") + + # Rename local record + dn2 = "cn=toast,dc=idealx,dc=org" + ldb.rename(dn, dn2) + # Check in local db + res = ldb.search(dn2, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(res[0]["foo"], "baz") + self.assertEquals(res[0]["revision"], "1") + self.assertEquals(res[0]["description"], "foo") + + # Delete local record + ldb.delete(dn2) + # Check it's gone + res = ldb.search(dn2, scope=ldb.SCOPE_BASE) + self.assertEquals(len(res), 0) + + print "Testing modification of remote records" + + # Add remote record + dn = s4.dn("cn=test") + dn2 = s3.dn("cn=test") + ldif = """ +dn: """ + dn2 + """ +cn: test +description: foo +sambaBadPasswordCount: 3 +sambaNextRid: 1001 +""" + s3.db.add_ldif(ldif) + # Check it's there + attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] + res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(res[0]["description"], "foo") + self.assertEquals(res[0]["sambaBadPasswordCount"], "3") + self.assertEquals(res[0]["sambaNextRid"], "1001") + # Check in mapped db + attrs = ["description", "badPwdCount", "nextRid"] + res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["description"], "foo") + self.assertEquals(res[0]["badPwdCount"], "3") + self.assertEquals(res[0]["nextRid"], "1001") + # Check in local db + res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 0) + + # Modify remote data of remote record + ldif = """ +dn: """ + dn + """ +replace: description +description: test +replace: badPwdCount +badPwdCount: 4 +""" + ldb.modify_ldif(ldif) + # Check in mapped db + attrs = ["description", "badPwdCount", "nextRid"] + res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["description"], "test") + self.assertEquals(res[0]["badPwdCount"], "4") + self.assertEquals(res[0]["nextRid"], "1001") + # Check in remote db + attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] + res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(res[0]["description"], "test") + self.assertEquals(res[0]["sambaBadPasswordCount"], "4") + self.assertEquals(res[0]["sambaNextRid"], "1001") + + # Rename remote record + dn2 = s4.dn("cn=toast") + ldb.rename(dn, dn2) + # Check in mapped db + dn = dn2 + attrs = ["description", "badPwdCount", "nextRid"] + res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["description"], "test") + self.assertEquals(res[0]["badPwdCount"], "4") + self.assertEquals(res[0]["nextRid"], "1001") + # Check in remote db + dn2 = s3.dn("cn=toast") + attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] + res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(res[0]["description"], "test") + self.assertEquals(res[0]["sambaBadPasswordCount"], "4") + self.assertEquals(res[0]["sambaNextRid"], "1001") + + # Delete remote record + ldb.delete(dn) + # Check in mapped db + res = ldb.search(dn, scope=ldb.SCOPE_BASE) + self.assertEquals(len(res), 0) + # Check in remote db + res = s3.db.search("", dn2, ldb.SCOPE_BASE) + self.assertEquals(len(res), 0) + + # Add remote record (same as before) + dn = s4.dn("cn=test") + dn2 = s3.dn("cn=test") + ldif = """ +dn: """ + dn2 + """ +cn: test +description: foo +sambaBadPasswordCount: 3 +sambaNextRid: 1001 +""" + s3.db.add_ldif(ldif) + + # Modify local data of remote record + ldif = """ +dn: """ + dn + """ +add: revision +revision: 1 +replace: description +description: test +""" + ldb.modify_ldif(ldif) + # Check in mapped db + attrs = ["revision", "description"] + res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["description"], "test") + self.assertEquals(res[0]["revision"], "1") + # Check in remote db + res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(res[0]["description"], "test") + self.assertEquals(res[0]["revision"], undefined) + # Check in local db + res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["description"], undefined) + self.assertEquals(res[0]["revision"], "1") + + # Delete (newly) split record + ldb.delete(dn) + + print "Testing modification of split records" + + # Add split record + dn = s4.dn("cn=test") + dn2 = s3.dn("cn=test") + ldif = """ +dn: """ + dn + """ +cn: test +description: foo +badPwdCount: 3 +nextRid: 1001 +revision: 1 +""" + ldb.add_ldif(ldif) + # Check it's there + attrs = ["description", "badPwdCount", "nextRid", "revision"] + res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["description"], "foo") + self.assertEquals(res[0]["badPwdCount"], "3") + self.assertEquals(res[0]["nextRid"], "1001") + self.assertEquals(res[0]["revision"], "1") + # Check in local db + res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["description"], undefined) + self.assertEquals(res[0]["badPwdCount"], undefined) + self.assertEquals(res[0]["nextRid"], undefined) + self.assertEquals(res[0]["revision"], "1") + # Check in remote db + attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] + res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(res[0]["description"], "foo") + self.assertEquals(res[0]["sambaBadPasswordCount"], "3") + self.assertEquals(res[0]["sambaNextRid"], "1001") + self.assertEquals(res[0]["revision"], undefined) + + # Modify of split record + ldif = """ +dn: """ + dn + """ +replace: description +description: test +replace: badPwdCount +badPwdCount: 4 +replace: revision +revision: 2 +""" + ldb.modify_ldif(ldif) + # Check in mapped db + attrs = ["description", "badPwdCount", "nextRid", "revision"] + res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["description"], "test") + self.assertEquals(res[0]["badPwdCount"], "4") + self.assertEquals(res[0]["nextRid"], "1001") + self.assertEquals(res[0]["revision"], "2") + # Check in local db + res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["description"], undefined) + self.assertEquals(res[0]["badPwdCount"], undefined) + self.assertEquals(res[0]["nextRid"], undefined) + self.assertEquals(res[0]["revision"], "2") + # Check in remote db + attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] + res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(res[0]["description"], "test") + self.assertEquals(res[0]["sambaBadPasswordCount"], "4") + self.assertEquals(res[0]["sambaNextRid"], "1001") + self.assertEquals(res[0]["revision"], undefined) + + # Rename split record + dn2 = s4.dn("cn=toast") + ldb.rename(dn, dn2) + # Check in mapped db + dn = dn2 + attrs = ["description", "badPwdCount", "nextRid", "revision"] + res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["description"], "test") + self.assertEquals(res[0]["badPwdCount"], "4") + self.assertEquals(res[0]["nextRid"], "1001") + self.assertEquals(res[0]["revision"], "2") + # Check in local db + res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(res[0]["description"], undefined) + self.assertEquals(res[0]["badPwdCount"], undefined) + self.assertEquals(res[0]["nextRid"], undefined) + self.assertEquals(res[0]["revision"], "2") + # Check in remote db + dn2 = s3.dn("cn=toast") + attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] + res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(res[0]["description"], "test") + self.assertEquals(res[0]["sambaBadPasswordCount"], "4") + self.assertEquals(res[0]["sambaNextRid"], "1001") + self.assertEquals(res[0]["revision"], undefined) + + # Delete split record + ldb.delete(dn) + # Check in mapped db + res = ldb.search(dn, scope=ldb.SCOPE_BASE) + self.assertEquals(len(res), 0) + # Check in local db + res = s4.db.search("", dn, ldb.SCOPE_BASE) + self.assertEquals(len(res), 0) + # Check in remote db + res = s3.db.search("", dn2, ldb.SCOPE_BASE) + self.assertEquals(len(res), 0) + + def setUp(self): + super(Samba3SamTestCase, self).setUp() + + def make_dn(rdn): + return rdn + ",sambaDomainName=TESTS," + this.substvars["BASEDN"] + + def make_s4dn(rdn): + return rdn + "," + this.substvars["BASEDN"] + + ldb = Ldb() + + ldbfile = os.path.join(self.tempdir, "test.ldb") + ldburl = "tdb://" + ldbfile + + tempdir = self.tempdir + + class Target: + def __init__(self, file, basedn, dn): + self.file = os.path.join(tempdir, file) + self.url = "tdb://" + self.file + self.substvars = {"BASEDN": basedn} + self.db = Ldb() + self.dn = dn + + samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn) + samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn) + templates = Target("templates.ldb", "cn=templates", None) + + ldb.connect(ldburl) + samba3.db.connect(samba3.url) + templates.db.connect(templates.url) + samba4.db.connect(samba4.url) + + self.setup_data(samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read()) + self.setup_data(templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) + self.setup_modules(ldb, samba3, samba4, open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()) + + ldb = Ldb() + ldb.connect(ldburl) + + self.test_s3sam_search(ldb) + self.test_s3sam_modify(ldb, samba3) + + os.unlink(ldbfile) + os.unlink(samba3.file) + os.unlink(templates.file) + os.unlink(samba4.file) + + ldb = Ldb() + ldb.connect(ldburl) + samba3.db = Ldb() + samba3.db.connect(samba3.url) + templates.db = Ldb() + templates.db.connect(templates.url) + samba4.db = Ldb() + samba4.db.connect(samba4.url) + + self.setup_data(templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) + self.setup_modules(ldb, samba3, samba4, open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()) + + ldb = Ldb() + ldb.connect(ldburl) + + test_map_search(ldb, samba3, samba4) + test_map_modify(ldb, samba3, samba4) + -- cgit From 7c146c42d2cf51e891b9f29d3b61a40f173a3b23 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Tue, 25 Dec 2007 16:36:31 -0600 Subject: r26593: - More work on the python versions of samba3dump and the samba3sam tests. - Initial work converting the upgrade code to Python. - Removed the old EJS upgrade code because it has been broken for a long time. (This used to be commit 150cf39fbd4fe088546870fb0d8f20c0d9eb4aca) --- source4/dsdb/samdb/ldb_modules/tests/samba3sam.py | 127 +++++++++++----------- 1 file changed, 63 insertions(+), 64 deletions(-) (limited to 'source4/dsdb/samdb/ldb_modules/tests/samba3sam.py') diff --git a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py index 6a4935bf4d..8ca92e152e 100644 --- a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py +++ b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py @@ -27,35 +27,33 @@ import ldb from samba import Ldb, substitute_var from samba.tests import LdbTestCase, TestCaseInTempDir -datadir = sys.argv[2] +datadir = os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3") class Samba3SamTestCase(TestCaseInTempDir): def setup_data(self, obj, ldif): self.assertTrue(ldif is not None) obj.db.add_ldif(substitute_var(ldif, obj.substvars)) - def setup_modules(self, ldb, s3, s4, ldif): - self.assertTrue(ldif is not None) - ldb.add_ldif(substitute_var(ldif, s4.substvars)) + def setup_modules(self, ldb, s3, s4): ldif = """ dn: @MAP=samba3sam -@FROM: """ + s4.substvars["BASEDN"] + """ -@TO: sambaDomainName=TESTS,""" + s3.substvars["BASEDN"] + """ +@FROM: """ + s4.basedn + """ +@TO: sambaDomainName=TESTS,""" + s3.basedn + """ dn: @MODULES @LIST: rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition dn: @PARTITION -partition: """ + s4.substvars["BASEDN"] + ":" + s4.url + """ -partition: """ + s3.substvars["BASEDN"] + ":" + s3.url + """ +partition: """ + s4.basedn + ":" + s4.url + """ +partition: """ + s3.basedn + ":" + s3.url + """ replicateEntries: @SUBCLASSES replicateEntries: @ATTRIBUTES replicateEntries: @INDEXLIST """ ldb.add_ldif(ldif) - def test_s3sam_search(self, ldb): + def _test_s3sam_search(self, ldb): print "Looking up by non-mapped attribute" msg = ldb.search(expression="(cn=Administrator)") self.assertEquals(len(msg), 1) @@ -91,7 +89,7 @@ replicateEntries: @INDEXLIST (str(msg[i].dn) == "unixName=nobody,ou=Users,dc=vernstok,dc=nl")) - def test_s3sam_modify(ldb, s3): + def _test_s3sam_modify(ldb, s3): print "Adding a record that will be fallbacked" ldb.add_ldif(""" dn: cn=Foo @@ -205,16 +203,15 @@ delete: description msg = ldb.search(expression="(cn=Niemand2)") self.assertEquals(len(msg), 0) - def test_map_search(ldb, s3, s4): + def _test_map_search(self, ldb, s3, s4): print "Running search tests on mapped data" ldif = """ -dn: """ + "sambaDomainName=TESTS,""" + s3.substvars["BASEDN"] + """ +dn: """ + "sambaDomainName=TESTS,""" + s3.basedn + """ objectclass: sambaDomain objectclass: top sambaSID: S-1-5-21-4231626423-2410014848-2360679739 sambaNextRid: 2000 sambaDomainName: TESTS""" - self.assertTrue(ldif is not None) s3.db.add_ldif(substitute_var(ldif, s3.substvars)) print "Add a set of split records" @@ -252,7 +249,6 @@ lastLogon: z description: y """ - self.assertTrue(ldif is not None) ldb.add_ldif(substitute_var(ldif, s4.substvars)) print "Add a set of remote records" @@ -284,7 +280,6 @@ sambaBadPasswordCount: y sambaLogonTime: z description: y """ - self.assertTrue(ldif is not None) s3.add_ldif(substitute_var(ldif, s3.substvars)) print "Testing search by DN" @@ -678,7 +673,7 @@ description: y for dn in dns: ldb.delete(dn) - def test_map_modify(self, ldb, s3, s4): + def _test_map_modify(self, ldb, s3, s4): print "Running modification tests on mapped data" print "Testing modification of local records" @@ -1002,66 +997,70 @@ revision: 2 def setUp(self): super(Samba3SamTestCase, self).setUp() - def make_dn(rdn): - return rdn + ",sambaDomainName=TESTS," + this.substvars["BASEDN"] - - def make_s4dn(rdn): - return rdn + "," + this.substvars["BASEDN"] + def make_dn(basedn, rdn): + return rdn + ",sambaDomainName=TESTS," + basedn - ldb = Ldb() + def make_s4dn(basedn, rdn): + return rdn + "," + basedn - ldbfile = os.path.join(self.tempdir, "test.ldb") - ldburl = "tdb://" + ldbfile + self.ldbfile = os.path.join(self.tempdir, "test.ldb") + self.ldburl = "tdb://" + self.ldbfile tempdir = self.tempdir + print tempdir class Target: + """Simple helper class that contains data for a specific SAM connection.""" def __init__(self, file, basedn, dn): self.file = os.path.join(tempdir, file) self.url = "tdb://" + self.file - self.substvars = {"BASEDN": basedn} + self.basedn = basedn + self.substvars = {"BASEDN": self.basedn} self.db = Ldb() - self.dn = dn - - samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn) - samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn) - templates = Target("templates.ldb", "cn=templates", None) - - ldb.connect(ldburl) - samba3.db.connect(samba3.url) - templates.db.connect(templates.url) - samba4.db.connect(samba4.url) - - self.setup_data(samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read()) - self.setup_data(templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) - self.setup_modules(ldb, samba3, samba4, open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()) - - ldb = Ldb() - ldb.connect(ldburl) - - self.test_s3sam_search(ldb) - self.test_s3sam_modify(ldb, samba3) - - os.unlink(ldbfile) - os.unlink(samba3.file) - os.unlink(templates.file) - os.unlink(samba4.file) + self._dn = dn + + def dn(self, rdn): + return self._dn(rdn, self.basedn) + + def connect(self): + return self.db.connect(self.url) + + self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn) + self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn) + self.templates = Target("templates.ldb", "cn=templates", None) + + self.samba3.connect() + self.templates.connect() + self.samba4.connect() + + def tearDown(self): + super(Samba3SamTestCase, self).tearDown() + os.unlink(self.ldbfile) + os.unlink(self.samba3.file) + os.unlink(self.templates.file) + os.unlink(self.samba4.file) + + def test_s3sam(self): + ldb = Ldb(self.ldburl) + self.setup_data(self.samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read()) + self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) + ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read() + ldb.add_ldif(substitute_var(ldif, s4.substvars)) + self.setup_modules(ldb, self.samba3, self.samba4) - ldb = Ldb() - ldb.connect(ldburl) - samba3.db = Ldb() - samba3.db.connect(samba3.url) - templates.db = Ldb() - templates.db.connect(templates.url) - samba4.db = Ldb() - samba4.db.connect(samba4.url) + ldb = Ldb(self.ldburl) - self.setup_data(templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) - self.setup_modules(ldb, samba3, samba4, open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()) + self._test_s3sam_search(ldb) + self._test_s3sam_modify(ldb, self.samba3) - ldb = Ldb() - ldb.connect(ldburl) + def test_map(self): + ldb = Ldb(self.ldburl) + self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) + ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read() + ldb.add_ldif(substitute_var(ldif, s4.substvars)) + self.setup_modules(ldb, self.samba3, self.samba4) - test_map_search(ldb, samba3, samba4) - test_map_modify(ldb, samba3, samba4) + ldb = Ldb(self.ldburl) + self._test_map_search(ldb, self.samba3, self.samba4) + self._test_map_modify(ldb, self.samba3, self.samba4) -- cgit From 43a03b0fb48ceb528539a16b0023fb5b30b7a79e Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Tue, 25 Dec 2007 16:36:53 -0600 Subject: r26598: Simplify the way Python tests are run. (This used to be commit d649f73431fc993e31522e7fc8e1e35e0a4421d8) --- source4/dsdb/samdb/ldb_modules/tests/samba3sam.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'source4/dsdb/samdb/ldb_modules/tests/samba3sam.py') diff --git a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py index 8ca92e152e..b083b68da6 100644 --- a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py +++ b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py @@ -1034,18 +1034,18 @@ revision: 2 self.samba4.connect() def tearDown(self): - super(Samba3SamTestCase, self).tearDown() os.unlink(self.ldbfile) os.unlink(self.samba3.file) os.unlink(self.templates.file) os.unlink(self.samba4.file) + super(Samba3SamTestCase, self).tearDown() def test_s3sam(self): ldb = Ldb(self.ldburl) self.setup_data(self.samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read()) self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read() - ldb.add_ldif(substitute_var(ldif, s4.substvars)) + ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) self.setup_modules(ldb, self.samba3, self.samba4) ldb = Ldb(self.ldburl) @@ -1057,7 +1057,7 @@ revision: 2 ldb = Ldb(self.ldburl) self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read() - ldb.add_ldif(substitute_var(ldif, s4.substvars)) + ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) self.setup_modules(ldb, self.samba3, self.samba4) ldb = Ldb(self.ldburl) -- cgit From 870d20cf50a7f33c6b4cbd91f4c57406cb9d52b5 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Sun, 30 Dec 2007 16:46:05 -0600 Subject: r26630: Split up big tests into various smaller functions, making it easier to debug. (This used to be commit 4be116133724ac52f9df8adb3feeb93ea616a990) --- source4/dsdb/samdb/ldb_modules/tests/samba3sam.py | 368 +++++++++++----------- 1 file changed, 185 insertions(+), 183 deletions(-) (limited to 'source4/dsdb/samdb/ldb_modules/tests/samba3sam.py') diff --git a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py index b083b68da6..16135c1681 100644 --- a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py +++ b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py @@ -20,40 +20,96 @@ # along with this program. If not, see . # +"""Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP.""" + import os import sys import samba import ldb +from ldb import SCOPE_DEFAULT, SCOPE_BASE from samba import Ldb, substitute_var from samba.tests import LdbTestCase, TestCaseInTempDir datadir = os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3") -class Samba3SamTestCase(TestCaseInTempDir): +class MapBaseTestCase(TestCaseInTempDir): def setup_data(self, obj, ldif): self.assertTrue(ldif is not None) obj.db.add_ldif(substitute_var(ldif, obj.substvars)) def setup_modules(self, ldb, s3, s4): + ldb.add({"dn": "@MAP=samba3sam", + "@FROM": s4.basedn, + "@TO": "sambaDomainName=TESTS," + s3.basedn}) - ldif = """ -dn: @MAP=samba3sam -@FROM: """ + s4.basedn + """ -@TO: sambaDomainName=TESTS,""" + s3.basedn + """ - -dn: @MODULES -@LIST: rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition - -dn: @PARTITION -partition: """ + s4.basedn + ":" + s4.url + """ -partition: """ + s3.basedn + ":" + s3.url + """ -replicateEntries: @SUBCLASSES -replicateEntries: @ATTRIBUTES -replicateEntries: @INDEXLIST -""" - ldb.add_ldif(ldif) + ldb.add({"dn": "@MODULES", + "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"}) + + ldb.add({"dn": "@PARTITION", + "partition": [s4.basedn + ":" + s4.url, s3.basedn + ":" + s3.url], + "replicateEntries": ["@SUBCLASSES", "@ATTRIBUTES", "@INDEXLIST"]}) + + def setUp(self): + super(MapBaseTestCase, self).setUp() + + def make_dn(basedn, rdn): + return rdn + ",sambaDomainName=TESTS," + basedn + + def make_s4dn(basedn, rdn): + return rdn + "," + basedn + + self.ldbfile = os.path.join(self.tempdir, "test.ldb") + self.ldburl = "tdb://" + self.ldbfile - def _test_s3sam_search(self, ldb): + tempdir = self.tempdir + print tempdir + + class Target: + """Simple helper class that contains data for a specific SAM connection.""" + def __init__(self, file, basedn, dn): + self.file = os.path.join(tempdir, file) + self.url = "tdb://" + self.file + self.basedn = basedn + self.substvars = {"BASEDN": self.basedn} + self.db = Ldb() + self._dn = dn + + def dn(self, rdn): + return self._dn(rdn, self.basedn) + + def connect(self): + return self.db.connect(self.url) + + self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn) + self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn) + self.templates = Target("templates.ldb", "cn=templates", None) + + self.samba3.connect() + self.templates.connect() + self.samba4.connect() + + def tearDown(self): + os.unlink(self.ldbfile) + os.unlink(self.samba3.file) + os.unlink(self.templates.file) + os.unlink(self.samba4.file) + super(MapBaseTestCase, self).tearDown() + + +class Samba3SamTestCase(MapBaseTestCase): + def setUp(self): + super(Samba3SamTestCase, self).setUp() + ldb = Ldb(self.ldburl) + self.setup_data(self.samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read()) + self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) + ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read() + ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) + self.setup_modules(ldb, self.samba3, self.samba4) + + self.ldb = Ldb(self.ldburl) + + def test_s3sam_search(self): + ldb = self.ldb print "Looking up by non-mapped attribute" msg = ldb.search(expression="(cn=Administrator)") self.assertEquals(len(msg), 1) @@ -89,15 +145,16 @@ replicateEntries: @INDEXLIST (str(msg[i].dn) == "unixName=nobody,ou=Users,dc=vernstok,dc=nl")) - def _test_s3sam_modify(ldb, s3): + def test_s3sam_modify(self): + ldb = self.ldb + s3 = self.samba3 print "Adding a record that will be fallbacked" - ldb.add_ldif(""" -dn: cn=Foo -foo: bar -blah: Blie -cn: Foo -showInAdvancedViewOnly: TRUE - """) + ldb.add({"dn": "cn=Foo", + "foo": "bar", + "blah": "Blie", + "cn": "Foo", + "showInAdvancedViewOnly": "TRUE"} + ) print "Checking for existence of record (local)" # TODO: This record must be searched in the local database, which is currently only supported for base searches @@ -105,21 +162,18 @@ showInAdvancedViewOnly: TRUE # TODO: Actually, this version should work as well but doesn't... # # - attrs = ['foo','blah','cn','showInAdvancedViewOnly'] - msg = ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=ldb.LDB_SCOPE_BASE, attrs=attrs) + msg = ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=LDB_SCOPE_BASE, attrs=['foo','blah','cn','showInAdvancedViewOnly']) self.assertEquals(len(msg), 1) self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE") self.assertEquals(msg[0]["foo"], "bar") self.assertEquals(msg[0]["blah"], "Blie") print "Adding record that will be mapped" - ldb.add_ldif(""" -dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl -objectClass: user -unixName: bin -sambaUnicodePwd: geheim -cn: Niemand -""") + ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl", + "objectClass": "user", + "unixName": "bin", + "sambaUnicodePwd": "geheim", + "cn": "Niemand"}) print "Checking for existence of record (remote)" msg = ldb.search(expression="(unixName=bin)", attrs=['unixName','cn','dn', 'sambaUnicodePwd']) @@ -203,7 +257,22 @@ delete: description msg = ldb.search(expression="(cn=Niemand2)") self.assertEquals(len(msg), 0) - def _test_map_search(self, ldb, s3, s4): + + +class MapTestCase(MapBaseTestCase): + def setUp(self): + super(MapTestCase, self).setUp() + ldb = Ldb(self.ldburl) + self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) + ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read() + ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) + self.setup_modules(ldb, self.samba3, self.samba4) + self.ldb = Ldb(self.ldburl) + + def test_map_search(self): + s3 = self.samba3 + ldb = self.ldb + s4 = self.samba4 print "Running search tests on mapped data" ldif = """ dn: """ + "sambaDomainName=TESTS,""" + s3.basedn + """ @@ -287,7 +356,7 @@ description: y # Search remote record by local DN dn = s4.dn("cn=A") attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) self.assertEquals(res[0]["dnsHostName"], undefined) @@ -296,7 +365,7 @@ description: y # Search remote record by remote DN dn = s3.dn("cn=A") attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"] - res = s3.db.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = s3.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) self.assertEquals(res[0]["dnsHostName"], undefined) @@ -306,7 +375,7 @@ description: y # Search split record by local DN dn = s4.dn("cn=X") attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) self.assertEquals(res[0]["dnsHostName"], "x") @@ -315,7 +384,7 @@ description: y # Search split record by remote DN dn = s3.dn("cn=X") attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"] - res = s3.db.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = s3.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) self.assertEquals(res[0]["dnsHostName"], undefined) @@ -326,7 +395,7 @@ description: y # Search by ignored attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(revision=x)", scope=ldb.SCOPE_DEFAULT, attrs=attrs) + res = ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 2) self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") @@ -337,7 +406,7 @@ description: y # Search by kept attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(description=y)", scope=ldb.SCOPE_DEFAULT, attrs=attrs) + res = ldb.search(expression="(description=y)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 2) self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Z")) self.assertEquals(res[0]["dnsHostName"], "z") @@ -348,7 +417,7 @@ description: y # Search by renamed attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(badPwdCount=x)", scope=ldb.SCOPE_DEFAULT, attrs=attrs) + res = ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 2) self.assertEquals(str(res[0].dn), s4.dn("cn=B")) self.assertEquals(res[0]["dnsHostName"], undefined) @@ -673,34 +742,32 @@ description: y for dn in dns: ldb.delete(dn) - def _test_map_modify(self, ldb, s3, s4): - print "Running modification tests on mapped data" - - print "Testing modification of local records" + def test_map_modify_local(self): + """Modification of local records.""" + s3 = self.samba3 + ldb = self.ldb + s4 = self.samba4 # Add local record dn = "cn=test,dc=idealx,dc=org" - ldif = """ -dn: """ + dn + """ -cn: test -foo: bar -revision: 1 -description: test -""" - ldb.add_ldif(ldif) + ldb.add({"dn": dn, + "cn": "test", + "foo": "bar", + "revision": "1", + "description": "test"}) # Check it's there attrs = ["foo", "revision", "description"] - res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["foo"], "bar") self.assertEquals(res[0]["revision"], "1") self.assertEquals(res[0]["description"], "test") # Check it's not in the local db - res = s4.db.search("(cn=test)", NULL, ldb.SCOPE_DEFAULT, attrs) + res = s4.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 0) # Check it's not in the remote db - res = s3.db.search("(cn=test)", NULL, ldb.SCOPE_DEFAULT, attrs) + res = s3.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 0) # Modify local record @@ -713,7 +780,7 @@ description: foo """ ldb.modify_ldif(ldif) # Check in local db - res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["foo"], "baz") @@ -724,7 +791,7 @@ description: foo dn2 = "cn=toast,dc=idealx,dc=org" ldb.rename(dn, dn2) # Check in local db - res = ldb.search(dn2, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["foo"], "baz") @@ -734,25 +801,26 @@ description: foo # Delete local record ldb.delete(dn2) # Check it's gone - res = ldb.search(dn2, scope=ldb.SCOPE_BASE) + res = ldb.search(dn2, scope=SCOPE_BASE) self.assertEquals(len(res), 0) - print "Testing modification of remote records" + def test_map_modify_remote_remote(self): + """Modification of remote data of remote records""" + s3 = self.samba3 + ldb = self.ldb + s4 = self.samba4 # Add remote record dn = s4.dn("cn=test") dn2 = s3.dn("cn=test") - ldif = """ -dn: """ + dn2 + """ -cn: test -description: foo -sambaBadPasswordCount: 3 -sambaNextRid: 1001 -""" - s3.db.add_ldif(ldif) + s3.db.add({"dn": dn2, + "cn": "test", + "description": "foo", + "sambaBadPasswordCount": "3", + "sambaNextRid": "1001"}) # Check it's there attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] - res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + res = s3.db.search("", dn2, SCOPE_BASE, attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "foo") @@ -760,14 +828,14 @@ sambaNextRid: 1001 self.assertEquals(res[0]["sambaNextRid"], "1001") # Check in mapped db attrs = ["description", "badPwdCount", "nextRid"] - res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "foo") self.assertEquals(res[0]["badPwdCount"], "3") self.assertEquals(res[0]["nextRid"], "1001") # Check in local db - res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs) + res = s4.db.search("", dn, SCOPE_BASE, attrs) self.assertEquals(len(res), 0) # Modify remote data of remote record @@ -781,7 +849,7 @@ badPwdCount: 4 ldb.modify_ldif(ldif) # Check in mapped db attrs = ["description", "badPwdCount", "nextRid"] - res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") @@ -789,7 +857,7 @@ badPwdCount: 4 self.assertEquals(res[0]["nextRid"], "1001") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] - res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + res = s3.db.search("", dn2, SCOPE_BASE, attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") @@ -802,7 +870,7 @@ badPwdCount: 4 # Check in mapped db dn = dn2 attrs = ["description", "badPwdCount", "nextRid"] - res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") @@ -811,7 +879,7 @@ badPwdCount: 4 # Check in remote db dn2 = s3.dn("cn=toast") attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] - res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + res = s3.db.search("", dn2, SCOPE_BASE, attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") @@ -821,23 +889,26 @@ badPwdCount: 4 # Delete remote record ldb.delete(dn) # Check in mapped db - res = ldb.search(dn, scope=ldb.SCOPE_BASE) + res = ldb.search(dn, scope=SCOPE_BASE) self.assertEquals(len(res), 0) # Check in remote db - res = s3.db.search("", dn2, ldb.SCOPE_BASE) + res = s3.db.search("", dn2, SCOPE_BASE) self.assertEquals(len(res), 0) + def test_map_modify_remote_local(self): + """Modification of local data of remote records""" + s3 = self.samba3 + ldb = self.ldb + s4 = self.samba4 + # Add remote record (same as before) dn = s4.dn("cn=test") dn2 = s3.dn("cn=test") - ldif = """ -dn: """ + dn2 + """ -cn: test -description: foo -sambaBadPasswordCount: 3 -sambaNextRid: 1001 -""" - s3.db.add_ldif(ldif) + s3.db.add({"dn": dn2, + "cn": "test", + "description": "foo", + "sambaBadPasswordCount": "3", + "sambaNextRid": "1001"}) # Modify local data of remote record ldif = """ @@ -850,19 +921,19 @@ description: test ldb.modify_ldif(ldif) # Check in mapped db attrs = ["revision", "description"] - res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["revision"], "1") # Check in remote db - res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + res = s3.db.search("", dn2, SCOPE_BASE, attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["revision"], undefined) # Check in local db - res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs) + res = s4.db.search("", dn, SCOPE_BASE, attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], undefined) @@ -871,23 +942,25 @@ description: test # Delete (newly) split record ldb.delete(dn) - print "Testing modification of split records" + def test_map_modify_split(self): + """Testing modification of split records""" + s3 = self.samba3 + ldb = self.ldb + s4 = self.samba4 # Add split record dn = s4.dn("cn=test") dn2 = s3.dn("cn=test") - ldif = """ -dn: """ + dn + """ -cn: test -description: foo -badPwdCount: 3 -nextRid: 1001 -revision: 1 -""" - ldb.add_ldif(ldif) + ldb.add({ + "dn": dn, + "cn": "test", + "description": "foo", + "badPwdCount": "3", + "nextRid": "1001", + "revision": "1"}) # Check it's there attrs = ["description", "badPwdCount", "nextRid", "revision"] - res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "foo") @@ -895,7 +968,7 @@ revision: 1 self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "1") # Check in local db - res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs) + res = s4.db.search("", dn, SCOPE_BASE, attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], undefined) @@ -904,7 +977,7 @@ revision: 1 self.assertEquals(res[0]["revision"], "1") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] - res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + res = s3.db.search("", dn2, SCOPE_BASE, attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "foo") @@ -925,7 +998,7 @@ revision: 2 ldb.modify_ldif(ldif) # Check in mapped db attrs = ["description", "badPwdCount", "nextRid", "revision"] - res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") @@ -933,7 +1006,7 @@ revision: 2 self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "2") # Check in local db - res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs) + res = s4.db.search("", dn, SCOPE_BASE, attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], undefined) @@ -942,7 +1015,7 @@ revision: 2 self.assertEquals(res[0]["revision"], "2") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] - res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + res = s3.db.search("", dn2, SCOPE_BASE, attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") @@ -956,7 +1029,7 @@ revision: 2 # Check in mapped db dn = dn2 attrs = ["description", "badPwdCount", "nextRid", "revision"] - res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs) + res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") @@ -964,7 +1037,7 @@ revision: 2 self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "2") # Check in local db - res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs) + res = s4.db.search("", dn, SCOPE_BASE, attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], undefined) @@ -974,7 +1047,7 @@ revision: 2 # Check in remote db dn2 = s3.dn("cn=toast") attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] - res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs) + res = s3.db.search("", dn2, SCOPE_BASE, attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") @@ -985,82 +1058,11 @@ revision: 2 # Delete split record ldb.delete(dn) # Check in mapped db - res = ldb.search(dn, scope=ldb.SCOPE_BASE) + res = ldb.search(dn, scope=SCOPE_BASE) self.assertEquals(len(res), 0) # Check in local db - res = s4.db.search("", dn, ldb.SCOPE_BASE) + res = s4.db.search("", dn, SCOPE_BASE) self.assertEquals(len(res), 0) # Check in remote db - res = s3.db.search("", dn2, ldb.SCOPE_BASE) + res = s3.db.search("", dn2, SCOPE_BASE) self.assertEquals(len(res), 0) - - def setUp(self): - super(Samba3SamTestCase, self).setUp() - - def make_dn(basedn, rdn): - return rdn + ",sambaDomainName=TESTS," + basedn - - def make_s4dn(basedn, rdn): - return rdn + "," + basedn - - self.ldbfile = os.path.join(self.tempdir, "test.ldb") - self.ldburl = "tdb://" + self.ldbfile - - tempdir = self.tempdir - print tempdir - - class Target: - """Simple helper class that contains data for a specific SAM connection.""" - def __init__(self, file, basedn, dn): - self.file = os.path.join(tempdir, file) - self.url = "tdb://" + self.file - self.basedn = basedn - self.substvars = {"BASEDN": self.basedn} - self.db = Ldb() - self._dn = dn - - def dn(self, rdn): - return self._dn(rdn, self.basedn) - - def connect(self): - return self.db.connect(self.url) - - self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn) - self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn) - self.templates = Target("templates.ldb", "cn=templates", None) - - self.samba3.connect() - self.templates.connect() - self.samba4.connect() - - def tearDown(self): - os.unlink(self.ldbfile) - os.unlink(self.samba3.file) - os.unlink(self.templates.file) - os.unlink(self.samba4.file) - super(Samba3SamTestCase, self).tearDown() - - def test_s3sam(self): - ldb = Ldb(self.ldburl) - self.setup_data(self.samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read()) - self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) - ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read() - ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) - self.setup_modules(ldb, self.samba3, self.samba4) - - ldb = Ldb(self.ldburl) - - self._test_s3sam_search(ldb) - self._test_s3sam_modify(ldb, self.samba3) - - def test_map(self): - ldb = Ldb(self.ldburl) - self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) - ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read() - ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) - self.setup_modules(ldb, self.samba3, self.samba4) - - ldb = Ldb(self.ldburl) - self._test_map_search(ldb, self.samba3, self.samba4) - self._test_map_modify(ldb, self.samba3, self.samba4) - -- cgit From cf80a01591d57d346e42a0a0f9d662cc24ddff51 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Tue, 1 Jan 2008 22:04:57 -0600 Subject: r26637: More work converting to Python. (This used to be commit 84f1e82d8fe5ecca75e2d7048d1b8b409abcb9b7) --- source4/dsdb/samdb/ldb_modules/tests/samba3sam.py | 32 +++++++++++------------ 1 file changed, 16 insertions(+), 16 deletions(-) (limited to 'source4/dsdb/samdb/ldb_modules/tests/samba3sam.py') diff --git a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py index 16135c1681..86b94fb8ec 100644 --- a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py +++ b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py @@ -162,7 +162,7 @@ class Samba3SamTestCase(MapBaseTestCase): # TODO: Actually, this version should work as well but doesn't... # # - msg = ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=LDB_SCOPE_BASE, attrs=['foo','blah','cn','showInAdvancedViewOnly']) + msg = ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=SCOPE_BASE, attrs=['foo','blah','cn','showInAdvancedViewOnly']) self.assertEquals(len(msg), 1) self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE") self.assertEquals(msg[0]["foo"], "bar") @@ -820,7 +820,7 @@ description: foo "sambaNextRid": "1001"}) # Check it's there attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] - res = s3.db.search("", dn2, SCOPE_BASE, attrs) + res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "foo") @@ -835,7 +835,7 @@ description: foo self.assertEquals(res[0]["badPwdCount"], "3") self.assertEquals(res[0]["nextRid"], "1001") # Check in local db - res = s4.db.search("", dn, SCOPE_BASE, attrs) + res = s4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 0) # Modify remote data of remote record @@ -857,7 +857,7 @@ badPwdCount: 4 self.assertEquals(res[0]["nextRid"], "1001") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] - res = s3.db.search("", dn2, SCOPE_BASE, attrs) + res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") @@ -879,7 +879,7 @@ badPwdCount: 4 # Check in remote db dn2 = s3.dn("cn=toast") attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] - res = s3.db.search("", dn2, SCOPE_BASE, attrs) + res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") @@ -892,7 +892,7 @@ badPwdCount: 4 res = ldb.search(dn, scope=SCOPE_BASE) self.assertEquals(len(res), 0) # Check in remote db - res = s3.db.search("", dn2, SCOPE_BASE) + res = s3.db.search(dn2, scope=SCOPE_BASE) self.assertEquals(len(res), 0) def test_map_modify_remote_local(self): @@ -927,13 +927,13 @@ description: test self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["revision"], "1") # Check in remote db - res = s3.db.search("", dn2, SCOPE_BASE, attrs) + res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["revision"], undefined) # Check in local db - res = s4.db.search("", dn, SCOPE_BASE, attrs) + res = s4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], undefined) @@ -968,7 +968,7 @@ description: test self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "1") # Check in local db - res = s4.db.search("", dn, SCOPE_BASE, attrs) + res = s4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], undefined) @@ -977,7 +977,7 @@ description: test self.assertEquals(res[0]["revision"], "1") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] - res = s3.db.search("", dn2, SCOPE_BASE, attrs) + res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "foo") @@ -1006,7 +1006,7 @@ revision: 2 self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "2") # Check in local db - res = s4.db.search("", dn, SCOPE_BASE, attrs) + res = s4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], undefined) @@ -1015,7 +1015,7 @@ revision: 2 self.assertEquals(res[0]["revision"], "2") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] - res = s3.db.search("", dn2, SCOPE_BASE, attrs) + res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") @@ -1037,7 +1037,7 @@ revision: 2 self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "2") # Check in local db - res = s4.db.search("", dn, SCOPE_BASE, attrs) + res = s4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], undefined) @@ -1047,7 +1047,7 @@ revision: 2 # Check in remote db dn2 = s3.dn("cn=toast") attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] - res = s3.db.search("", dn2, SCOPE_BASE, attrs) + res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") @@ -1061,8 +1061,8 @@ revision: 2 res = ldb.search(dn, scope=SCOPE_BASE) self.assertEquals(len(res), 0) # Check in local db - res = s4.db.search("", dn, SCOPE_BASE) + res = s4.db.search(dn, scope=SCOPE_BASE) self.assertEquals(len(res), 0) # Check in remote db - res = s3.db.search("", dn2, SCOPE_BASE) + res = s3.db.search(dn2, scope=SCOPE_BASE) self.assertEquals(len(res), 0) -- cgit From 5e03b921825ffe177bf9d00ed1e12de02728da75 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Wed, 2 Jan 2008 01:52:31 -0600 Subject: r26642: samba3sam.py: Remove more EJS-specific code. (This used to be commit 7d14b657b3d59924b15f4f84bbd5745cd7f759ef) --- source4/dsdb/samdb/ldb_modules/tests/samba3sam.py | 500 +++++++++++----------- 1 file changed, 239 insertions(+), 261 deletions(-) (limited to 'source4/dsdb/samdb/ldb_modules/tests/samba3sam.py') diff --git a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py index 86b94fb8ec..7c408d0436 100644 --- a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py +++ b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py @@ -26,7 +26,7 @@ import os import sys import samba import ldb -from ldb import SCOPE_DEFAULT, SCOPE_BASE +from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE from samba import Ldb, substitute_var from samba.tests import LdbTestCase, TestCaseInTempDir @@ -105,27 +105,25 @@ class Samba3SamTestCase(MapBaseTestCase): ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read() ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) self.setup_modules(ldb, self.samba3, self.samba4) - self.ldb = Ldb(self.ldburl) def test_s3sam_search(self): - ldb = self.ldb print "Looking up by non-mapped attribute" - msg = ldb.search(expression="(cn=Administrator)") + msg = self.ldb.search(expression="(cn=Administrator)") self.assertEquals(len(msg), 1) self.assertEquals(msg[0]["cn"], "Administrator") print "Looking up by mapped attribute" - msg = ldb.search(expression="(name=Backup Operators)") + msg = self.ldb.search(expression="(name=Backup Operators)") self.assertEquals(len(msg), 1) self.assertEquals(msg[0]["name"], "Backup Operators") print "Looking up by old name of renamed attribute" - msg = ldb.search(expression="(displayName=Backup Operators)") + msg = self.ldb.search(expression="(displayName=Backup Operators)") self.assertEquals(len(msg), 0) print "Looking up mapped entry containing SID" - msg = ldb.search(expression="(cn=Replicator)") + msg = self.ldb.search(expression="(cn=Replicator)") self.assertEquals(len(msg), 1) print msg[0].dn self.assertEquals(str(msg[0].dn), "cn=Replicator,ou=Groups,dc=vernstok,dc=nl") @@ -138,7 +136,7 @@ class Samba3SamTestCase(MapBaseTestCase): self.assertEquals(oc[i] == "posixGroup" or oc[i], "group") print "Looking up by objectClass" - msg = ldb.search(expression="(|(objectClass=user)(cn=Administrator))") + msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))") self.assertEquals(len(msg), 2) for i in range(len(msg)): self.assertEquals((str(msg[i].dn), "unixName=Administrator,ou=Users,dc=vernstok,dc=nl") or @@ -146,10 +144,8 @@ class Samba3SamTestCase(MapBaseTestCase): def test_s3sam_modify(self): - ldb = self.ldb - s3 = self.samba3 print "Adding a record that will be fallbacked" - ldb.add({"dn": "cn=Foo", + self.ldb.add({"dn": "cn=Foo", "foo": "bar", "blah": "Blie", "cn": "Foo", @@ -162,27 +158,28 @@ class Samba3SamTestCase(MapBaseTestCase): # TODO: Actually, this version should work as well but doesn't... # # - msg = ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=SCOPE_BASE, attrs=['foo','blah','cn','showInAdvancedViewOnly']) + msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=SCOPE_BASE, attrs=['foo','blah','cn','showInAdvancedViewOnly']) self.assertEquals(len(msg), 1) self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE") self.assertEquals(msg[0]["foo"], "bar") self.assertEquals(msg[0]["blah"], "Blie") print "Adding record that will be mapped" - ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl", + self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl", "objectClass": "user", "unixName": "bin", "sambaUnicodePwd": "geheim", "cn": "Niemand"}) print "Checking for existence of record (remote)" - msg = ldb.search(expression="(unixName=bin)", attrs=['unixName','cn','dn', 'sambaUnicodePwd']) + msg = self.ldb.search(expression="(unixName=bin)", + attrs=['unixName','cn','dn', 'sambaUnicodePwd']) self.assertEquals(len(msg), 1) self.assertEquals(msg[0]["cn"], "Niemand") self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim") print "Checking for existence of record (local && remote)" - msg = ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))", + msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))", attrs=['unixName','cn','dn', 'sambaUnicodePwd']) self.assertEquals(len(msg), 1) # TODO: should check with more records self.assertEquals(msg[0]["cn"], "Niemand") @@ -190,7 +187,7 @@ class Samba3SamTestCase(MapBaseTestCase): self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim") print "Checking for existence of record (local || remote)" - msg = ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))", + msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))", attrs=['unixName','cn','dn', 'sambaUnicodePwd']) print "got " + len(msg) + " replies" self.assertEquals(len(msg), 1) # TODO: should check with more records @@ -204,7 +201,7 @@ class Samba3SamTestCase(MapBaseTestCase): self.assertEquals(msg[0]["displayName"], "Niemand") print "Adding attribute..." - ldb.modify_ldif(""" + self.ldb.modify_ldif(""" dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl changetype: modify add: description @@ -212,13 +209,13 @@ description: Blah """) print "Checking whether changes are still there..." - msg = ldb.search(expression="(cn=Niemand)") + msg = self.ldb.search(expression="(cn=Niemand)") self.assertTrue(len(msg) >= 1) self.assertEquals(msg[0]["cn"], "Niemand") self.assertEquals(msg[0]["description"], "Blah") print "Modifying attribute..." - ldb.modify_ldif(""" + self.ldb.modify_ldif(""" dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl changetype: modify replace: description @@ -226,35 +223,35 @@ description: Blie """) print "Checking whether changes are still there..." - msg = ldb.search(expression="(cn=Niemand)") + msg = self.ldb.search(expression="(cn=Niemand)") self.assertTrue(len(msg) >= 1) self.assertEquals(msg[0]["description"], "Blie") print "Deleting attribute..." - ldb.modify_ldif(""" + self.ldb.modify_ldif(""" dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl changetype: modify delete: description """) print "Checking whether changes are no longer there..." - msg = ldb.search(expression="(cn=Niemand)") + msg = self.ldb.search(expression="(cn=Niemand)") self.assertTrue(len(msg) >= 1) - self.assertEquals(msg[0]["description"], undefined) + self.assertTrue(not "description" in res[0]) print "Renaming record..." - ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", "cn=Niemand2,cn=Users,dc=vernstok,dc=nl") + self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", "cn=Niemand2,cn=Users,dc=vernstok,dc=nl") print "Checking whether DN has changed..." - msg = ldb.search(expression="(cn=Niemand2)") + msg = self.ldb.search(expression="(cn=Niemand2)") self.assertEquals(len(msg), 1) self.assertEquals(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl") print "Deleting record..." - ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl") + self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl") print "Checking whether record is gone..." - msg = ldb.search(expression="(cn=Niemand2)") + msg = self.ldb.search(expression="(cn=Niemand2)") self.assertEquals(len(msg), 0) @@ -270,22 +267,19 @@ class MapTestCase(MapBaseTestCase): self.ldb = Ldb(self.ldburl) def test_map_search(self): - s3 = self.samba3 - ldb = self.ldb - s4 = self.samba4 print "Running search tests on mapped data" ldif = """ -dn: """ + "sambaDomainName=TESTS,""" + s3.basedn + """ +dn: """ + "sambaDomainName=TESTS,""" + self.samba3.basedn + """ objectclass: sambaDomain objectclass: top sambaSID: S-1-5-21-4231626423-2410014848-2360679739 sambaNextRid: 2000 sambaDomainName: TESTS""" - s3.db.add_ldif(substitute_var(ldif, s3.substvars)) + self.samba3.db.add_ldif(substitute_var(ldif, self.samba3.substvars)) print "Add a set of split records" ldif = """ -dn: """ + s4.dn("cn=X") + """ +dn: """ + self.samba4.dn("cn=X") + """ objectClass: user cn: X codePage: x @@ -297,7 +291,7 @@ description: x objectSid: S-1-5-21-4231626423-2410014848-2360679739-552 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512 -dn: """ + s4.dn("cn=Y") + """ +dn: """ + self.samba4.dn("cn=Y") + """ objectClass: top cn: Y codePage: x @@ -307,7 +301,7 @@ nextRid: y lastLogon: y description: x -dn: """ + s4.dn("cn=Z") + """ +dn: """ + self.samba4.dn("cn=Z") + """ objectClass: top cn: Z codePage: x @@ -318,12 +312,12 @@ lastLogon: z description: y """ - ldb.add_ldif(substitute_var(ldif, s4.substvars)) + self.ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) print "Add a set of remote records" ldif = """ -dn: """ + s3.dn("cn=A") + """ +dn: """ + self.samba3.dn("cn=A") + """ objectClass: posixAccount cn: A sambaNextRid: x @@ -333,7 +327,7 @@ description: x sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552 sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512 -dn: """ + s3.dn("cn=B") + """ +dn: """ + self.samba3.dn("cn=B") + """ objectClass: top cn:B sambaNextRid: x @@ -341,7 +335,7 @@ sambaBadPasswordCount: x sambaLogonTime: y description: x -dn: """ + s3.dn("cn=C") + """ +dn: """ + self.samba3.dn("cn=C") + """ objectClass: top cn: C sambaNextRid: x @@ -349,81 +343,81 @@ sambaBadPasswordCount: y sambaLogonTime: z description: y """ - s3.add_ldif(substitute_var(ldif, s3.substvars)) + self.samba3.add_ldif(substitute_var(ldif, self.samba3.substvars)) print "Testing search by DN" # Search remote record by local DN - dn = s4.dn("cn=A") + dn = self.samba4.dn("cn=A") attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) - self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "x") # Search remote record by remote DN - dn = s3.dn("cn=A") + dn = self.samba3.dn("cn=A") attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"] - res = s3.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.samba3.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) - self.assertEquals(res[0]["dnsHostName"], undefined) - self.assertEquals(res[0]["lastLogon"], undefined) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertTrue(not "lastLogon" in res[0]) self.assertEquals(res[0]["sambaLogonTime"], "x") # Search split record by local DN - dn = s4.dn("cn=X") + dn = self.samba4.dn("cn=X") attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) self.assertEquals(res[0]["dnsHostName"], "x") self.assertEquals(res[0]["lastLogon"], "x") # Search split record by remote DN - dn = s3.dn("cn=X") + dn = self.samba3.dn("cn=X") attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"] - res = s3.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.samba3.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) - self.assertEquals(res[0]["dnsHostName"], undefined) - self.assertEquals(res[0]["lastLogon"], undefined) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertTrue(not "lastLogon" in res[0]) self.assertEquals(res[0]["sambaLogonTime"], "x") print "Testing search by attribute" # Search by ignored attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, attrs=attrs) + res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 2) - self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Y")) + self.assertEquals(str(str(res[0].dn)), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(str(res[1].dn)), s4.dn("cn=X")) + self.assertEquals(str(str(res[1].dn)), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") # Search by kept attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(description=y)", scope=SCOPE_DEFAULT, attrs=attrs) + res = self.ldb.search(expression="(description=y)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 2) - self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Z")) + self.assertEquals(str(str(res[0].dn)), self.samba4.dn("cn=Z")) self.assertEquals(res[0]["dnsHostName"], "z") self.assertEquals(res[0]["lastLogon"], "z") - self.assertEquals(str(str(res[1].dn)), s4.dn("cn=C")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(str(res[1].dn)), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "z") # Search by renamed attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT, attrs=attrs) + res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 2) - self.assertEquals(str(res[0].dn), s4.dn("cn=B")) - self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=A")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") # Search by converted attribute @@ -431,15 +425,15 @@ description: y # TODO: # Using the SID directly in the parse tree leads to conversion # errors, letting the search fail with no results. - #res = ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", NULL, ldb. SCOPE_DEFAULT, attrs) - res = ldb.search(expression="(objectSid=*)", attrs=attrs) + #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs) + res = self.ldb.search(expression="(objectSid=*)", attrs=attrs) self.assertEquals(len(res), 3) - self.assertEquals(str(res[0].dn), s4.dn("cn=X")) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X")) self.assertEquals(res[0]["dnsHostName"], "x") self.assertEquals(res[0]["lastLogon"], "x") self.assertEquals(res[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552") - self.assertEquals(str(res[1].dn), s4.dn("cn=A")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") self.assertEquals(res[1]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552") @@ -447,10 +441,10 @@ description: y # In most cases, this even works when the mapping is missing # a `convert_operator' by enumerating the remote db. attrs = ["dnsHostName", "lastLogon", "primaryGroupID"] - res = ldb.search(expression="(primaryGroupID=512)", attrs=attrs) + res = self.ldb.search(expression="(primaryGroupID=512)", attrs=attrs) self.assertEquals(len(res), 1) - self.assertEquals(str(res[0].dn), s4.dn("cn=A")) - self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "x") self.assertEquals(res[0]["primaryGroupID"], "512") @@ -470,40 +464,40 @@ description: y # Search by remote name of renamed attribute */ attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(sambaBadPasswordCount=*)", attrs=attrs) + res = self.ldb.search(expression="(sambaBadPasswordCount=*)", attrs=attrs) self.assertEquals(len(res), 0) # Search by objectClass attrs = ["dnsHostName", "lastLogon", "objectClass"] - res = ldb.search(expression="(objectClass=user)", attrs=attrs) + res = self.ldb.search(expression="(objectClass=user)", attrs=attrs) self.assertEquals(len(res), 2) - self.assertEquals(str(res[0].dn), s4.dn("cn=X")) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X")) self.assertEquals(res[0]["dnsHostName"], "x") self.assertEquals(res[0]["lastLogon"], "x") self.assertTrue(res[0]["objectClass"] is not None) self.assertEquals(res[0]["objectClass"][0], "user") - self.assertEquals(str(res[1].dn), s4.dn("cn=A")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") self.assertTrue(res[1]["objectClass"] is not None) self.assertEquals(res[1]["objectClass"][0], "user") # Prove that the objectClass is actually used for the search - res = ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs) + res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs) self.assertEquals(len(res), 3) - self.assertEquals(str(res[0].dn), s4.dn("cn=B")) - self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") self.assertTrue(res[0]["objectClass"] is not None) for oc in set(res[0]["objectClass"]): self.assertEquals(oc, "user") - self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") self.assertTrue(res[1]["objectClass"] is not None) self.assertEquals(res[1]["objectClass"][0], "user") - self.assertEquals(str(res[2].dn), s4.dn("cn=A")) - self.assertEquals(res[2]["dnsHostName"], undefined) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[2]) self.assertEquals(res[2]["lastLogon"], "x") self.assertTrue(res[2]["objectClass"] is not None) self.assertEquals(res[2]["objectClass"][0], "user") @@ -512,262 +506,258 @@ description: y # Search by conjunction of local attributes attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(&(codePage=x)(revision=x))", attrs=attrs) + res = self.ldb.search(expression="(&(codePage=x)(revision=x))", attrs=attrs) self.assertEquals(len(res), 2) - self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") # Search by conjunction of remote attributes attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=attrs) + res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=attrs) self.assertEquals(len(res), 2) - self.assertEquals(str(res[0].dn), s4.dn("cn=X")) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X")) self.assertEquals(res[0]["dnsHostName"], "x") self.assertEquals(res[0]["lastLogon"], "x") - self.assertEquals(str(res[1].dn), s4.dn("cn=A")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") # Search by conjunction of local and remote attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(&(codePage=x)(description=x))", attrs=attrs) + res = self.ldb.search(expression="(&(codePage=x)(description=x))", attrs=attrs) self.assertEquals(len(res), 2) - self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") # Search by conjunction of local and remote attribute w/o match attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs) + res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs) self.assertEquals(len(res), 0) - res = ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs) + res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs) self.assertEquals(len(res), 0) # Search by disjunction of local attributes attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=attrs) + res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=attrs) self.assertEquals(len(res), 2) - self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") # Search by disjunction of remote attributes attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=attrs) + res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=attrs) self.assertEquals(len(res), 3) - self.assertEquals(str(res[0].dn), s4.dn("cn=B")) - self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) + self.assertTrue("dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") - self.assertEquals(str(res[2].dn), s4.dn("cn=A")) - self.assertEquals(res[2]["dnsHostName"], undefined) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A")) + self.assertTrue("dnsHostName" in res[2]) self.assertEquals(res[2]["lastLogon"], "x") # Search by disjunction of local and remote attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=attrs) + res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=attrs) self.assertEquals(len(res), 3) - self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=B")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) + self.assertTrue("dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "y") - self.assertEquals(str(res[2].dn), s4.dn("cn=X")) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X")) self.assertEquals(res[2]["dnsHostName"], "x") self.assertEquals(res[2]["lastLogon"], "x") # Search by disjunction of local and remote attribute w/o match attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=attrs) + res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=attrs) self.assertEquals(len(res), 0) # Search by negated local attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(!(revision=x))", attrs=attrs) + res = self.ldb.search(expression="(!(revision=x))", attrs=attrs) self.assertEquals(len(res), 5) - self.assertEquals(str(res[0].dn), s4.dn("cn=B")) - self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=A")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") - self.assertEquals(str(res[2].dn), s4.dn("cn=Z")) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[2]["dnsHostName"], "z") self.assertEquals(res[2]["lastLogon"], "z") - self.assertEquals(str(res[3].dn), s4.dn("cn=C")) - self.assertEquals(res[3]["dnsHostName"], undefined) + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[3]) self.assertEquals(res[3]["lastLogon"], "z") # Search by negated remote attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(!(description=x))", attrs=attrs) + res = self.ldb.search(expression="(!(description=x))", attrs=attrs) self.assertEquals(len(res), 3) - self.assertEquals(str(res[0].dn), s4.dn("cn=Z")) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[0]["dnsHostName"], "z") self.assertEquals(res[0]["lastLogon"], "z") - self.assertEquals(str(res[1].dn), s4.dn("cn=C")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "z") # Search by negated conjunction of local attributes attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=attrs) + res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=attrs) self.assertEquals(len(res), 5) - self.assertEquals(str(res[0].dn), s4.dn("cn=B")) - self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=A")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") - self.assertEquals(str(res[2].dn), s4.dn("cn=Z")) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[2]["dnsHostName"], "z") self.assertEquals(res[2]["lastLogon"], "z") - self.assertEquals(str(res[3].dn), s4.dn("cn=C")) - self.assertEquals(res[3]["dnsHostName"], undefined) + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[3]) self.assertEquals(res[3]["lastLogon"], "z") # Search by negated conjunction of remote attributes attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=attrs) + res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=attrs) self.assertEquals(len(res), 5) - self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=B")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "y") - self.assertEquals(str(res[2].dn), s4.dn("cn=Z")) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[2]["dnsHostName"], "z") self.assertEquals(res[2]["lastLogon"], "z") - self.assertEquals(str(res[3].dn), s4.dn("cn=C")) - self.assertEquals(res[3]["dnsHostName"], undefined) + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[3]) self.assertEquals(res[3]["lastLogon"], "z") # Search by negated conjunction of local and remote attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=attrs) + res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=attrs) self.assertEquals(len(res), 5) - self.assertEquals(str(res[0].dn), s4.dn("cn=B")) - self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=A")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") - self.assertEquals(str(res[2].dn), s4.dn("cn=Z")) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[2]["dnsHostName"], "z") self.assertEquals(res[2]["lastLogon"], "z") - self.assertEquals(str(res[3].dn), s4.dn("cn=C")) - self.assertEquals(res[3]["dnsHostName"], undefined) + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[3]) self.assertEquals(res[3]["lastLogon"], "z") # Search by negated disjunction of local attributes attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=attrs) - self.assertEquals(str(res[0].dn), s4.dn("cn=B")) - self.assertEquals(res[0]["dnsHostName"], undefined) + res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=attrs) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=A")) - self.assertEquals(res[1]["dnsHostName"], undefined) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") - self.assertEquals(str(res[2].dn), s4.dn("cn=Z")) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[2]["dnsHostName"], "z") self.assertEquals(res[2]["lastLogon"], "z") - self.assertEquals(str(res[3].dn), s4.dn("cn=C")) - self.assertEquals(res[3]["dnsHostName"], undefined) + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[3]) self.assertEquals(res[3]["lastLogon"], "z") # Search by negated disjunction of remote attributes attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=attrs) + res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=attrs) self.assertEquals(len(res), 4) - self.assertEquals(str(res[0].dn), s4.dn("cn=Y")) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=Z")) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[1]["dnsHostName"], "z") self.assertEquals(res[1]["lastLogon"], "z") - self.assertEquals(str(res[2].dn), s4.dn("cn=C")) - self.assertEquals(res[2]["dnsHostName"], undefined) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[2]) self.assertEquals(res[2]["lastLogon"], "z") # Search by negated disjunction of local and remote attribute attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", attrs=attrs) + res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", attrs=attrs) self.assertEquals(len(res), 4) - self.assertEquals(str(res[0].dn), s4.dn("cn=A")) - self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "x") - self.assertEquals(str(res[1].dn), s4.dn("cn=Z")) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[1]["dnsHostName"], "z") self.assertEquals(res[1]["lastLogon"], "z") - self.assertEquals(str(res[2].dn), s4.dn("cn=C")) - self.assertEquals(res[2]["dnsHostName"], undefined) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[2]) self.assertEquals(res[2]["lastLogon"], "z") print "Search by complex parse tree" attrs = ["dnsHostName", "lastLogon"] - res = ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=attrs) + res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=attrs) self.assertEquals(len(res), 6) - self.assertEquals(str(res[0].dn), s4.dn("cn=B")) - self.assertEquals(res[0]["dnsHostName"], undefined) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") - self.assertEquals(str(res[1].dn), s4.dn("cn=X")) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") - self.assertEquals(str(res[2].dn), s4.dn("cn=A")) - self.assertEquals(res[2]["dnsHostName"], undefined) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[2]) self.assertEquals(res[2]["lastLogon"], "x") - self.assertEquals(str(res[3].dn), s4.dn("cn=Z")) + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[3]["dnsHostName"], "z") self.assertEquals(res[3]["lastLogon"], "z") - self.assertEquals(str(res[4].dn), s4.dn("cn=C")) - self.assertEquals(res[4]["dnsHostName"], undefined) + self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[4]) self.assertEquals(res[4]["lastLogon"], "z") # Clean up - dns = [s4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]] + dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]] for dn in dns: - ldb.delete(dn) + self.ldb.delete(dn) def test_map_modify_local(self): """Modification of local records.""" - s3 = self.samba3 - ldb = self.ldb - s4 = self.samba4 - # Add local record dn = "cn=test,dc=idealx,dc=org" - ldb.add({"dn": dn, + self.ldb.add({"dn": dn, "cn": "test", "foo": "bar", "revision": "1", "description": "test"}) # Check it's there attrs = ["foo", "revision", "description"] - res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["foo"], "bar") self.assertEquals(res[0]["revision"], "1") self.assertEquals(res[0]["description"], "test") # Check it's not in the local db - res = s4.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs) + res = self.samba4.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 0) # Check it's not in the remote db - res = s3.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs) + res = self.samba3.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 0) # Modify local record @@ -778,9 +768,9 @@ foo: baz replace: description description: foo """ - ldb.modify_ldif(ldif) + self.ldb.modify_ldif(ldif) # Check in local db - res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["foo"], "baz") @@ -789,9 +779,9 @@ description: foo # Rename local record dn2 = "cn=toast,dc=idealx,dc=org" - ldb.rename(dn, dn2) + self.ldb.rename(dn, dn2) # Check in local db - res = ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["foo"], "baz") @@ -799,28 +789,24 @@ description: foo self.assertEquals(res[0]["description"], "foo") # Delete local record - ldb.delete(dn2) + self.ldb.delete(dn2) # Check it's gone - res = ldb.search(dn2, scope=SCOPE_BASE) + res = self.ldb.search(dn2, scope=SCOPE_BASE) self.assertEquals(len(res), 0) def test_map_modify_remote_remote(self): """Modification of remote data of remote records""" - s3 = self.samba3 - ldb = self.ldb - s4 = self.samba4 - # Add remote record - dn = s4.dn("cn=test") - dn2 = s3.dn("cn=test") - s3.db.add({"dn": dn2, + dn = self.samba4.dn("cn=test") + dn2 = self.samba3.dn("cn=test") + self.samba3.db.add({"dn": dn2, "cn": "test", "description": "foo", "sambaBadPasswordCount": "3", "sambaNextRid": "1001"}) # Check it's there attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] - res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "foo") @@ -828,14 +814,14 @@ description: foo self.assertEquals(res[0]["sambaNextRid"], "1001") # Check in mapped db attrs = ["description", "badPwdCount", "nextRid"] - res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "foo") self.assertEquals(res[0]["badPwdCount"], "3") self.assertEquals(res[0]["nextRid"], "1001") # Check in local db - res = s4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 0) # Modify remote data of remote record @@ -846,10 +832,10 @@ description: test replace: badPwdCount badPwdCount: 4 """ - ldb.modify_ldif(ldif) + self.ldb.modify_ldif(ldif) # Check in mapped db attrs = ["description", "badPwdCount", "nextRid"] - res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") @@ -857,7 +843,7 @@ badPwdCount: 4 self.assertEquals(res[0]["nextRid"], "1001") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] - res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") @@ -865,21 +851,21 @@ badPwdCount: 4 self.assertEquals(res[0]["sambaNextRid"], "1001") # Rename remote record - dn2 = s4.dn("cn=toast") - ldb.rename(dn, dn2) + dn2 = self.samba4.dn("cn=toast") + self.ldb.rename(dn, dn2) # Check in mapped db dn = dn2 attrs = ["description", "badPwdCount", "nextRid"] - res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["badPwdCount"], "4") self.assertEquals(res[0]["nextRid"], "1001") # Check in remote db - dn2 = s3.dn("cn=toast") + dn2 = self.samba3.dn("cn=toast") attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] - res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") @@ -887,24 +873,20 @@ badPwdCount: 4 self.assertEquals(res[0]["sambaNextRid"], "1001") # Delete remote record - ldb.delete(dn) + self.ldb.delete(dn) # Check in mapped db - res = ldb.search(dn, scope=SCOPE_BASE) + res = self.ldb.search(dn, scope=SCOPE_BASE) self.assertEquals(len(res), 0) # Check in remote db - res = s3.db.search(dn2, scope=SCOPE_BASE) + res = self.samba3.db.search(dn2, scope=SCOPE_BASE) self.assertEquals(len(res), 0) def test_map_modify_remote_local(self): """Modification of local data of remote records""" - s3 = self.samba3 - ldb = self.ldb - s4 = self.samba4 - # Add remote record (same as before) - dn = s4.dn("cn=test") - dn2 = s3.dn("cn=test") - s3.db.add({"dn": dn2, + dn = self.samba4.dn("cn=test") + dn2 = self.samba3.dn("cn=test") + self.samba3.db.add({"dn": dn2, "cn": "test", "description": "foo", "sambaBadPasswordCount": "3", @@ -918,40 +900,36 @@ revision: 1 replace: description description: test """ - ldb.modify_ldif(ldif) + self.ldb.modify_ldif(ldif) # Check in mapped db attrs = ["revision", "description"] - res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["revision"], "1") # Check in remote db - res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") - self.assertEquals(res[0]["revision"], undefined) + self.assertTrue(not "revision" in res[0]) # Check in local db - res = s4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) - self.assertEquals(res[0]["description"], undefined) + self.assertTrue(not "description" in res[0]) self.assertEquals(res[0]["revision"], "1") # Delete (newly) split record - ldb.delete(dn) + self.ldb.delete(dn) def test_map_modify_split(self): """Testing modification of split records""" - s3 = self.samba3 - ldb = self.ldb - s4 = self.samba4 - # Add split record - dn = s4.dn("cn=test") - dn2 = s3.dn("cn=test") - ldb.add({ + dn = self.samba4.dn("cn=test") + dn2 = self.samba3.dn("cn=test") + self.ldb.add({ "dn": dn, "cn": "test", "description": "foo", @@ -960,7 +938,7 @@ description: test "revision": "1"}) # Check it's there attrs = ["description", "badPwdCount", "nextRid", "revision"] - res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "foo") @@ -968,22 +946,22 @@ description: test self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "1") # Check in local db - res = s4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) - self.assertEquals(res[0]["description"], undefined) - self.assertEquals(res[0]["badPwdCount"], undefined) - self.assertEquals(res[0]["nextRid"], undefined) + self.assertTrue(not "description" in res[0]) + self.assertTrue(not "badPwdCount" in res[0]) + self.assertTrue(not "nextRid" in res[0]) self.assertEquals(res[0]["revision"], "1") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] - res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "foo") self.assertEquals(res[0]["sambaBadPasswordCount"], "3") self.assertEquals(res[0]["sambaNextRid"], "1001") - self.assertEquals(res[0]["revision"], undefined) + self.assertTrue(not "revision" in res[0]) # Modify of split record ldif = """ @@ -995,10 +973,10 @@ badPwdCount: 4 replace: revision revision: 2 """ - ldb.modify_ldif(ldif) + self.ldb.modify_ldif(ldif) # Check in mapped db attrs = ["description", "badPwdCount", "nextRid", "revision"] - res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") @@ -1006,30 +984,30 @@ revision: 2 self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "2") # Check in local db - res = s4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) - self.assertEquals(res[0]["description"], undefined) - self.assertEquals(res[0]["badPwdCount"], undefined) - self.assertEquals(res[0]["nextRid"], undefined) + self.assertTrue(not "description" in res[0]) + self.assertTrue(not "badPwdCount" in res[0]) + self.assertTrue(not "nextRid" in res[0]) self.assertEquals(res[0]["revision"], "2") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] - res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["sambaBadPasswordCount"], "4") self.assertEquals(res[0]["sambaNextRid"], "1001") - self.assertEquals(res[0]["revision"], undefined) + self.assertTrue(not "revision" in res[0]) # Rename split record - dn2 = s4.dn("cn=toast") - ldb.rename(dn, dn2) + dn2 = self.samba4.dn("cn=toast") + self.ldb.rename(dn, dn2) # Check in mapped db dn = dn2 attrs = ["description", "badPwdCount", "nextRid", "revision"] - res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") @@ -1037,32 +1015,32 @@ revision: 2 self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "2") # Check in local db - res = s4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) - self.assertEquals(res[0]["description"], undefined) - self.assertEquals(res[0]["badPwdCount"], undefined) - self.assertEquals(res[0]["nextRid"], undefined) + self.assertTrue(not "description" in res[0]) + self.assertTrue(not "badPwdCount" in res[0]) + self.assertTrue(not "nextRid" in res[0]) self.assertEquals(res[0]["revision"], "2") # Check in remote db - dn2 = s3.dn("cn=toast") + dn2 = self.samba3.dn("cn=toast") attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] - res = s3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["sambaBadPasswordCount"], "4") self.assertEquals(res[0]["sambaNextRid"], "1001") - self.assertEquals(res[0]["revision"], undefined) + self.assertTrue(not "revision" in res[0]) # Delete split record - ldb.delete(dn) + self.ldb.delete(dn) # Check in mapped db - res = ldb.search(dn, scope=SCOPE_BASE) + res = self.ldb.search(dn, scope=SCOPE_BASE) self.assertEquals(len(res), 0) # Check in local db - res = s4.db.search(dn, scope=SCOPE_BASE) + res = self.samba4.db.search(dn, scope=SCOPE_BASE) self.assertEquals(len(res), 0) # Check in remote db - res = s3.db.search(dn2, scope=SCOPE_BASE) + res = self.samba3.db.search(dn2, scope=SCOPE_BASE) self.assertEquals(len(res), 0) -- cgit From 5c6364ba0655316294833f192281d49a4de63b0c Mon Sep 17 00:00:00 2001 From: Andrew Bartlett Date: Mon, 18 Aug 2008 12:01:27 +1000 Subject: Remove references to the unused @SUBCLASS feature. This was removed from ldb_tdb a while ago Andrew Bartlett (This used to be commit fcb87e77860b449ac3483ccec5e6b5ed087540f2) --- source4/dsdb/samdb/ldb_modules/tests/samba3sam.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'source4/dsdb/samdb/ldb_modules/tests/samba3sam.py') diff --git a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py index 7c408d0436..428e6b4d4b 100644 --- a/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py +++ b/source4/dsdb/samdb/ldb_modules/tests/samba3sam.py @@ -47,7 +47,7 @@ class MapBaseTestCase(TestCaseInTempDir): ldb.add({"dn": "@PARTITION", "partition": [s4.basedn + ":" + s4.url, s3.basedn + ":" + s3.url], - "replicateEntries": ["@SUBCLASSES", "@ATTRIBUTES", "@INDEXLIST"]}) + "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]}) def setUp(self): super(MapBaseTestCase, self).setUp() -- cgit