#!/usr/bin/python # Unix SMB/CIFS implementation. # Copyright (C) Jelmer Vernooij 2005-2007 # Copyright (C) Martin Kuehl 2006 # # This is a Python port of the original in testprogs/ejs/samba3sam.js # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP.""" import os import sys import samba import ldb from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE from samba import Ldb, substitute_var from samba.tests import LdbTestCase, TestCaseInTempDir datadir = os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3") def ldb_debug(l, text): print text class MapBaseTestCase(TestCaseInTempDir): def setup_data(self, obj, ldif): self.assertTrue(ldif is not None) obj.db.add_ldif(substitute_var(ldif, obj.substvars)) def setup_modules(self, ldb, s3, s4): ldb.add({"dn": "@MAP=samba3sam", "@FROM": s4.basedn, "@TO": "sambaDomainName=TESTS," + s3.basedn}) ldb.add({"dn": "@MODULES", "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"}) ldb.add({"dn": "@PARTITION", "partition": [s4.basedn + ":" + s4.url, s3.basedn + ":" + s3.url], "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]}) def setUp(self): super(MapBaseTestCase, self).setUp() def make_dn(basedn, rdn): return rdn + ",sambaDomainName=TESTS," + basedn def make_s4dn(basedn, rdn): return rdn + "," + basedn self.ldbfile = os.path.join(self.tempdir, "test.ldb") self.ldburl = "tdb://" + self.ldbfile tempdir = self.tempdir class Target: """Simple helper class that contains data for a specific SAM connection.""" def __init__(self, file, basedn, dn): self.file = os.path.join(tempdir, file) self.url = "tdb://" + self.file self.basedn = basedn self.substvars = {"BASEDN": self.basedn} self.db = Ldb() self._dn = dn def dn(self, rdn): return self._dn(rdn, self.basedn) def connect(self): return self.db.connect(self.url) self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn) self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn) self.templates = Target("templates.ldb", "cn=templates", None) self.samba3.connect() self.templates.connect() self.samba4.connect() def tearDown(self): os.unlink(self.ldbfile) os.unlink(self.samba3.file) os.unlink(self.templates.file) os.unlink(self.samba4.file) super(MapBaseTestCase, self).tearDown() class Samba3SamTestCase(MapBaseTestCase): def setUp(self): super(Samba3SamTestCase, self).setUp() ldb = Ldb(self.ldburl) self.setup_data(self.samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read()) self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read() ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) self.setup_modules(ldb, self.samba3, self.samba4) self.ldb = Ldb(self.ldburl) def test_search_non_mapped(self): """Looking up by non-mapped attribute""" msg = self.ldb.search(expression="(cn=Administrator)") self.assertEquals(len(msg), 1) self.assertEquals(msg[0]["cn"], "Administrator") def test_search_non_mapped(self): """Looking up by mapped attribute""" msg = self.ldb.search(expression="(name=Backup Operators)") self.assertEquals(len(msg), 1) self.assertEquals(msg[0]["name"], "Backup Operators") def test_old_name_of_renamed(self): """Looking up by old name of renamed attribute""" msg = self.ldb.search(expression="(displayName=Backup Operators)") self.assertEquals(len(msg), 0) def test_mapped_containing_sid(self): """Looking up mapped entry containing SID""" msg = self.ldb.search(expression="(cn=Replicator)") self.assertEquals(len(msg), 1) self.assertEquals(str(msg[0].dn), "cn=Replicator,ou=Groups,dc=vernstok,dc=nl") self.assertEquals(msg[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552") # Check mapping of objectClass oc = set(msg[0]["objectClass"]) self.assertTrue(oc is not None) for i in oc: self.assertEquals(oc[i] == "posixGroup" or oc[i], "group") def test_search_by_objclass(self): """Looking up by objectClass""" msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))") self.assertEquals(len(msg), 2) for i in range(len(msg)): self.assertTrue((str(msg[i].dn) == "unixName=Administrator,ou=Users,dc=vernstok,dc=nl") or (str(msg[i].dn) == "unixName=nobody,ou=Users,dc=vernstok,dc=nl")) def test_s3sam_modify(self): print "Adding a record that will be fallbacked" self.ldb.add({"dn": "cn=Foo", "foo": "bar", "blah": "Blie", "cn": "Foo", "showInAdvancedViewOnly": "TRUE"} ) print "Checking for existence of record (local)" # TODO: This record must be searched in the local database, which is currently only supported for base searches # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')] # TODO: Actually, this version should work as well but doesn't... # # msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=SCOPE_BASE, attrs=['foo','blah','cn','showInAdvancedViewOnly']) self.assertEquals(len(msg), 1) self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE") self.assertEquals(msg[0]["foo"], "bar") self.assertEquals(msg[0]["blah"], "Blie") # Adding record that will be mapped self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl", "objectClass": "user", "unixName": "bin", "sambaUnicodePwd": "geheim", "cn": "Niemand"}) # Checking for existence of record (remote) msg = self.ldb.search(expression="(unixName=bin)", attrs=['unixName','cn','dn', 'sambaUnicodePwd']) self.assertEquals(len(msg), 1) self.assertEquals(msg[0]["cn"], "Niemand") self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim") # Checking for existence of record (local && remote) msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))", attrs=['unixName','cn','dn', 'sambaUnicodePwd']) self.assertEquals(len(msg), 1) # TODO: should check with more records self.assertEquals(msg[0]["cn"], "Niemand") self.assertEquals(msg[0]["unixName"], "bin") self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim") # Checking for existence of record (local || remote) msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))", attrs=['unixName','cn','dn', 'sambaUnicodePwd']) #print "got %d replies" % len(msg) self.assertEquals(len(msg), 1) # TODO: should check with more records self.assertEquals(msg[0]["cn"], "Niemand") self.assertEquals(msg[0]["unixName"], "bin") self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim") # Checking for data in destination database msg = self.samba3.db.search(expression="(cn=Niemand)") self.assertTrue(len(msg) >= 1) self.assertEquals(msg[0]["sambaSID"], "S-1-5-21-4231626423-2410014848-2360679739-2001") self.assertEquals(msg[0]["displayName"], "Niemand") # Adding attribute... self.ldb.modify_ldif(""" dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl changetype: modify add: description description: Blah """) # Checking whether changes are still there... msg = self.ldb.search(expression="(cn=Niemand)") self.assertTrue(len(msg) >= 1) self.assertEquals(msg[0]["cn"], "Niemand") self.assertEquals(msg[0]["description"], "Blah") # Modifying attribute... self.ldb.modify_ldif(""" dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl changetype: modify replace: description description: Blie """) # Checking whether changes are still there... msg = self.ldb.search(expression="(cn=Niemand)") self.assertTrue(len(msg) >= 1) self.assertEquals(msg[0]["description"], "Blie") # Deleting attribute... self.ldb.modify_ldif(""" dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl changetype: modify delete: description """) # Checking whether changes are no longer there... msg = self.ldb.search(expression="(cn=Niemand)") self.assertTrue(len(msg) >= 1) self.assertTrue(not "description" in msg[0]) # Renaming record... self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", "cn=Niemand2,cn=Users,dc=vernstok,dc=nl") # Checking whether DN has changed... msg = self.ldb.search(expression="(cn=Niemand2)") self.assertEquals(len(msg), 1) self.assertEquals(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl") # Deleting record... self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl") # Checking whether record is gone... msg = self.ldb.search(expression="(cn=Niemand2)") self.assertEquals(len(msg), 0) class MapTestCase(MapBaseTestCase): def setUp(self): super(MapTestCase, self).setUp() ldb = Ldb(self.ldburl) self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read()) ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read() ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) self.setup_modules(ldb, self.samba3, self.samba4) self.ldb = Ldb(self.ldburl) def test_map_search(self): """Running search tests on mapped data.""" ldif = """ dn: """ + "sambaDomainName=TESTS,""" + self.samba3.basedn + """ objectclass: sambaDomain objectclass: top sambaSID: S-1-5-21-4231626423-2410014848-2360679739 sambaNextRid: 2000 sambaDomainName: TESTS""" self.samba3.db.add_ldif(substitute_var(ldif, self.samba3.substvars)) # Add a set of split records ldif = """ dn: """ + self.samba4.dn("cn=X") + """ objectClass: user cn: X codePage: x revision: x dnsHostName: x nextRid: y lastLogon: x description: x objectSid: S-1-5-21-4231626423-2410014848-2360679739-552 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512 """ self.ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) ldif = """ dn: """ + self.samba4.dn("cn=Y") + """ objectClass: top cn: Y codePage: x revision: x dnsHostName: y nextRid: y lastLogon: y description: x """ self.ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) ldif = """ dn: """ + self.samba4.dn("cn=Z") + """ objectClass: top cn: Z codePage: x revision: y dnsHostName: z nextRid: y lastLogon: z description: y """ self.ldb.add_ldif(substitute_var(ldif, self.samba4.substvars)) # Add a set of remote records ldif = """ dn: """ + self.samba3.dn("cn=A") + """ objectClass: posixAccount cn: A sambaNextRid: x sambaBadPasswordCount: x sambaLogonTime: x description: x sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552 sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512 dn: """ + self.samba3.dn("cn=B") + """ objectClass: top cn:B sambaNextRid: x sambaBadPasswordCount: x sambaLogonTime: y description: x dn: """ + self.samba3.dn("cn=C") + """ objectClass: top cn: C sambaNextRid: x sambaBadPasswordCount: y sambaLogonTime: z description: y """ self.samba3.add_ldif(substitute_var(ldif, self.samba3.substvars)) # Testing search by DN # Search remote record by local DN dn = self.samba4.dn("cn=A") attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "x") # Search remote record by remote DN dn = self.samba3.dn("cn=A") attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"] res = self.samba3.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) self.assertTrue(not "dnsHostName" in res[0]) self.assertTrue(not "lastLogon" in res[0]) self.assertEquals(res[0]["sambaLogonTime"], "x") # Search split record by local DN dn = self.samba4.dn("cn=X") attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) self.assertEquals(res[0]["dnsHostName"], "x") self.assertEquals(res[0]["lastLogon"], "x") # Search split record by remote DN dn = self.samba3.dn("cn=X") attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"] res = self.samba3.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(str(res[0].dn)), dn) self.assertTrue(not "dnsHostName" in res[0]) self.assertTrue(not "lastLogon" in res[0]) self.assertEquals(res[0]["sambaLogonTime"], "x") # Testing search by attribute # Search by ignored attribute attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 2) self.assertEquals(str(str(res[0].dn)), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(str(res[1].dn)), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") # Search by kept attribute attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(description=y)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 2) self.assertEquals(str(str(res[0].dn)), self.samba4.dn("cn=Z")) self.assertEquals(res[0]["dnsHostName"], "z") self.assertEquals(res[0]["lastLogon"], "z") self.assertEquals(str(str(res[1].dn)), self.samba4.dn("cn=C")) self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "z") # Search by renamed attribute attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 2) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") # Search by converted attribute attrs = ["dnsHostName", "lastLogon", "objectSid"] # TODO: # Using the SID directly in the parse tree leads to conversion # errors, letting the search fail with no results. #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs) res = self.ldb.search(expression="(objectSid=*)", attrs=attrs) self.assertEquals(len(res), 3) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X")) self.assertEquals(res[0]["dnsHostName"], "x") self.assertEquals(res[0]["lastLogon"], "x") self.assertEquals(res[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") self.assertEquals(res[1]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552") # Search by generated attribute # In most cases, this even works when the mapping is missing # a `convert_operator' by enumerating the remote db. attrs = ["dnsHostName", "lastLogon", "primaryGroupID"] res = self.ldb.search(expression="(primaryGroupID=512)", attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "x") self.assertEquals(res[0]["primaryGroupID"], "512") # TODO: There should actually be two results, A and X. The # primaryGroupID of X seems to get corrupted somewhere, and the # objectSid isn't available during the generation of remote (!) data, # which can be observed with the following search. Also note that Xs # objectSid seems to be fine in the previous search for objectSid... */ #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs) #print len(res) + " results found" #for i in range(len(res)): # for (obj in res[i]) { # print obj + ": " + res[i][obj] # } # print "---" # # Search by remote name of renamed attribute */ attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(sambaBadPasswordCount=*)", attrs=attrs) self.assertEquals(len(res), 0) # Search by objectClass attrs = ["dnsHostName", "lastLogon", "objectClass"] res = self.ldb.search(expression="(objectClass=user)", attrs=attrs) self.assertEquals(len(res), 2) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X")) self.assertEquals(res[0]["dnsHostName"], "x") self.assertEquals(res[0]["lastLogon"], "x") self.assertTrue(res[0]["objectClass"] is not None) self.assertEquals(res[0]["objectClass"][0], "user") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") self.assertTrue(res[1]["objectClass"] is not None) self.assertEquals(res[1]["objectClass"][0], "user") # Prove that the objectClass is actually used for the search res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs) self.assertEquals(len(res), 3) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") self.assertTrue(res[0]["objectClass"] is not None) for oc in set(res[0]["objectClass"]): self.assertEquals(oc, "user") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") self.assertTrue(res[1]["objectClass"] is not None) self.assertEquals(res[1]["objectClass"][0], "user") self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[2]) self.assertEquals(res[2]["lastLogon"], "x") self.assertTrue(res[2]["objectClass"] is not None) self.assertEquals(res[2]["objectClass"][0], "user") print "Testing search by parse tree" # Search by conjunction of local attributes attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(&(codePage=x)(revision=x))", attrs=attrs) self.assertEquals(len(res), 2) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") # Search by conjunction of remote attributes attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=attrs) self.assertEquals(len(res), 2) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X")) self.assertEquals(res[0]["dnsHostName"], "x") self.assertEquals(res[0]["lastLogon"], "x") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") # Search by conjunction of local and remote attribute attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(&(codePage=x)(description=x))", attrs=attrs) self.assertEquals(len(res), 2) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") # Search by conjunction of local and remote attribute w/o match attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs) self.assertEquals(len(res), 0) res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs) self.assertEquals(len(res), 0) # Search by disjunction of local attributes attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=attrs) self.assertEquals(len(res), 2) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") # Search by disjunction of remote attributes attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=attrs) self.assertEquals(len(res), 3) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) self.assertTrue("dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A")) self.assertTrue("dnsHostName" in res[2]) self.assertEquals(res[2]["lastLogon"], "x") # Search by disjunction of local and remote attribute attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=attrs) self.assertEquals(len(res), 3) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) self.assertTrue("dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "y") self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X")) self.assertEquals(res[2]["dnsHostName"], "x") self.assertEquals(res[2]["lastLogon"], "x") # Search by disjunction of local and remote attribute w/o match attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=attrs) self.assertEquals(len(res), 0) # Search by negated local attribute attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(!(revision=x))", attrs=attrs) self.assertEquals(len(res), 5) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[2]["dnsHostName"], "z") self.assertEquals(res[2]["lastLogon"], "z") self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C")) self.assertTrue(not "dnsHostName" in res[3]) self.assertEquals(res[3]["lastLogon"], "z") # Search by negated remote attribute attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(!(description=x))", attrs=attrs) self.assertEquals(len(res), 3) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[0]["dnsHostName"], "z") self.assertEquals(res[0]["lastLogon"], "z") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C")) self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "z") # Search by negated conjunction of local attributes attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=attrs) self.assertEquals(len(res), 5) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[2]["dnsHostName"], "z") self.assertEquals(res[2]["lastLogon"], "z") self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C")) self.assertTrue(not "dnsHostName" in res[3]) self.assertEquals(res[3]["lastLogon"], "z") # Search by negated conjunction of remote attributes attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=attrs) self.assertEquals(len(res), 5) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "y") self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[2]["dnsHostName"], "z") self.assertEquals(res[2]["lastLogon"], "z") self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C")) self.assertTrue(not "dnsHostName" in res[3]) self.assertEquals(res[3]["lastLogon"], "z") # Search by negated conjunction of local and remote attribute attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=attrs) self.assertEquals(len(res), 5) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[2]["dnsHostName"], "z") self.assertEquals(res[2]["lastLogon"], "z") self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C")) self.assertTrue(not "dnsHostName" in res[3]) self.assertEquals(res[3]["lastLogon"], "z") # Search by negated disjunction of local attributes attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=attrs) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[1]) self.assertEquals(res[1]["lastLogon"], "x") self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[2]["dnsHostName"], "z") self.assertEquals(res[2]["lastLogon"], "z") self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C")) self.assertTrue(not "dnsHostName" in res[3]) self.assertEquals(res[3]["lastLogon"], "z") # Search by negated disjunction of remote attributes attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=attrs) self.assertEquals(len(res), 4) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y")) self.assertEquals(res[0]["dnsHostName"], "y") self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[1]["dnsHostName"], "z") self.assertEquals(res[1]["lastLogon"], "z") self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C")) self.assertTrue(not "dnsHostName" in res[2]) self.assertEquals(res[2]["lastLogon"], "z") # Search by negated disjunction of local and remote attribute attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", attrs=attrs) self.assertEquals(len(res), 4) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "x") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[1]["dnsHostName"], "z") self.assertEquals(res[1]["lastLogon"], "z") self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C")) self.assertTrue(not "dnsHostName" in res[2]) self.assertEquals(res[2]["lastLogon"], "z") print "Search by complex parse tree" attrs = ["dnsHostName", "lastLogon"] res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=attrs) self.assertEquals(len(res), 6) self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) self.assertTrue(not "dnsHostName" in res[0]) self.assertEquals(res[0]["lastLogon"], "y") self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) self.assertEquals(res[1]["dnsHostName"], "x") self.assertEquals(res[1]["lastLogon"], "x") self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A")) self.assertTrue(not "dnsHostName" in res[2]) self.assertEquals(res[2]["lastLogon"], "x") self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z")) self.assertEquals(res[3]["dnsHostName"], "z") self.assertEquals(res[3]["lastLogon"], "z") self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C")) self.assertTrue(not "dnsHostName" in res[4]) self.assertEquals(res[4]["lastLogon"], "z") # Clean up dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]] for dn in dns: self.ldb.delete(dn) def test_map_modify_local(self): """Modification of local records.""" # Add local record dn = "cn=test,dc=idealx,dc=org" self.ldb.add({"dn": dn, "cn": "test", "foo": "bar", "revision": "1", "description": "test"}) # Check it's there attrs = ["foo", "revision", "description"] res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["foo"], "bar") self.assertEquals(res[0]["revision"], "1") self.assertEquals(res[0]["description"], "test") # Check it's not in the local db res = self.samba4.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 0) # Check it's not in the remote db res = self.samba3.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs) self.assertEquals(len(res), 0) # Modify local record ldif = """ dn: """ + dn + """ replace: foo foo: baz replace: description description: foo """ self.ldb.modify_ldif(ldif) # Check in local db res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["foo"], "baz") self.assertEquals(res[0]["revision"], "1") self.assertEquals(res[0]["description"], "foo") # Rename local record dn2 = "cn=toast,dc=idealx,dc=org" self.ldb.rename(dn, dn2) # Check in local db res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["foo"], "baz") self.assertEquals(res[0]["revision"], "1") self.assertEquals(res[0]["description"], "foo") # Delete local record self.ldb.delete(dn2) # Check it's gone res = self.ldb.search(dn2, scope=SCOPE_BASE) self.assertEquals(len(res), 0) def test_map_modify_remote_remote(self): """Modification of remote data of remote records""" # Add remote record dn = self.samba4.dn("cn=test") dn2 = self.samba3.dn("cn=test") self.samba3.db.add({"dn": dn2, "cn": "test", "description": "foo", "sambaBadPasswordCount": "3", "sambaNextRid": "1001"}) # Check it's there attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "foo") self.assertEquals(res[0]["sambaBadPasswordCount"], "3") self.assertEquals(res[0]["sambaNextRid"], "1001") # Check in mapped db attrs = ["description", "badPwdCount", "nextRid"] res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "foo") self.assertEquals(res[0]["badPwdCount"], "3") self.assertEquals(res[0]["nextRid"], "1001") # Check in local db res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 0) # Modify remote data of remote record ldif = """ dn: """ + dn + """ replace: description description: test replace: badPwdCount badPwdCount: 4 """ self.ldb.modify_ldif(ldif) # Check in mapped db attrs = ["description", "badPwdCount", "nextRid"] res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["badPwdCount"], "4") self.assertEquals(res[0]["nextRid"], "1001") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["sambaBadPasswordCount"], "4") self.assertEquals(res[0]["sambaNextRid"], "1001") # Rename remote record dn2 = self.samba4.dn("cn=toast") self.ldb.rename(dn, dn2) # Check in mapped db dn = dn2 attrs = ["description", "badPwdCount", "nextRid"] res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["badPwdCount"], "4") self.assertEquals(res[0]["nextRid"], "1001") # Check in remote db dn2 = self.samba3.dn("cn=toast") attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"] res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["sambaBadPasswordCount"], "4") self.assertEquals(res[0]["sambaNextRid"], "1001") # Delete remote record self.ldb.delete(dn) # Check in mapped db that it's removed res = self.ldb.search(dn, scope=SCOPE_BASE) self.assertEquals(len(res), 0) # Check in remote db res = self.samba3.db.search(dn2, scope=SCOPE_BASE) self.assertEquals(len(res), 0) def test_map_modify_remote_local(self): """Modification of local data of remote records""" # Add remote record (same as before) dn = self.samba4.dn("cn=test") dn2 = self.samba3.dn("cn=test") self.samba3.db.add({"dn": dn2, "cn": "test", "description": "foo", "sambaBadPasswordCount": "3", "sambaNextRid": "1001"}) # Modify local data of remote record ldif = """ dn: """ + dn + """ add: revision revision: 1 replace: description description: test """ self.ldb.modify_ldif(ldif) # Check in mapped db attrs = ["revision", "description"] res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["revision"], "1") # Check in remote db res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") self.assertTrue(not "revision" in res[0]) # Check in local db res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertTrue(not "description" in res[0]) self.assertEquals(res[0]["revision"], "1") # Delete (newly) split record self.ldb.delete(dn) def test_map_modify_split(self): """Testing modification of split records""" # Add split record dn = self.samba4.dn("cn=test") dn2 = self.samba3.dn("cn=test") self.ldb.add({ "dn": dn, "cn": "test", "description": "foo", "badPwdCount": "3", "nextRid": "1001", "revision": "1"}) # Check it's there attrs = ["description", "badPwdCount", "nextRid", "revision"] res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "foo") self.assertEquals(res[0]["badPwdCount"], "3") self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "1") # Check in local db res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertTrue(not "description" in res[0]) self.assertTrue(not "badPwdCount" in res[0]) self.assertTrue(not "nextRid" in res[0]) self.assertEquals(res[0]["revision"], "1") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "foo") self.assertEquals(res[0]["sambaBadPasswordCount"], "3") self.assertEquals(res[0]["sambaNextRid"], "1001") self.assertTrue(not "revision" in res[0]) # Modify of split record ldif = """ dn: """ + dn + """ replace: description description: test replace: badPwdCount badPwdCount: 4 replace: revision revision: 2 """ self.ldb.modify_ldif(ldif) # Check in mapped db attrs = ["description", "badPwdCount", "nextRid", "revision"] res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["badPwdCount"], "4") self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "2") # Check in local db res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertTrue(not "description" in res[0]) self.assertTrue(not "badPwdCount" in res[0]) self.assertTrue(not "nextRid" in res[0]) self.assertEquals(res[0]["revision"], "2") # Check in remote db attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["sambaBadPasswordCount"], "4") self.assertEquals(res[0]["sambaNextRid"], "1001") self.assertTrue(not "revision" in res[0]) # Rename split record dn2 = self.samba4.dn("cn=toast") self.ldb.rename(dn, dn2) # Check in mapped db dn = dn2 attrs = ["description", "badPwdCount", "nextRid", "revision"] res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["badPwdCount"], "4") self.assertEquals(res[0]["nextRid"], "1001") self.assertEquals(res[0]["revision"], "2") # Check in local db res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn) self.assertTrue(not "description" in res[0]) self.assertTrue(not "badPwdCount" in res[0]) self.assertTrue(not "nextRid" in res[0]) self.assertEquals(res[0]["revision"], "2") # Check in remote db dn2 = self.samba3.dn("cn=toast") attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"] res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) self.assertEquals(len(res), 1) self.assertEquals(str(res[0].dn), dn2) self.assertEquals(res[0]["description"], "test") self.assertEquals(res[0]["sambaBadPasswordCount"], "4") self.assertEquals(res[0]["sambaNextRid"], "1001") self.assertTrue(not "revision" in res[0]) # Delete split record self.ldb.delete(dn) # Check in mapped db res = self.ldb.search(dn, scope=SCOPE_BASE) self.assertEquals(len(res), 0) # Check in local db res = self.samba4.db.search(dn, scope=SCOPE_BASE) self.assertEquals(len(res), 0) # Check in remote db res = self.samba3.db.search(dn2, scope=SCOPE_BASE) self.assertEquals(len(res), 0)