summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
authorStephen Gallagher <sgallagh@redhat.com>2009-09-21 06:46:29 -0400
committerStephen Gallagher <sgallagh@redhat.com>2009-10-12 14:18:14 -0400
commit12f673d54bd7a1a5ecdc2f519ac85876bb22ecae (patch)
tree6d9cd6138f4a023cde4166b71ef00abce7b30cbd /server
parent85378ff4fb73e16261ef52b6b6217458374440bf (diff)
downloadsssd-12f673d54bd7a1a5ecdc2f519ac85876bb22ecae.tar.gz
sssd-12f673d54bd7a1a5ecdc2f519ac85876bb22ecae.tar.bz2
sssd-12f673d54bd7a1a5ecdc2f519ac85876bb22ecae.zip
Add new SSSDConfig python API
Also adds unit tests for the SSSDConfig API
Diffstat (limited to 'server')
-rw-r--r--server/config/SSSDConfig.py645
-rw-r--r--server/config/SSSDConfigTest.py1310
-rw-r--r--server/config/etc/sssd.api.conf48
-rw-r--r--server/config/etc/sssd.api.d/sssd-krb5.conf13
-rw-r--r--server/config/etc/sssd.api.d/sssd-ldap.conf32
-rw-r--r--server/config/etc/sssd.api.d/sssd-local.conf11
-rw-r--r--server/config/testconfigs/noparse.api.conf7
-rw-r--r--server/config/testconfigs/sssd-invalid.conf3
-rw-r--r--server/config/testconfigs/sssd-valid.conf42
9 files changed, 2111 insertions, 0 deletions
diff --git a/server/config/SSSDConfig.py b/server/config/SSSDConfig.py
new file mode 100644
index 00000000..07e967ba
--- /dev/null
+++ b/server/config/SSSDConfig.py
@@ -0,0 +1,645 @@
+'''
+Created on Sep 18, 2009
+
+@author: sgallagh
+'''
+
+import os
+import exceptions
+from ConfigParser import RawConfigParser, NoSectionError
+
+# Exceptions
+class SSSDConfigException(Exception): pass
+class ParsingError(Exception): pass
+class AlreadyInitializedError(SSSDConfigException): pass
+class NotInitializedError(SSSDConfigException): pass
+class NoOutputFileError(SSSDConfigException): pass
+class NoServiceError(SSSDConfigException): pass
+class NoSectionError(SSSDConfigException): pass
+class NoOptionError(SSSDConfigException): pass
+class ServiceNotRecognizedError(SSSDConfigException): pass
+class ServiceAlreadyExists(SSSDConfigException): pass
+class NoDomainError(SSSDConfigException): pass
+class DomainNotRecognized(SSSDConfigException): pass
+class NoSuchProviderError(SSSDConfigException): pass
+class NoSuchProviderSubtypeError(SSSDConfigException): pass
+class ProviderSubtypeInUse(SSSDConfigException): pass
+
+class SSSDConfigSchema(RawConfigParser):
+ def __init__(self, schemafile, schemaplugindir):
+ #TODO: get these from a global setting
+ if not schemafile:
+ schemafile = '/etc/sssd/sssd.api.conf'
+ if not schemaplugindir:
+ schemaplugindir = '/etc/sssd/sssd.api.d'
+
+ RawConfigParser.__init__(self, None, dict)
+ try:
+ #Read the primary config file
+ fd = open(schemafile, 'r')
+ self.readfp(fd)
+ fd.close()
+ # Read in the provider files
+ for file in os.listdir(schemaplugindir):
+ fd = open(schemaplugindir+ "/" + file)
+ self.readfp(fd)
+ fd.close()
+ except IOError:
+ raise
+ except:
+ raise ParsingError
+
+ # Set up lookup table for types
+ self.type_lookup = {
+ 'bool' : bool,
+ 'int' : int,
+ 'long' : long,
+ 'float': float,
+ 'str' : str,
+ 'list' : list,
+ 'None' : None
+ }
+
+ # Lookup table for acceptable boolean values
+ self.bool_lookup = {
+ 'false' : False,
+ 'true' : True,
+ }
+
+ def _striplist(self, l):
+ return([x.strip() for x in l])
+
+ def get_options(self, section):
+ if not self.has_section(section):
+ raise NoSectionError
+ options = self.options(section)
+
+ # Parse values
+ parsed_options = {}
+ for option in options:
+ unparsed_option = self.get(section, option)
+ split_option = self._striplist(unparsed_option.split(','))
+ optionlen = len(split_option)
+
+ primarytype = self.type_lookup[split_option[0]]
+ subtype = self.type_lookup[split_option[1]]
+
+ if optionlen == 2:
+ # This option has no defaults
+ parsed_options[option] = \
+ (primarytype,
+ subtype,
+ None)
+ elif optionlen == 3:
+ if type(split_option[2]) == primarytype:
+ parsed_options[option] = \
+ (primarytype,
+ subtype,
+ split_option[2])
+ elif primarytype == list:
+ if (type(split_option[2]) == subtype):
+ parsed_options[option] = \
+ (primarytype,
+ subtype,
+ [split_option[2]])
+ else:
+ try:
+ parsed_options[option] = \
+ (primarytype,
+ subtype,
+ [subtype(split_option[2])])
+ except ValueError:
+ raise ParsingError
+ else:
+ try:
+ parsed_options[option] = \
+ (primarytype,
+ subtype,
+ primarytype(split_option[2]))
+ except ValueError:
+ raise ParsingError
+
+ elif optionlen > 3:
+ if (primarytype != list):
+ raise ParsingError
+ fixed_options = []
+ for x in split_option[2:]:
+ if type(x) != subtype:
+ try:
+ fixed_options.extend([subtype(x)])
+ except ValueError:
+ raise ParsingError
+ else:
+ fixed_options.extend([x])
+ parsed_options[option] = \
+ (primarytype,
+ subtype,
+ fixed_options)
+ else:
+ # Bad config file
+ raise ParsingError
+
+ return parsed_options
+
+ def get_option(self, section, option):
+ if not self.has_section(section):
+ raise NoSectionError(section)
+ if not self.has_option(section, option):
+ raise NoOptionError("Section [%s] has no option [%s]" %
+ (section, option))
+
+ return self.get_options(section)[option]
+
+ def get_defaults(self, section):
+ if not self.has_section(section):
+ raise NoSectionError(section)
+
+ schema_options = self.get_options(section)
+ defaults = dict([(x,schema_options[x][2])
+ for x in schema_options.keys()
+ if schema_options[x][2] != None])
+
+ return defaults
+
+ def get_services(self):
+ service_list = [x for x in self.sections()
+ if x != 'service' and
+ not x.startswith('domain') and
+ not x.startswith('provider')]
+ return service_list
+
+ def get_providers(self):
+ providers = {}
+ for section in self._sections:
+ splitsection = section.split('/')
+ if (splitsection[0] == 'provider'):
+ if(len(splitsection) == 3):
+ if not providers.has_key(splitsection[1]):
+ providers[splitsection[1]] = []
+ providers[splitsection[1]].extend([splitsection[2]])
+ for key in providers.keys():
+ providers[key] = tuple(providers[key])
+ return providers
+
+class SSSDService:
+ '''
+ classdocs
+ '''
+
+ def __init__(self, servicename, apischema):
+ if not isinstance(apischema, SSSDConfigSchema) or type(servicename) != str:
+ raise TypeError
+
+ if not apischema.has_section(servicename):
+ raise ServiceNotRecognizedError(servicename)
+
+ self.name = servicename
+ self.schema = apischema
+
+ # Set up the service object with any known defaults
+ self.options = {}
+
+ # Set up default options for all services
+ self.options.update(self.schema.get_defaults('service'))
+
+ # Set up default options for this service
+ self.options.update(self.schema.get_defaults(self.name))
+
+ def get_name(self):
+ return self.name
+
+ def list_options(self):
+ options = {}
+
+ # Get the list of available options for all services
+ schema_options = self.schema.get_options('service')
+ options.update(schema_options)
+
+ schema_options = self.schema.get_options(self.name)
+ options.update(schema_options)
+
+ return options
+
+ def _striplist(self, l):
+ return([x.strip() for x in l])
+
+ def set_option(self, optionname, value):
+ if self.schema.has_option(self.name, optionname):
+ option_schema = self.schema.get_option(self.name, optionname)
+ elif self.schema.has_option('service', optionname):
+ option_schema = self.schema.get_option('service', optionname)
+ else:
+ raise NoOptionError('Section [%s] has no option [%s]' % (self.name, optionname))
+
+ if value == None:
+ self.remove_option(optionname)
+ return
+
+ # If we were expecting a list and didn't get one,
+ # Create a list with a single entry. If it's the
+ # wrong subtype, it will fail below
+ if option_schema[0] == list and type(value) != list:
+ if type(value) == str:
+ value = self._striplist(value.split(','))
+ else:
+ value = [value]
+
+ if type(value) != option_schema[0]:
+ # If it's possible to convert it, do so
+ try:
+ value = option_schema[0](value)
+ except ValueError:
+ raise TypeError('Expected %s for %s, received %s' %
+ (option_schema[0], optionname, type(value)))
+
+ if type(value) == list:
+ # Iterate through the list an ensure that all members
+ # are of the appropriate subtype
+ try:
+ value = [option_schema[1](x)
+ for x in value]
+ except ValueError:
+ raise TypeError('Expected %s' % option_schema[1])
+
+ self.options[optionname] = value
+
+ def get_option(self, optionname):
+ if optionname in self.options.keys():
+ return self.options[optionname]
+ raise NoOptionError(optionname)
+
+ def get_all_options(self):
+ return self.options
+
+ def remove_option(self, optionname):
+ if self.options.has_key(optionname):
+ del self.options[optionname]
+
+class SSSDDomain:
+ def __init__(self, domainname, apischema):
+ if not isinstance(apischema, SSSDConfigSchema) or type(domainname) != str:
+ raise TypeError
+
+ self.name = domainname
+ self.schema = apischema
+ self.active = False
+ self.oldname = None
+ self.providers = []
+
+ # Set up the domain object with any known defaults
+ self.options = {}
+
+ # Set up default options for all domains
+ self.options.update(self.schema.get_defaults('provider'))
+ self.options.update(self.schema.get_defaults('domain'))
+
+ def get_name(self):
+ return self.name
+
+ def set_active(self, active):
+ self.active = bool(active)
+
+ def list_options(self):
+ options = {}
+ # Get the list of available options for all domains
+ options.update(self.schema.get_options('provider'))
+
+ options.update(self.schema.get_options('domain'))
+
+ # Candidate for future optimization: will update primary type
+ # for each subtype
+ for (provider, providertype) in self.providers:
+ schema_options = self.schema.get_options('provider/%s'
+ % provider)
+ options.update(schema_options)
+ schema_options = self.schema.get_options('provider/%s/%s'
+ % (provider, providertype))
+ options.update(schema_options)
+ return options
+
+ def list_provider_options(self, provider, provider_type=None):
+ #TODO section checking
+
+ options = self.schema.get_options('provider/%s' % provider)
+ if(provider_type):
+ options.update(self.schema.get_options('provider/%s/%s' %
+ (provider, provider_type)))
+ else:
+ # Add options from all provider subtypes
+ known_providers = self.list_providers()
+ for provider_type in known_providers[provider]:
+ options.update(self.list_provider_options(provider,
+ provider_type))
+ return options
+
+ def list_providers(self):
+ return self.schema.get_providers()
+
+ def set_option(self, option, value):
+ options = self.list_options()
+ if (option not in options.keys()):
+ raise NoOptionError('Section [%s] has no option [%s]' %
+ (self.name, option))
+
+ if value == None:
+ self.remove_option(option)
+ return
+
+ option_schema = options[option]
+
+ # If we were expecting a list and didn't get one,
+ # Create a list with a single entry. If it's the
+ # wrong subtype, it will fail below
+ if option_schema[0] == list and type(value) != list:
+ if type(value) == str:
+ value = self._striplist(value.split(','))
+ else:
+ value = [value]
+
+ if type(value) != option_schema[0]:
+ # If it's possible to convert it, do so
+ try:
+ value = option_schema[0](value)
+ except ValueError:
+ raise TypeError('Expected %s for %s, received %s' %
+ (option_schema[0], option, type(value)))
+
+ if type(value) == list:
+ # Iterate through the list an ensure that all members
+ # are of the appropriate subtype
+ try:
+ value = [option_schema[1](x)
+ for x in value]
+ except ValueError:
+ raise TypeError('Expected %s' % option_schema[1])
+
+ # Check whether we're adding a provider entry.
+ # This requires special handling
+ is_provider = option.rfind('_provider')
+ if (is_provider > 0):
+ provider = option[:is_provider]
+ self.add_provider(value, provider)
+ else:
+ self.options[option] = value
+
+ def get_option(self, optionname):
+ if optionname in self.options.keys():
+ return self.options[optionname]
+ raise NoOptionError(optionname)
+
+ def get_all_options(self):
+ return self.options
+
+ def remove_option(self, optionname):
+ if optionname in self.options.keys():
+ del self.options[optionname]
+
+ def add_provider(self, provider, provider_type):
+ # Check that provider and provider_type are valid
+ configured_providers = self.list_providers()
+ if provider in configured_providers.keys():
+ if provider_type not in configured_providers[provider]:
+ raise NoSuchProviderSubtypeError(provider_type)
+ else:
+ raise NoSuchProviderError
+
+ # Don't add a provider twice
+ with_this_type = [x for x in self.providers if x[1] == provider_type]
+ if len(with_this_type) > 1:
+ # This should never happen!
+ raise ProviderSubtypeInUser
+ if len(with_this_type) == 1:
+ if with_this_type[0][0] != provider:
+ raise ProviderSubtypeInUse(with_this_type[0][0])
+ else:
+ self.providers.extend([(provider, provider_type)])
+
+ option_name = '%s_provider' % provider_type
+ self.options[option_name] = provider
+
+ # Add defaults for this provider
+ self.options.update(self.schema.get_defaults('provider/%s' %
+ provider))
+ self.options.update(self.schema.get_defaults('provider/%s/%s' %
+ (provider,
+ provider_type)))
+
+
+ def remove_provider(self, provider, provider_type):
+ if (provider,provider_type) not in self.providers:
+ return
+
+ # TODO: safely remove any unused options when removing
+ # the provider. This will require modifying the schema
+ # to account for multiple providers making use of the
+ # same options (such ask krb5_realm)
+
+ self.providers.remove((provider,provider_type))
+
+class SSSDConfig(RawConfigParser):
+ def __init__(self, schemafile=None, schemaplugindir=None):
+ RawConfigParser.__init__(self, None, dict)
+ self.schema = SSSDConfigSchema(schemafile, schemaplugindir)
+ self.configfile = None
+ self.initialized = False
+
+ def import_config(self,configfile=None):
+ if self.initialized:
+ raise AlreadyInitializedError
+
+ if not configfile:
+ #TODO: get this from a global setting
+ configfile = '/etc/sssd/sssd.conf'
+ # open will raise an IOError if it fails
+ fd = open(configfile, 'r')
+
+ try:
+ self.readfp(fd)
+ except:
+ raise ParsingError
+
+ fd.close()
+ self.configfile = configfile
+ self.initialized = True
+
+ def new_config(self):
+ if self.initialized:
+ raise AlreadyInitializedError
+
+ self.initialized = True
+
+ #Initialize all services
+ for servicename in self.schema.get_services():
+ service = self.new_service(servicename)
+
+ def write(self, outputfile=None):
+ if not self.initialized:
+ raise NotInitializedError
+
+ if outputfile == None:
+ if(self.configfile == None):
+ raise NoOutputFileError
+
+ outputfile = self.configfile
+
+ # open() will raise IOError if it fails
+ of = open(outputfile, 'w')
+ RawConfigParser.write(self, of)
+ of.close()
+
+ def list_services(self):
+ if not self.initialized:
+ raise NotInitializedError
+
+ service_list = [x for x in self.sections()
+ if not x.startswith('domain')]
+ return service_list
+
+ def get_service(self, name):
+ if not self.initialized:
+ raise NotInitializedError
+ if not self.has_section(name):
+ raise NoServiceError
+
+ service = SSSDService(name, self.schema)
+ [service.set_option(option, value)
+ for (option,value) in self.items(name)]
+
+ return service
+
+ def new_service(self, name):
+ if not self.initialized:
+ raise NotInitializedError
+ if (self.has_section(name)):
+ raise ServiceAlreadyExists(name)
+
+ service = SSSDService(name, self.schema)
+ self.save_service(service)
+ return service
+
+ def delete_service(self, name):
+ if not self.initialized:
+ raise NotInitializedError
+ self.remove_section(name)
+
+ def save_service(self, service):
+ if not self.initialized:
+ raise NotInitializedError
+ if not isinstance(service, SSSDService):
+ raise TypeError
+
+ name = service.get_name()
+ # Ensure that the existing section is removed
+ # This way we ensure that we are getting a
+ # complete copy of the service.
+ # remove_section() is a noop if the section
+ # does not exist.
+ self.remove_section(name)
+ self.add_section(name)
+ option_dict = service.get_all_options()
+ for option in option_dict.keys():
+ value = option_dict[option]
+ if (type(value) == list):
+ value = ', '.join(value)
+
+ self.set(name, option, value)
+
+ def _striplist(self, l):
+ return([x.strip() for x in l])
+
+ def list_active_domains(self):
+ if not self.initialized:
+ raise NotInitializedError
+
+ if (self.has_option('sssd', 'domains')):
+ active_domains = self._striplist(self.get('sssd', 'domains').split(','))
+ else:
+ active_domains = []
+
+ domains = [x for x in self.list_domains()
+ if x in active_domains]
+ return domains
+
+ def list_inactive_domains(self):
+ if not self.initialized:
+ raise NotInitializedError
+
+ if (self.has_option('sssd', 'domains')):
+ active_domains = self._striplist(self.get('sssd', 'domains').split(','))
+ else:
+ active_domains = []
+
+ domains = [x for x in self.list_domains()
+ if x not in active_domains]
+ return domains
+
+ def list_domains(self):
+ if not self.initialized:
+ raise NotInitializedError
+ domains = [x[7:] for x in self.sections() if x.startswith('domain/')]
+ return domains
+
+ def get_domain(self, name):
+ if not self.initialized:
+ raise NotInitializedError
+ if not self.has_section('domain/%s' % name):
+ raise NoDomainError(name)
+
+ domain = SSSDDomain(name, self.schema)
+
+ # Read in the providers first or we may have type
+ # errors trying to read in their options
+ providers = [x for x in self.items('domain/%s' % name)
+ if x[0].rfind('_provider') > 0]
+ [domain.set_option(option, value)
+ for (option, value) in providers]
+
+ [domain.set_option(option, value)
+ for (option,value) in self.items('domain/%s' % name)
+ if (option,value) not in providers]
+
+ return domain
+
+ def new_domain(self, name):
+ if not self.initialized:
+ raise NotInitializedError
+ if self.has_section('domain/%s' % name):
+ raise DomainAlreadyExistsError
+
+ domain = SSSDDomain(name, self.schema)
+ self.save_domain(domain);
+ return domain
+
+ def delete_domain(self, name):
+ if not self.initialized:
+ raise NotInitializedError
+ self.remove_section('domain/%s' % name)
+
+ def save_domain(self, domain):
+ if not self.initialized:
+ raise NotInitializedError
+ if not isinstance(domain, SSSDDomain):
+ raise TypeError
+
+ name = domain.get_name()
+ sectionname = 'domain/%s' % name
+ # Ensure that the existing section is removed
+ # This way we ensure that we are getting a
+ # complete copy of the service.
+ # remove_section() is a noop if the section
+ # does not exist.
+ self.remove_section(sectionname)
+ self.add_section(sectionname)
+ option_dict = domain.get_all_options()
+ [self.set(sectionname, option, option_dict[option])
+ for option in option_dict.keys()]
+
+ if domain.active:
+ if domain.get_name not in self.list_active_domains():
+ # Add it to the list of active domains
+ if (self.has_option('sssd','domains')):
+ active_domains = self.get('sssd', 'domains')
+ active_domains += ", %s" % domain.get_name()
+ else:
+ active_domains = domain.get_name()
+ self.set('sssd', 'domains', active_domains)
diff --git a/server/config/SSSDConfigTest.py b/server/config/SSSDConfigTest.py
new file mode 100644
index 00000000..b597f760
--- /dev/null
+++ b/server/config/SSSDConfigTest.py
@@ -0,0 +1,1310 @@
+'''
+Created on Sep 18, 2009
+
+@author: sgallagh
+'''
+import unittest
+
+import SSSDConfig
+
+class SSSDConfigTestValid(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def testServices(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ sssdconfig.import_config("testconfigs/sssd-valid.conf")
+
+ # Validate services
+ services = sssdconfig.list_services()
+ self.assertTrue('sssd' in services)
+ self.assertTrue('nss' in services)
+ self.assertTrue('pam' in services)
+ self.assertTrue('dp' in services)
+
+ #Verify service attributes
+ sssd_service = sssdconfig.get_service('sssd')
+ service_opts = sssd_service.list_options()
+
+ self.assertTrue('config_file_version' in service_opts.keys())
+ self.assertEquals(sssd_service.get_option('config_file_version'), 2)
+
+ self.assertTrue('services' in service_opts.keys())
+ service_list = sssd_service.get_option('services')
+ self.assertTrue('nss' in service_list)
+ self.assertTrue('pam' in service_list)
+
+ self.assertTrue('domains' in service_opts)
+
+ self.assertTrue('reconnection_retries' in service_opts)
+
+ del sssdconfig
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ sssdconfig.new_config()
+ sssdconfig.delete_service('sssd')
+ new_sssd_service = sssdconfig.new_service('sssd');
+ new_options = new_sssd_service.list_options();
+
+ self.assertTrue('debug_level' in new_options)
+ self.assertEquals(new_options['debug_level'][0], int)
+
+ self.assertTrue('command' in new_options)
+ self.assertEquals(new_options['command'][0], str)
+
+ self.assertTrue('reconnection_retries' in new_options)
+ self.assertEquals(new_options['reconnection_retries'][0], int)
+
+ self.assertTrue('config_file_version' in new_options)
+ self.assertEquals(new_options['config_file_version'][0], int)
+
+ self.assertTrue('services' in new_options)
+ self.assertEquals(new_options['debug_level'][0], int)
+
+ self.assertTrue('domains' in new_options)
+ self.assertEquals(new_options['domains'][0], list)
+ self.assertEquals(new_options['domains'][1], str)
+
+ self.assertTrue('sbus_timeout' in new_options)
+ self.assertEquals(new_options['sbus_timeout'][0], int)
+
+ self.assertTrue('re_expression' in new_options)
+ self.assertEquals(new_options['re_expression'][0], str)
+
+ self.assertTrue('full_name_format' in new_options)
+ self.assertEquals(new_options['full_name_format'][0], str)
+
+ del sssdconfig
+ pass
+
+ def testDomains(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ sssdconfig.import_config("testconfigs/sssd-valid.conf")
+
+ #Validate domain list
+ domains = sssdconfig.list_domains()
+ self.assertTrue('LOCAL' in domains)
+ self.assertTrue('LDAP' in domains)
+ self.assertTrue('PROXY' in domains)
+ self.assertTrue('IPA' in domains)
+
+ #Verify domain attributes
+ ipa_domain = sssdconfig.get_domain('IPA')
+ domain_opts = ipa_domain.list_options()
+ self.assertTrue('debug_level' in domain_opts.keys())
+ self.assertTrue('id_provider' in domain_opts.keys())
+ self.assertTrue('auth_provider' in domain_opts.keys())
+
+ del sssdconfig
+ pass
+
+ def testListProviders(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ sssdconfig.new_config()
+ junk_domain = sssdconfig.new_domain('junk')
+ providers = junk_domain.list_providers()
+ self.assertTrue('ldap' in providers.keys())
+
+ def testCreateNewLocalConfig(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ sssdconfig.new_config()
+
+ local_domain = sssdconfig.new_domain('LOCAL')
+ local_domain.add_provider('local', 'id')
+ local_domain.set_option('debug_level', 1)
+ local_domain.set_option('default_shell', '/bin/tcsh')
+ local_domain.set_active(True)
+ sssdconfig.save_domain(local_domain)
+
+ sssdconfig.write('/tmp/testCreateNewLocalConfig.conf')
+
+ def testCreateNewLDAPConfig(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ sssdconfig.new_config()
+
+ ldap_domain = sssdconfig.new_domain('LDAP')
+ ldap_domain.add_provider('ldap', 'id')
+ ldap_domain.set_option('debug_level', 1)
+ ldap_domain.set_active(True)
+ sssdconfig.save_domain(ldap_domain)
+
+ sssdconfig.write('/tmp/testCreateNewLDAPConfig.conf')
+
+ def testModifyExistingConfig(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ sssdconfig.import_config("testconfigs/sssd-valid.conf")
+
+ ldap_domain = sssdconfig.get_domain('LDAP')
+ ldap_domain.set_option('debug_level', 3)
+
+ ldap_domain.remove_provider('ldap', 'auth')
+ ldap_domain.add_provider('krb5', 'auth')
+ ldap_domain.set_active(True)
+ sssdconfig.save_domain(ldap_domain)
+
+ sssdconfig.write('/tmp/testModifyExistingConfig.conf')
+
+class SSSDConfigTestSSSDService(unittest.TestCase):
+ def setUp(self):
+ self.schema = SSSDConfig.SSSDConfigSchema("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ pass
+
+ def tearDown(self):
+ pass
+
+ def testInit(self):
+ # Positive test
+ service = SSSDConfig.SSSDService('sssd', self.schema)
+
+ # Type Error test
+ # Name is not a string
+ try:
+ service = SSSDConfig.SSSDService(3, self.schema)
+ except TypeError:
+ pass
+ else:
+ self.fail("Expected TypeError exception")
+
+ # TypeError test
+ # schema is not an SSSDSchema
+ try:
+ service = SSSDConfig.SSSDService('3', self)
+ except TypeError:
+ pass
+ else:
+ self.fail("Expected TypeError exception")
+
+ # ServiceNotRecognizedError test
+ try:
+ service = SSSDConfig.SSSDService('ssd', self.schema)
+ except SSSDConfig.ServiceNotRecognizedError:
+ pass
+ else:
+ self.fail("Expected ServiceNotRecognizedError")
+
+
+ def testListOptions(self):
+ service = SSSDConfig.SSSDService('sssd', self.schema)
+
+ options = service.list_options()
+ control_list = [
+ 'config_file_version',
+ 'services',
+ 'domains',
+ 'sbus_timeout',
+ 're_expression',
+ 'full_name_format',
+ 'debug_level',
+ 'command',
+ 'reconnection_retries']
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in control_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in control_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ self.assertTrue(type(options['config_file_version']) == tuple,
+ "Option values should be a tuple")
+
+ self.assertTrue(options['config_file_version'][0] == int,
+ "config_file_version should require an int. " +
+ "list_options is requiring a %s" %
+ options['config_file_version'][0])
+
+ self.assertTrue(options['config_file_version'][1] == None,
+ "config_file_version should not require a subtype. " +
+ "list_options is requiring a %s" %
+ options['config_file_version'][1])
+
+ self.assertTrue(options['config_file_version'][0] == int,
+ "config_file_version should default to 2. " +
+ "list_options specifies %d" %
+ options['config_file_version'][2])
+
+ self.assertTrue(type(options['services']) == tuple,
+ "Option values should be a tuple")
+
+ self.assertTrue(options['services'][0] == list,
+ "services should require an list. " +
+ "list_options is requiring a %s" %
+ options['services'][0])
+
+ self.assertTrue(options['services'][1] == str,
+ "services should require a subtype of str. " +
+ "list_options is requiring a %s" %
+ options['services'][1])
+
+ def testSetOption(self):
+ service = SSSDConfig.SSSDService('sssd', self.schema)
+
+ # Positive test - Exactly right
+ service.set_option('debug_level', 2)
+ self.assertEqual(service.get_option('debug_level'), 2)
+
+ # Positive test - Allow converting "safe" values
+ service.set_option('debug_level', '2')
+ self.assertEqual(service.get_option('debug_level'), 2)
+
+ # Positive test - Remove option if value is None
+ service.set_option('debug_level', None)
+ self.assertTrue('debug_level' not in service.options.keys())
+
+ # Negative test - Nonexistent Option
+ try:
+ service.set_option('nosuchoption', 1)
+ except SSSDConfig.NoOptionError:
+ pass
+ else:
+ self.fail("Expected NoOptionError")
+
+ # Negative test - Incorrect type
+ try:
+ service.set_option('debug_level', 'two')
+ except TypeError:
+ pass
+ else:
+ self.fail("Expected TypeError")
+
+ def testGetOption(self):
+ service = SSSDConfig.SSSDService('sssd', self.schema)
+
+ # Positive test - Single-valued
+ self.assertEqual(service.get_option('config_file_version'), 2)
+
+ # Positive test - List of values
+ self.assertEqual(service.get_option('services'), ['nss', 'pam'])
+
+ # Negative Test - Bad Option
+ try:
+ service.get_option('nosuchoption')
+ except SSSDConfig.NoOptionError:
+ pass
+ else:
+ self.fail("Expected NoOptionError")
+
+ def testGetAllOptions(self):
+ service = SSSDConfig.SSSDService('sssd', self.schema)
+
+ #Positive test
+ options = service.get_all_options()
+ control_list = [
+ 'config_file_version',
+ 'services',
+ 'sbus_timeout',
+ 're_expression',
+ 'full_name_format',
+ 'debug_level',
+ 'reconnection_retries']
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in control_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in control_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ def testRemoveOption(self):
+ service = SSSDConfig.SSSDService('sssd', self.schema)
+
+ # Positive test - Remove an option that exists
+ self.assertEqual(service.get_option('debug_level'), 0)
+ service.remove_option('debug_level')
+ try:
+ service.get_option('debug_level')
+ except SSSDConfig.NoOptionError:
+ pass
+ else:
+ self.fail("debug_level should have been removed")
+
+ # Positive test - Remove an option that doesn't exist
+ try:
+ service.get_option('nosuchentry')
+ except SSSDConfig.NoOptionError:
+ pass
+ else:
+ self.fail("nosuchentry should not exist")
+
+ service.remove_option('nosuchentry')
+
+class SSSDConfigTestSSSDDomain(unittest.TestCase):
+ def setUp(self):
+ self.schema = SSSDConfig.SSSDConfigSchema("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ pass
+
+ def tearDown(self):
+ pass
+
+ def testInit(self):
+ # Positive Test
+ domain = SSSDConfig.SSSDDomain('mydomain', self.schema)
+
+ # Negative Test - Name not a string
+ try:
+ domain = SSSDConfig.SSSDDomain(2, self.schema)
+ except TypeError:
+ pass
+ else:
+ self.fail("Expected TypeError")
+
+ # Negative Test - Schema is not an SSSDSchema
+ try:
+ domain = SSSDConfig.SSSDDomain('mydomain', self)
+ except TypeError:
+ pass
+ else:
+ self.fail("Expected TypeError")
+
+ def testGetName(self):
+ # Positive Test
+ domain = SSSDConfig.SSSDDomain('mydomain', self.schema)
+
+ self.assertEqual(domain.get_name(), 'mydomain')
+
+ def testSetActive(self):
+ #Positive Test
+ domain = SSSDConfig.SSSDDomain('mydomain', self.schema)
+
+ # Should default to inactive
+ self.assertFalse(domain.active)
+ domain.set_active(True)
+ self.assertTrue(domain.active)
+ domain.set_active(False)
+ self.assertFalse(domain.active)
+
+ def testListOptions(self):
+ domain = SSSDConfig.SSSDDomain('sssd', self.schema)
+
+ # First test default options
+ options = domain.list_options()
+ control_list = [
+ 'debug_level',
+ 'min_id',
+ 'max_id',
+ 'timeout',
+ 'magic_private_groups',
+ 'enumerate',
+ 'cache_credentials',
+ 'use_fully_qualified_names',
+ 'id_provider',
+ 'auth_provider',
+ 'access_provider',
+ 'chpass_provider']
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in control_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in control_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ self.assertTrue(type(options['max_id']) == tuple,
+ "Option values should be a tuple")
+
+ self.assertTrue(options['max_id'][0] == int,
+ "config_file_version should require an int. " +
+ "list_options is requiring a %s" %
+ options['max_id'][0])
+
+ self.assertTrue(options['max_id'][1] == None,
+ "config_file_version should not require a subtype. " +
+ "list_options is requiring a %s" %
+ options['max_id'][1])
+
+ # Add a provider and verify that the new options appear
+ domain.add_provider('local', 'id')
+ control_list.extend(
+ ['default_shell',
+ 'base_directory'])
+
+ options = domain.list_options()
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in control_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in control_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ # Add a provider that has global options and verify that
+ # The new options appear.
+ domain.add_provider('krb5', 'auth')
+
+ backup_list = control_list[:]
+ control_list.extend(
+ ['krb5_kdcip',
+ 'krb5_realm',
+ 'krb5_ccachedir',
+ 'krb5_ccname_template',
+ 'krb5_auth_timeout'])
+
+ options = domain.list_options()
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in control_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in control_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ # Remove the auth domain and verify that the options
+ # revert to the backup_list
+ domain.remove_provider('krb5', 'auth')
+ options = domain.list_options()
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in backup_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in backup_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ def testListProviders(self):
+ domain = SSSDConfig.SSSDDomain('sssd', self.schema)
+
+ control_provider_dict = {
+ 'krb5': ('auth', 'access', 'chpass'),
+ 'local': ('auth', 'chpass', 'access', 'id'),
+ 'ldap': ('id', 'auth')}
+
+ providers = domain.list_providers()
+
+ self.assertEqual(providers, control_provider_dict)
+
+ def testListProviderOptions(self):
+ domain = SSSDConfig.SSSDDomain('sssd', self.schema)
+
+ # Test looking up a specific provider type
+ options = domain.list_provider_options('krb5', 'auth')
+ control_list = [
+ 'krb5_kdcip',
+ 'krb5_realm',
+ 'krb5_ccachedir',
+ 'krb5_ccname_template',
+ 'krb5_auth_timeout']
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in control_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in control_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ #Test looking up all provider values
+ options = domain.list_provider_options('krb5')
+ control_list.extend(['krb5_changepw_principal'])
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in control_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in control_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ def testAddProvider(self):
+ domain = SSSDConfig.SSSDDomain('sssd', self.schema)
+
+ # Positive Test
+ domain.add_provider('local', 'id')
+
+ # Negative Test - No such backend type
+ try:
+ domain.add_provider('nosuchbackend', 'auth')
+ except SSSDConfig.NoSuchProviderError:
+ pass
+ else:
+ self.fail("Expected NoSuchProviderError")
+
+ # Negative Test - No such backend subtype
+ try:
+ domain.add_provider('ldap', 'nosuchsubtype')
+ except SSSDConfig.NoSuchProviderSubtypeError:
+ pass
+ else:
+ self.fail("Expected NoSuchProviderSubtypeError")
+
+ # Negative Test - Try to add a second provider of the same type
+ try:
+ domain.add_provider('ldap', 'id')
+ except SSSDConfig.ProviderSubtypeInUse:
+ pass
+ else:
+ self.fail("Expected ProviderSubtypeInUse")
+
+ def testRemoveProvider(self):
+ domain = SSSDConfig.SSSDDomain('sssd', self.schema)
+
+ # First test default options
+ options = domain.list_options()
+ control_list = [
+ 'debug_level',
+ 'min_id',
+ 'max_id',
+ 'timeout',
+ 'magic_private_groups',
+ 'enumerate',
+ 'cache_credentials',
+ 'use_fully_qualified_names',
+ 'id_provider',
+ 'auth_provider',
+ 'access_provider',
+ 'chpass_provider']
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in control_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in control_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ self.assertTrue(type(options['max_id']) == tuple,
+ "Option values should be a tuple")
+
+ self.assertTrue(options['max_id'][0] == int,
+ "config_file_version should require an int. " +
+ "list_options is requiring a %s" %
+ options['max_id'][0])
+
+ self.assertTrue(options['max_id'][1] == None,
+ "config_file_version should not require a subtype. " +
+ "list_options is requiring a %s" %
+ options['max_id'][1])
+
+ # Add a provider and verify that the new options appear
+ domain.add_provider('local', 'id')
+ control_list.extend(
+ ['default_shell',
+ 'base_directory'])
+
+ options = domain.list_options()
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in control_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in control_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ # Add a provider that has global options and verify that
+ # The new options appear.
+ domain.add_provider('krb5', 'auth')
+
+ backup_list = control_list[:]
+ control_list.extend(
+ ['krb5_kdcip',
+ 'krb5_realm',
+ 'krb5_ccachedir',
+ 'krb5_ccname_template',
+ 'krb5_auth_timeout'])
+
+ options = domain.list_options()
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in control_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in control_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ # Remove the auth domain and verify that the options
+ # revert to the backup_list
+ domain.remove_provider('krb5', 'auth')
+ options = domain.list_options()
+
+ self.assertTrue(type(options) == dict,
+ "Options should be a dictionary")
+
+ # Ensure that all of the expected defaults are there
+ for option in backup_list:
+ self.assertTrue(option in options.keys(),
+ "Option [%s] missing" %
+ option)
+
+ # Ensure that there aren't any unexpected options listed
+ for option in options.keys():
+ self.assertTrue(option in backup_list,
+ 'Option [%s] unexpectedly found' %
+ option)
+
+ # Test removing nonexistent provider - Real
+ domain.remove_provider('ldap', 'id')
+
+ # Test removing nonexistent provider - Bad backend type
+ # Should pass without complaint
+ domain.remove_provider('nosuchbackend', 'id')
+
+ # Test removing nonexistent provider - Bad provider type
+ # Should pass without complaint
+ domain.remove_provider('ldap', 'nosuchprovider')
+
+ def testGetOption(self):
+ domain = SSSDConfig.SSSDDomain('sssd', self.schema)
+
+ # Positive Test - Ensure that we can get a valid option
+ self.assertEqual(domain.get_option('debug_level'), 0)
+
+ # Negative Test - Try to get valid option that is not set
+ try:
+ domain.get_option('max_id')
+ except SSSDConfig.NoOptionError:
+ pass
+ else:
+ self.fail("Expected NoOptionError")
+
+ # Positive Test - Set the above option and get it
+ domain.set_option('max_id', 10000)
+ self.assertEqual(domain.get_option('max_id'), 10000)
+
+ # Negative Test - Try yo get invalid option
+ try:
+ domain.get_option('nosuchoption')
+ except SSSDConfig.NoOptionError:
+ pass
+ else:
+ self.fail("Expected NoOptionError")
+
+ def testSetOption(self):
+ domain = SSSDConfig.SSSDDomain('sssd', self.schema)
+
+ # Positive Test
+ domain.set_option('max_id', 10000)
+ self.assertEqual(domain.get_option('max_id'), 10000)
+
+ # Positive Test - Remove option if value is None
+ domain.set_option('max_id', None)
+ self.assertTrue('max_id' not in domain.get_all_options().keys())
+
+ # Negative Test - invalid option
+ try:
+ domain.set_option('nosuchoption', 1)
+ except SSSDConfig.NoOptionError:
+ pass
+ else:
+ self.fail("Expected NoOptionError")
+
+ # Negative Test - incorrect type
+ try:
+ domain.set_option('max_id', 'a string')
+ except TypeError:
+ pass
+ else:
+ self.fail("Expected TypeError")
+
+ # Positive Test - Coax options to appropriate type
+ domain.set_option('max_id', '10000')
+ self.assertEqual(domain.get_option('max_id'), 10000)
+
+ domain.set_option('max_id', 30.2)
+ self.assertEqual(domain.get_option('max_id'), 30)
+
+ def testRemoveOption(self):
+ domain = SSSDConfig.SSSDDomain('sssd', self.schema)
+
+ # Positive test - Remove existing option
+ self.assertTrue('min_id' in domain.get_all_options().keys())
+ domain.remove_option('min_id')
+ self.assertFalse('min_id' in domain.get_all_options().keys())
+
+ # Positive test - Remove unset but valid option
+ self.assertFalse('max_id' in domain.get_all_options().keys())
+ domain.remove_option('max_id')
+ self.assertFalse('max_id' in domain.get_all_options().keys())
+
+ # Positive test - Remove unset and unknown option
+ self.assertFalse('nosuchoption' in domain.get_all_options().keys())
+ domain.remove_option('nosuchoption')
+ self.assertFalse('nosuchoption' in domain.get_all_options().keys())
+
+class SSSDConfigTestSSSDConfig(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def testInit(self):
+ # Positive test
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ # Negative Test - No Such File
+ try:
+ sssdconfig = SSSDConfig.SSSDConfig("nosuchfile.api.conf",
+ "etc/sssd.api.d")
+ except IOError:
+ pass
+ else:
+ self.fail("Expected IOError")
+
+ # Negative Test - Schema is not parsable
+ try:
+ sssdconfig = SSSDConfig.SSSDConfig("testconfigs/noparse.api.conf",
+ "etc/sssd.api.d")
+ except SSSDConfig.ParsingError:
+ pass
+ else:
+ self.fail("Expected ParsingError")
+
+ def testImportConfig(self):
+ # Positive Test
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ sssdconfig.import_config("testconfigs/sssd-valid.conf")
+
+ # Verify that all sections were imported
+ control_list = [
+ 'sssd',
+ 'nss',
+ 'pam',
+ 'dp',
+ 'domain/PROXY',
+ 'domain/IPA',
+ 'domain/LOCAL',
+ 'domain/LDAP',
+ ]
+
+ for section in control_list:
+ self.assertTrue(sssdconfig.has_section(section),
+ "Section [%s] missing" %
+ section)
+ for section in sssdconfig.sections():
+ self.assertTrue(section in control_list)
+
+ # Verify that all options were imported for a section
+ control_list = [
+ 'services',
+ 'reconnection_retries',
+ 'domains',
+ 'config_file_version']
+
+ for option in control_list:
+ self.assertTrue(sssdconfig.has_option('sssd', option),
+ "Option [%s] missing from [sssd]" %
+ option)
+ for option in sssdconfig.options('sssd'):
+ self.assertTrue(option in control_list,
+ "Option [%s] unexpectedly found" %
+ option)
+
+ #TODO: Check the types and values of the settings
+
+ # Negative Test - Missing config file
+ try:
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ sssdconfig.import_config("nosuchfile.conf")
+ except IOError:
+ pass
+ else:
+ self.fail("Expected IOError")
+
+ # Negative Test - Invalid config file
+ try:
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ sssdconfig.import_config("testconfigs/sssd-invalid.conf")
+ except SSSDConfig.ParsingError:
+ pass
+ else:
+ self.fail("Expected ParsingError")
+
+ # Negative Test - Already initialized
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ sssdconfig.import_config("testconfigs/sssd-valid.conf")
+ try:
+ sssdconfig.import_config("testconfigs/sssd-valid.conf")
+ except SSSDConfig.AlreadyInitializedError:
+ pass
+ else:
+ self.fail("Expected AlreadyInitializedError")
+
+ def testNewConfig(self):
+ # Positive Test
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ sssdconfig.new_config()
+
+ # Check that the defaults were set
+ control_list = [
+ 'sssd',
+ 'nss',
+ 'pam']
+ for section in control_list:
+ self.assertTrue(sssdconfig.has_section(section),
+ "Section [%s] missing" %
+ section)
+ for section in sssdconfig.sections():
+ self.assertTrue(section in control_list)
+
+ control_list = [
+ 'config_file_version',
+ 'services',
+ 'sbus_timeout',
+ 're_expression',
+ 'full_name_format',
+ 'debug_level',
+ 'reconnection_retries']
+ for option in control_list:
+ self.assertTrue(sssdconfig.has_option('sssd', option),
+ "Option [%s] missing from [sssd]" %
+ option)
+ for option in sssdconfig.options('sssd'):
+ self.assertTrue(option in control_list,
+ "Option [%s] unexpectedly found" %
+ option)
+
+ # Negative Test - Already Initialized
+ try:
+ sssdconfig.new_config()
+ except SSSDConfig.AlreadyInitializedError:
+ pass
+ else:
+ self.fail("Expected AlreadyInitializedError")
+
+ def testWrite(self):
+ #TODO Write tests to compare output files
+ pass
+
+ def testListServices(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ # Negative Test - sssdconfig not initialized
+ try:
+ sssdconfig.list_services()
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ sssdconfig.new_config()
+
+ control_list = [
+ 'sssd',
+ 'pam',
+ 'nss']
+ service_list = sssdconfig.list_services()
+ for service in control_list:
+ self.assertTrue(service in service_list,
+ "Service [%s] missing" %
+ service)
+ for service in service_list:
+ self.assertTrue(service in control_list,
+ "Service [%s] unexpectedly found" %
+ service)
+
+ def testGetService(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ # Negative Test - Not initialized
+ try:
+ service = sssdconfig.get_service('sssd')
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ sssdconfig.new_config()
+
+ service = sssdconfig.get_service('sssd')
+ self.assertTrue(isinstance(service, SSSDConfig.SSSDService))
+
+ # TODO verify the contents of this service
+
+ # Negative Test - No such service
+ try:
+ service = sssdconfig.get_service('nosuchservice')
+ except SSSDConfig.NoServiceError:
+ pass
+ else:
+ self.fail("Expected NoServiceError")
+
+ def testNewService(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ # Negative Test - Not initialized
+ try:
+ service = sssdconfig.new_service('sssd')
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ sssdconfig.new_config()
+
+ # Positive Test
+ # First need to remove the existing service
+ sssdconfig.delete_service('sssd')
+ service = sssdconfig.new_service('sssd')
+ self.failUnless(service.get_name() in sssdconfig.list_services())
+
+ # TODO: check that the values of this new service
+ # are set to the defaults from the schema
+
+ def testDeleteService(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ # Negative Test - Not initialized
+ try:
+ service = sssdconfig.delete_service('sssd')
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ sssdconfig.new_config()
+
+ # Positive Test
+ service = sssdconfig.delete_service('sssd')
+
+ def testSaveService(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ new_service = SSSDConfig.SSSDService('sssd', sssdconfig.schema)
+
+ # Negative Test - Not initialized
+ try:
+ service = sssdconfig.save_service(new_service)
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ # Positive Test
+ sssdconfig.new_config()
+ sssdconfig.save_service(new_service)
+
+ # TODO: check that all entries were saved correctly (change a few)
+
+ # Negative Test - Type Error
+ try:
+ sssdconfig.save_service(self)
+ except TypeError:
+ pass
+ else:
+ self.fail("Expected TypeError")
+
+ def testListActiveDomains(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ # Negative Test - Not Initialized
+ try:
+ sssdconfig.list_active_domains()
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ # Positive Test
+ sssdconfig.import_config('testconfigs/sssd-valid.conf')
+
+ control_list = [
+ 'IPA',
+ 'LOCAL']
+ active_domains = sssdconfig.list_active_domains()
+
+ for domain in control_list:
+ self.assertTrue(domain in active_domains,
+ "Domain [%s] missing" %
+ domain)
+ for domain in active_domains:
+ self.assertTrue(domain in control_list,
+ "Domain [%s] unexpectedly found" %
+ domain)
+
+ def testListInactiveDomains(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ # Negative Test - Not Initialized
+ try:
+ sssdconfig.list_inactive_domains()
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ # Positive Test
+ sssdconfig.import_config('testconfigs/sssd-valid.conf')
+
+ control_list = [
+ 'PROXY',
+ 'LDAP']
+ inactive_domains = sssdconfig.list_inactive_domains()
+
+ for domain in control_list:
+ self.assertTrue(domain in inactive_domains,
+ "Domain [%s] missing" %
+ domain)
+ for domain in inactive_domains:
+ self.assertTrue(domain in control_list,
+ "Domain [%s] unexpectedly found" %
+ domain)
+
+ def testListDomains(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ # Negative Test - Not Initialized
+ try:
+ sssdconfig.list_domains()
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ # Positive Test
+ sssdconfig.import_config('testconfigs/sssd-valid.conf')
+
+ control_list = [
+ 'IPA',
+ 'LOCAL',
+ 'PROXY',
+ 'LDAP']
+ domains = sssdconfig.list_domains()
+
+ for domain in control_list:
+ self.assertTrue(domain in domains,
+ "Domain [%s] missing" %
+ domain)
+ for domain in domains:
+ self.assertTrue(domain in control_list,
+ "Domain [%s] unexpectedly found" %
+ domain)
+
+ def testGetDomain(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ # Negative Test - Not initialized
+ try:
+ domain = sssdconfig.get_domain('sssd')
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ sssdconfig.import_config('testconfigs/sssd-valid.conf')
+
+ domain = sssdconfig.get_domain('IPA')
+ self.assertTrue(isinstance(domain, SSSDConfig.SSSDDomain))
+
+ # TODO verify the contents of this domain
+
+ # Negative Test - No such domain
+ try:
+ domain = sssdconfig.get_domain('nosuchdomain')
+ except SSSDConfig.NoDomainError:
+ pass
+ else:
+ self.fail("Expected NoDomainError")
+
+ def testNewDomain(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ # Negative Test - Not initialized
+ try:
+ domain = sssdconfig.new_domain('example.com')
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ sssdconfig.new_config()
+
+ # Positive Test
+ domain = sssdconfig.new_domain('example.com')
+ self.assertTrue(isinstance(domain, SSSDConfig.SSSDDomain))
+ self.failUnless(domain.get_name() in sssdconfig.list_domains())
+ self.failUnless(domain.get_name() in sssdconfig.list_inactive_domains())
+
+ # TODO: check that the values of this new domain
+ # are set to the defaults from the schema
+
+ def testDeleteDomain(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+
+ # Negative Test - Not initialized
+ try:
+ sssdconfig.delete_domain('IPA')
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ # Positive Test
+ sssdconfig.import_config('testconfigs/sssd-valid.conf')
+
+ self.assertTrue('IPA' in sssdconfig.list_domains())
+ self.assertTrue('IPA' in sssdconfig.list_active_domains())
+ sssdconfig.delete_domain('IPA')
+ self.assertFalse('IPA' in sssdconfig.list_domains())
+ self.assertFalse('IPA' in sssdconfig.list_active_domains())
+
+ def testSaveDomain(self):
+ sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf",
+ "etc/sssd.api.d")
+ # Negative Test - Not initialized
+ try:
+ sssdconfig.delete_domain('IPA')
+ except SSSDConfig.NotInitializedError:
+ pass
+ else:
+ self.fail("Expected NotInitializedError")
+
+ # Positive Test
+ sssdconfig.new_config()
+ domain = sssdconfig.new_domain('example.com')
+ domain.add_provider('ldap', 'id')
+ domain.set_option('ldap_uri', 'ldap://ldap.example.com')
+ domain.set_active(True)
+ sssdconfig.save_domain(domain)
+
+ self.assertTrue('example.com' in sssdconfig.list_domains())
+ self.assertTrue('example.com' in sssdconfig.list_active_domains())
+ self.assertEqual(sssdconfig.get('domain/example.com', 'ldap_uri'),
+ 'ldap://ldap.example.com')
+
+ # Negative Test - Type Error
+ try:
+ sssdconfig.save_service(self)
+ except TypeError:
+ pass
+ else:
+ self.fail("Expected TypeError")
+
+if __name__ == "__main__":
+ error = 0
+
+ suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestSSSDService)
+ res = unittest.TextTestRunner(verbosity=99).run(suite)
+ if not res.wasSuccessful():
+ error |= 0x1
+
+ suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestSSSDDomain)
+ res = unittest.TextTestRunner(verbosity=99).run(suite)
+ if not res.wasSuccessful():
+ error |= 0x2
+
+ suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestSSSDConfig)
+ res = unittest.TextTestRunner(verbosity=99).run(suite)
+ if not res.wasSuccessful():
+ error |= 0x4
+
+ suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestValid)
+ res = unittest.TextTestRunner(verbosity=99).run(suite)
+ if not res.wasSuccessful():
+ error |= 0x8
+
+ exit(error)
diff --git a/server/config/etc/sssd.api.conf b/server/config/etc/sssd.api.conf
new file mode 100644
index 00000000..04634ca5
--- /dev/null
+++ b/server/config/etc/sssd.api.conf
@@ -0,0 +1,48 @@
+# Format:
+# option = type, subtype[, default]
+
+[service]
+# Options available to all services
+debug_level = int, None, 0
+command = str, None
+reconnection_retries = int, None, 3
+
+[sssd]
+# Monitor service
+config_file_version = int, None, 2
+services = list, str, nss, pam
+domains = list, str
+sbus_timeout = int, None, -1
+re_expression = str, None, (?P<name>[^@]+)@?(?P<domain>[^@]*$)
+full_name_format = str, None, %1$s@%2$s
+
+[nss]
+# Name service
+nss_enum_cache_timeout = int, None
+nss_entry_cache_timeout = int, None
+nss_entry_cache_no_wait_timeout = int, None
+nss_entry_negative_timeout = int, None
+nss_filter_users = list, str, root
+nss_filter_groups = list, str, root
+nss_filter_users_in_groups = bool, None, true
+
+[pam]
+# Authentication service
+
+[provider]
+#Available provider types
+id_provider = str, None
+auth_provider = str, None
+access_provider = str, None
+chpass_provider = str, None
+
+[domain]
+# Options available to all domains
+debug_level = int, None, 0
+min_id = int, None, 1000
+max_id = int, None
+timeout = int, None, 0
+magic_private_groups = bool, None, false
+enumerate = bool, None, true
+cache_credentials = bool, None, false
+use_fully_qualified_names = bool, None, false
diff --git a/server/config/etc/sssd.api.d/sssd-krb5.conf b/server/config/etc/sssd.api.d/sssd-krb5.conf
new file mode 100644
index 00000000..85067e93
--- /dev/null
+++ b/server/config/etc/sssd.api.d/sssd-krb5.conf
@@ -0,0 +1,13 @@
+[provider/krb5]
+krb5_kdcip = str, None
+krb5_realm = str, None
+krb5_auth_timeout = int, None
+
+[provider/krb5/auth]
+krb5_ccachedir = str, None
+krb5_ccname_template = str, None
+
+[provider/krb5/access]
+
+[provider/krb5/chpass]
+krb5_changepw_principal = str, None \ No newline at end of file
diff --git a/server/config/etc/sssd.api.d/sssd-ldap.conf b/server/config/etc/sssd.api.d/sssd-ldap.conf
new file mode 100644
index 00000000..700de021
--- /dev/null
+++ b/server/config/etc/sssd.api.d/sssd-ldap.conf
@@ -0,0 +1,32 @@
+[provider/ldap]
+ldap_uri = str, None, ldap://localhost
+ldap_schema = str, None, rfc2307
+ldap_default_bind_dn = str, None
+ldap_default_authtok_type = str, None
+ldap_default_authtok = str, None
+ldap_network_timeout = int, None
+ldap_opt_timeout = int, None
+ldap_tls_reqcert = str, None
+
+[provider/ldap/id]
+ldap_user_search_base = str, None
+ldap_user_object_class = str, None
+ldap_user_name = str, None
+ldap_user_uid_number = str, None
+ldap_user_gid_number = str, None
+ldap_user_gecos = str, None
+ldap_user_homedir = str, None
+ldap_user_shell = str, None
+ldap_user_uuid = str, None
+ldap_user_principal = str, None
+ldap_user_fullname = str, None
+ldap_user_memberof = str, None
+ldap_group_search_base = str, None
+ldap_group_object_class = str, None
+ldap_group_name = str, None
+ldap_group_gid_number = str, None
+ldap_group_member = str, None
+ldap_group_UUID = str, None
+ldap_force_upper_case_realm = bool, None
+
+[provider/ldap/auth]
diff --git a/server/config/etc/sssd.api.d/sssd-local.conf b/server/config/etc/sssd.api.d/sssd-local.conf
new file mode 100644
index 00000000..48ffae28
--- /dev/null
+++ b/server/config/etc/sssd.api.d/sssd-local.conf
@@ -0,0 +1,11 @@
+[provider/local]
+
+[provider/local/id]
+default_shell = str, None, /bin/bash
+base_directory = str, None, /home
+
+[provider/local/auth]
+
+[provider/local/access]
+
+[provider/local/chpass] \ No newline at end of file
diff --git a/server/config/testconfigs/noparse.api.conf b/server/config/testconfigs/noparse.api.conf
new file mode 100644
index 00000000..50651001
--- /dev/null
+++ b/server/config/testconfigs/noparse.api.conf
@@ -0,0 +1,7 @@
+# Format:
+# option = type, subtype[, default]
+
+[service]
+# Options available to all services
+debug_level = int, None, 0
+command \ No newline at end of file
diff --git a/server/config/testconfigs/sssd-invalid.conf b/server/config/testconfigs/sssd-invalid.conf
new file mode 100644
index 00000000..3a84ae11
--- /dev/null
+++ b/server/config/testconfigs/sssd-invalid.conf
@@ -0,0 +1,3 @@
+[sssd]
+services
+config_file_version = 2
diff --git a/server/config/testconfigs/sssd-valid.conf b/server/config/testconfigs/sssd-valid.conf
new file mode 100644
index 00000000..6725e0a7
--- /dev/null
+++ b/server/config/testconfigs/sssd-valid.conf
@@ -0,0 +1,42 @@
+[nss]
+nss_filter_groups = root
+nss_entry_negative_timeout = 15
+debug_level = 0
+nss_filter_users_in_groups = true
+nss_filter_users = root
+nss_entry_cache_no_wait_timeout = 60
+nss_entry_cache_timeout = 600
+nss_enum_cache_timeout = 120
+
+[sssd]
+services = nss, pam
+reconnection_retries = 3
+domains = LOCAL, IPA
+config_file_version = 2
+
+[domain/PROXY]
+id_provider = proxy
+auth_provider = proxy
+debug_level = 0
+
+[domain/IPA]
+id_provider = ldap
+auth_provider = krb5
+debug_level = 0
+
+[domain/LOCAL]
+id_provider = local
+auth_provider = local
+debug_level = 0
+
+[domain/LDAP]
+id_provider = ldap
+auth_provider = ldap
+debug_level = 0
+
+[pam]
+debug_level = 0
+
+[dp]
+debug_level = 0
+