summaryrefslogtreecommitdiff
path: root/source4/lib/ldb/tests/python/ldap_schema.py
diff options
context:
space:
mode:
authorNadezhda Ivanova <nadezhda.ivanova@postpath.com>2010-01-04 11:24:10 +0200
committerNadezhda Ivanova <nadezhda.ivanova@postpath.com>2010-01-04 11:24:10 +0200
commitfb5383c69ee52fb5e6d066a43451dc8c806cc795 (patch)
tree45b72e03f68ab6d212755c524f8e8a60a3b4373a /source4/lib/ldb/tests/python/ldap_schema.py
parent60d8ab3b7b0bd2c9b633f0380d1fdf5bcf5e2621 (diff)
parenta06e5cdb99ddf7abf16486d3837105ec4e0da9ee (diff)
downloadsamba-fb5383c69ee52fb5e6d066a43451dc8c806cc795.tar.gz
samba-fb5383c69ee52fb5e6d066a43451dc8c806cc795.tar.bz2
samba-fb5383c69ee52fb5e6d066a43451dc8c806cc795.zip
Merge branch 'master' of git://git.samba.org/samba
Diffstat (limited to 'source4/lib/ldb/tests/python/ldap_schema.py')
-rwxr-xr-xsource4/lib/ldb/tests/python/ldap_schema.py500
1 files changed, 500 insertions, 0 deletions
diff --git a/source4/lib/ldb/tests/python/ldap_schema.py b/source4/lib/ldb/tests/python/ldap_schema.py
new file mode 100755
index 0000000000..0a31db82f7
--- /dev/null
+++ b/source4/lib/ldb/tests/python/ldap_schema.py
@@ -0,0 +1,500 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+# This is a port of the original in testprogs/ejs/ldap.js
+
+import getopt
+import optparse
+import sys
+import time
+import random
+import base64
+import os
+
+sys.path.append("bin/python")
+sys.path.append("../lib/subunit/python")
+
+import samba.getopt as options
+
+from samba.auth import system_session
+from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
+from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS
+from ldb import ERR_ENTRY_ALREADY_EXISTS, ERR_UNWILLING_TO_PERFORM
+from ldb import ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_OTHER, ERR_INVALID_DN_SYNTAX
+from ldb import ERR_NO_SUCH_ATTRIBUTE, ERR_INSUFFICIENT_ACCESS_RIGHTS
+from ldb import ERR_OBJECT_CLASS_VIOLATION, ERR_NOT_ALLOWED_ON_RDN
+from ldb import ERR_NAMING_VIOLATION, ERR_CONSTRAINT_VIOLATION
+from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE
+from ldb import Message, MessageElement, Dn
+from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
+from samba import Ldb
+from samba import UF_NORMAL_ACCOUNT, UF_TEMP_DUPLICATE_ACCOUNT
+from samba import UF_SERVER_TRUST_ACCOUNT, UF_WORKSTATION_TRUST_ACCOUNT
+from samba import UF_INTERDOMAIN_TRUST_ACCOUNT
+from samba import UF_PASSWD_NOTREQD, UF_ACCOUNTDISABLE
+from samba import GTYPE_SECURITY_BUILTIN_LOCAL_GROUP
+from samba import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_DOMAIN_LOCAL_GROUP
+from samba import GTYPE_SECURITY_UNIVERSAL_GROUP
+from samba import GTYPE_DISTRIBUTION_GLOBAL_GROUP
+from samba import GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
+from samba import GTYPE_DISTRIBUTION_UNIVERSAL_GROUP
+from samba import ATYPE_NORMAL_ACCOUNT, ATYPE_WORKSTATION_TRUST
+from samba import ATYPE_SECURITY_GLOBAL_GROUP, ATYPE_SECURITY_LOCAL_GROUP
+from samba import ATYPE_SECURITY_UNIVERSAL_GROUP
+from samba import ATYPE_DISTRIBUTION_GLOBAL_GROUP
+from samba import ATYPE_DISTRIBUTION_LOCAL_GROUP
+from samba import ATYPE_DISTRIBUTION_UNIVERSAL_GROUP
+from samba import DS_DC_FUNCTION_2003
+
+from subunit import SubunitTestRunner
+import unittest
+
+from samba.ndr import ndr_pack, ndr_unpack
+from samba.dcerpc import security
+
+parser = optparse.OptionParser("ldap [options] <host>")
+sambaopts = options.SambaOptions(parser)
+parser.add_option_group(sambaopts)
+parser.add_option_group(options.VersionOptions(parser))
+# use command line creds if available
+credopts = options.CredentialsOptions(parser)
+parser.add_option_group(credopts)
+opts, args = parser.parse_args()
+
+if len(args) < 1:
+ parser.print_usage()
+ sys.exit(1)
+
+host = args[0]
+
+lp = sambaopts.get_loadparm()
+creds = credopts.get_credentials(lp)
+
+
+class SchemaTests(unittest.TestCase):
+ def delete_force(self, ldb, dn):
+ try:
+ ldb.delete(dn)
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_NO_SUCH_OBJECT)
+
+ def find_schemadn(self, ldb):
+ res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["schemaNamingContext"])
+ self.assertEquals(len(res), 1)
+ return res[0]["schemaNamingContext"][0]
+
+ def find_basedn(self, ldb):
+ res = ldb.search(base="", expression="", scope=SCOPE_BASE,
+ attrs=["defaultNamingContext"])
+ self.assertEquals(len(res), 1)
+ return res[0]["defaultNamingContext"][0]
+
+ def setUp(self):
+ self.ldb = ldb
+ self.schema_dn = self.find_schemadn(ldb)
+ self.base_dn = self.find_basedn(ldb)
+
+ def test_generated_schema(self):
+ """Testing we can read the generated schema via LDAP"""
+ res = self.ldb.search("cn=aggregate,"+self.schema_dn, scope=SCOPE_BASE,
+ attrs=["objectClasses", "attributeTypes", "dITContentRules"])
+ self.assertEquals(len(res), 1)
+ self.assertTrue("dITContentRules" in res[0])
+ self.assertTrue("objectClasses" in res[0])
+ self.assertTrue("attributeTypes" in res[0])
+
+ def test_generated_schema_is_operational(self):
+ """Testing we don't get the generated schema via LDAP by default"""
+ res = self.ldb.search("cn=aggregate,"+self.schema_dn, scope=SCOPE_BASE,
+ attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertFalse("dITContentRules" in res[0])
+ self.assertFalse("objectClasses" in res[0])
+ self.assertFalse("attributeTypes" in res[0])
+
+ def test_schemaUpdateNow(self):
+ """Testing schemaUpdateNow"""
+ attr_name = "test-Attr" + time.strftime("%s", time.gmtime())
+ attr_ldap_display_name = attr_name.replace("-", "")
+
+ ldif = """
+dn: CN=%s,%s""" % (attr_name, self.schema_dn) + """
+objectClass: top
+objectClass: attributeSchema
+adminDescription: """ + attr_name + """
+adminDisplayName: """ + attr_name + """
+cn: """ + attr_name + """
+attributeId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9940
+attributeSyntax: 2.5.5.12
+omSyntax: 64
+instanceType: 4
+isSingleValued: TRUE
+systemOnly: FALSE
+"""
+ self.ldb.add_ldif(ldif)
+
+ class_name = "test-Class" + time.strftime("%s", time.gmtime())
+ class_ldap_display_name = class_name.replace("-", "")
+
+ ldif = """
+dn: CN=%s,%s""" % (class_name, self.schema_dn) + """
+objectClass: top
+objectClass: classSchema
+adminDescription: """ + class_name + """
+adminDisplayName: """ + class_name + """
+cn: """ + class_name + """
+governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
+instanceType: 4
+objectClassCategory: 1
+subClassOf: organizationalPerson
+systemFlags: 16
+rDNAttID: cn
+systemMustContain: cn
+systemMustContain: """ + attr_ldap_display_name + """
+systemOnly: FALSE
+"""
+ self.ldb.add_ldif(ldif)
+
+ ldif = """
+dn:
+changetype: modify
+add: schemaUpdateNow
+schemaUpdateNow: 1
+"""
+ self.ldb.modify_ldif(ldif)
+
+ object_name = "obj" + time.strftime("%s", time.gmtime())
+
+ ldif = """
+dn: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
+objectClass: organizationalPerson
+objectClass: person
+objectClass: """ + class_ldap_display_name + """
+objectClass: top
+cn: """ + object_name + """
+instanceType: 4
+objectCategory: CN=%s,%s"""% (class_name, self.schema_dn) + """
+distinguishedName: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
+name: """ + object_name + """
+""" + attr_ldap_display_name + """: test
+"""
+ self.ldb.add_ldif(ldif)
+
+ # Search for created attribute
+ res = []
+ res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_display_name)
+ self.assertTrue("schemaIDGUID" in res[0])
+
+ # Search for created objectclass
+ res = []
+ res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertEquals(res[0]["lDAPDisplayName"][0], class_ldap_display_name)
+ self.assertEquals(res[0]["defaultObjectCategory"][0], res[0]["distinguishedName"][0])
+ self.assertTrue("schemaIDGUID" in res[0])
+
+ # Search for created object
+ res = []
+ res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ # Delete the object
+ self.delete_force(self.ldb, "cn=%s,cn=Users,%s" % (object_name, self.base_dn))
+
+
+class SchemaTests_msDS_IntId(unittest.TestCase):
+
+ def setUp(self):
+ self.ldb = ldb
+ res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.schema_dn = res[0]["schemaNamingContext"][0]
+ self.base_dn = res[0]["defaultNamingContext"][0]
+ self.forest_level = int(res[0]["forestFunctionality"][0])
+
+ def _ldap_schemaUpdateNow(self):
+ ldif = """
+dn:
+changetype: modify
+add: schemaUpdateNow
+schemaUpdateNow: 1
+"""
+ self.ldb.modify_ldif(ldif)
+
+ def _make_obj_names(self, prefix):
+ class_name = prefix + time.strftime("%s", time.gmtime())
+ class_ldap_name = class_name.replace("-", "")
+ class_dn = "CN=%s,%s" % (class_name, self.schema_dn)
+ return (class_name, class_ldap_name, class_dn)
+
+ def _is_schema_base_object(self, ldb_msg):
+ """Test systemFlags for SYSTEM_FLAG_SCHEMA_BASE_OBJECT (16)"""
+ systemFlags = 0
+ if "systemFlags" in ldb_msg:
+ systemFlags = int(ldb_msg["systemFlags"][0])
+ return (systemFlags & 16) != 0
+
+ def _make_attr_ldif(self, attr_name, attr_dn):
+ ldif = """
+dn: """ + attr_dn + """
+objectClass: top
+objectClass: attributeSchema
+adminDescription: """ + attr_name + """
+adminDisplayName: """ + attr_name + """
+cn: """ + attr_name + """
+attributeId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9940
+attributeSyntax: 2.5.5.12
+omSyntax: 64
+instanceType: 4
+isSingleValued: TRUE
+systemOnly: FALSE
+"""
+ return ldif
+
+ def test_msDS_IntId_on_attr(self):
+ """Testing msDs-IntId creation for Attributes.
+ See MS-ADTS - 3.1.1.Attributes
+
+ This test should verify that:
+ - Creating attribute with 'msDS-IntId' fails with ERR_UNWILLING_TO_PERFORM
+ - Adding 'msDS-IntId' on existing attribute fails with ERR_CONSTRAINT_VIOLATION
+ - Creating attribute with 'msDS-IntId' set and FLAG_SCHEMA_BASE_OBJECT flag
+ set fails with ERR_UNWILLING_TO_PERFORM
+ - Attributes created with FLAG_SCHEMA_BASE_OBJECT not set have
+ 'msDS-IntId' attribute added internally
+ """
+
+ # 1. Create attribute without systemFlags
+ # msDS-IntId should be created if forest functional
+ # level is >= DS_DC_FUNCTION_2003
+ # and missing otherwise
+ (attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("msDS-IntId-Attr-1-")
+ ldif = self._make_attr_ldif(attr_name, attr_dn)
+
+ # try to add msDS-IntId during Attribute creation
+ ldif_fail = ldif + "msDS-IntId: -1993108831\n"
+ try:
+ self.ldb.add_ldif(ldif_fail)
+ self.fail("Adding attribute with preset msDS-IntId should fail")
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
+
+ # add the new attribute and update schema
+ self.ldb.add_ldif(ldif)
+ self._ldap_schemaUpdateNow()
+
+ # Search for created attribute
+ res = []
+ res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name)
+ if self.forest_level >= DS_DC_FUNCTION_2003:
+ if self._is_schema_base_object(res[0]):
+ self.assertTrue("msDS-IntId" not in res[0])
+ else:
+ self.assertTrue("msDS-IntId" in res[0])
+ else:
+ self.assertTrue("msDS-IntId" not in res[0])
+
+ msg = Message()
+ msg.dn = Dn(self.ldb, attr_dn)
+ msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
+ try:
+ self.ldb.modify(msg)
+ self.fail("Modifying msDS-IntId should return error")
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+
+ # 2. Create attribute with systemFlags = FLAG_SCHEMA_BASE_OBJECT
+ # msDS-IntId should be created if forest functional
+ # level is >= DS_DC_FUNCTION_2003
+ # and missing otherwise
+ (attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("msDS-IntId-Attr-2-")
+ ldif = self._make_attr_ldif(attr_name, attr_dn)
+ ldif += "systemFlags: 16\n"
+
+ # try to add msDS-IntId during Attribute creation
+ ldif_fail = ldif + "msDS-IntId: -1993108831\n"
+ try:
+ self.ldb.add_ldif(ldif_fail)
+ self.fail("Adding attribute with preset msDS-IntId should fail")
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
+
+ # add the new attribute and update schema
+ self.ldb.add_ldif(ldif)
+ self._ldap_schemaUpdateNow()
+
+ # Search for created attribute
+ res = []
+ res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name)
+ if self.forest_level >= DS_DC_FUNCTION_2003:
+ if self._is_schema_base_object(res[0]):
+ self.assertTrue("msDS-IntId" not in res[0])
+ else:
+ self.assertTrue("msDS-IntId" in res[0])
+ else:
+ self.assertTrue("msDS-IntId" not in res[0])
+
+ msg = Message()
+ msg.dn = Dn(self.ldb, attr_dn)
+ msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
+ try:
+ self.ldb.modify(msg)
+ self.fail("Modifying msDS-IntId should return error")
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+
+
+ def _make_class_ldif(self, class_dn, class_name):
+ ldif = """
+dn: """ + class_dn + """
+objectClass: top
+objectClass: classSchema
+adminDescription: """ + class_name + """
+adminDisplayName: """ + class_name + """
+cn: """ + class_name + """
+governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
+instanceType: 4
+objectClassCategory: 1
+subClassOf: organizationalPerson
+rDNAttID: cn
+systemMustContain: cn
+systemOnly: FALSE
+"""
+ return ldif
+
+ def test_msDS_IntId_on_class(self):
+ """Testing msDs-IntId creation for Class
+ Reference: MS-ADTS - 3.1.1.2.4.8 Class classSchema"""
+
+ # 1. Create Class without systemFlags
+ # msDS-IntId should be created if forest functional
+ # level is >= DS_DC_FUNCTION_2003
+ # and missing otherwise
+ (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-1-")
+ ldif = self._make_class_ldif(class_dn, class_name)
+
+ # try to add msDS-IntId during Class creation
+ ldif_add = ldif + "msDS-IntId: -1993108831\n"
+ self.ldb.add_ldif(ldif_add)
+ self._ldap_schemaUpdateNow()
+
+ res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831")
+
+ # add a new Class and update schema
+ (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-2-")
+ ldif = self._make_class_ldif(class_dn, class_name)
+
+ self.ldb.add_ldif(ldif)
+ self._ldap_schemaUpdateNow()
+
+ # Search for created Class
+ res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertFalse("msDS-IntId" in res[0])
+
+ msg = Message()
+ msg.dn = Dn(self.ldb, class_dn)
+ msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
+ try:
+ self.ldb.modify(msg)
+ self.fail("Modifying msDS-IntId should return error")
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+
+ # 2. Create Class with systemFlags = FLAG_SCHEMA_BASE_OBJECT
+ # msDS-IntId should be created if forest functional
+ # level is >= DS_DC_FUNCTION_2003
+ # and missing otherwise
+ (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-3-")
+ ldif = self._make_class_ldif(class_dn, class_name)
+ ldif += "systemFlags: 16\n"
+
+ # try to add msDS-IntId during Class creation
+ ldif_add = ldif + "msDS-IntId: -1993108831\n"
+ self.ldb.add_ldif(ldif_add)
+
+ res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831")
+
+ # add the new Class and update schema
+ (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-4-")
+ ldif = self._make_class_ldif(class_dn, class_name)
+ ldif += "systemFlags: 16\n"
+
+ self.ldb.add_ldif(ldif)
+ self._ldap_schemaUpdateNow()
+
+ # Search for created Class
+ res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertFalse("msDS-IntId" in res[0])
+
+ msg = Message()
+ msg.dn = Dn(self.ldb, class_dn)
+ msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
+ try:
+ self.ldb.modify(msg)
+ self.fail("Modifying msDS-IntId should return error")
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertFalse("msDS-IntId" in res[0])
+
+
+ def test_verify_msDS_IntId(self):
+ """Verify msDS-IntId exists only on attributes without FLAG_SCHEMA_BASE_OBJECT flag set"""
+ count = 0
+ res = self.ldb.search(self.schema_dn, scope=SCOPE_ONELEVEL,
+ expression="objectClass=attributeSchema",
+ attrs=["systemFlags", "msDS-IntId", "attributeID", "cn"])
+ self.assertTrue(len(res) > 1)
+ for ldb_msg in res:
+ if self.forest_level >= DS_DC_FUNCTION_2003:
+ if self._is_schema_base_object(ldb_msg):
+ self.assertTrue("msDS-IntId" not in ldb_msg)
+ else:
+ # don't assert here as there are plenty of
+ # attributes under w2k8 that are not part of
+ # Base Schema (SYSTEM_FLAG_SCHEMA_BASE_OBJECT flag not set)
+ # has not msDS-IntId attribute set
+ #self.assertTrue("msDS-IntId" in ldb_msg, "msDS-IntId expected on: %s" % ldb_msg.dn)
+ if "msDS-IntId" not in ldb_msg:
+ count = count + 1
+ print "%3d warning: msDS-IntId expected on: %-30s %s" % (count, ldb_msg["attributeID"], ldb_msg["cn"])
+ else:
+ self.assertTrue("msDS-IntId" not in ldb_msg)
+
+
+if not "://" in host:
+ if os.path.isfile(host):
+ host = "tdb://%s" % host
+ else:
+ host = "ldap://%s" % host
+
+ldb_options = []
+if host.startswith("ldap://"):
+ # user 'paged_search' module when connecting remotely
+ ldb_options = ["modules:paged_searches"]
+
+ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp, options=ldb_options)
+if not "tdb://" in host:
+ gc_ldb = Ldb("%s:3268" % host, credentials=creds,
+ session_info=system_session(), lp=lp)
+else:
+ gc_ldb = None
+
+runner = SubunitTestRunner()
+rc = 0
+if not runner.run(unittest.makeSuite(SchemaTests)).wasSuccessful():
+ rc = 1
+if not runner.run(unittest.makeSuite(SchemaTests_msDS_IntId)).wasSuccessful():
+ rc = 1
+sys.exit(rc)