diff options
Diffstat (limited to 'server/config')
-rw-r--r-- | server/config/SSSDConfig.py | 1664 | ||||
-rwxr-xr-x | server/config/SSSDConfigTest.py | 1521 | ||||
-rw-r--r-- | server/config/etc/sssd.api.conf | 66 | ||||
-rw-r--r-- | server/config/etc/sssd.api.d/sssd-ipa.conf | 77 | ||||
-rw-r--r-- | server/config/etc/sssd.api.d/sssd-krb5.conf | 13 | ||||
-rw-r--r-- | server/config/etc/sssd.api.d/sssd-ldap.conf | 68 | ||||
-rw-r--r-- | server/config/etc/sssd.api.d/sssd-local.conf | 10 | ||||
-rw-r--r-- | server/config/etc/sssd.api.d/sssd-proxy.conf | 7 | ||||
-rw-r--r-- | server/config/ipachangeconf.py | 588 | ||||
-rw-r--r-- | server/config/setup.py | 35 | ||||
-rw-r--r-- | server/config/testconfigs/noparse.api.conf | 7 | ||||
-rw-r--r-- | server/config/testconfigs/sssd-badversion.conf | 42 | ||||
-rw-r--r-- | server/config/testconfigs/sssd-invalid-badbool.conf | 43 | ||||
-rw-r--r-- | server/config/testconfigs/sssd-invalid.conf | 3 | ||||
-rw-r--r-- | server/config/testconfigs/sssd-noversion.conf | 41 | ||||
-rw-r--r-- | server/config/testconfigs/sssd-valid.conf | 43 | ||||
-rw-r--r-- | server/config/upgrade_config.py | 405 |
17 files changed, 0 insertions, 4633 deletions
diff --git a/server/config/SSSDConfig.py b/server/config/SSSDConfig.py deleted file mode 100644 index a004c33b..00000000 --- a/server/config/SSSDConfig.py +++ /dev/null @@ -1,1664 +0,0 @@ -''' -Created on Sep 18, 2009 - -@author: sgallagh -''' - -import os -import gettext -import exceptions -from ipachangeconf import SSSDChangeConf - -# Exceptions -class SSSDConfigException(Exception): pass -class ParsingError(Exception): pass -class AlreadyInitializedError(SSSDConfigException): pass -class NotInitializedError(SSSDConfigException): pass -class NoOutputFileError(SSSDConfigException): pass -class NoServiceError(SSSDConfigException): pass -class NoSectionError(SSSDConfigException): pass -class NoOptionError(SSSDConfigException): pass -class ServiceNotRecognizedError(SSSDConfigException): pass -class ServiceAlreadyExists(SSSDConfigException): pass -class NoDomainError(SSSDConfigException): pass -class DomainNotRecognized(SSSDConfigException): pass -class DomainAlreadyExistsError(SSSDConfigException): pass -class NoSuchProviderError(SSSDConfigException): pass -class NoSuchProviderSubtypeError(SSSDConfigException): pass -class ProviderSubtypeInUse(SSSDConfigException): pass - -PACKAGE = 'sss_daemon' -LOCALEDIR = '/usr/share/locale' - -translation = gettext.translation(PACKAGE, LOCALEDIR, fallback=True) -_ = translation.ugettext - -# TODO: This needs to be made external -option_strings = { - # [service] - 'debug_level' : _('Set the verbosity of the debug logging'), - 'debug_timestamps' : _('Include timestamps in debug logs'), - 'debug_to_files' : _('Write debug messages to logfiles'), - 'timeout' : _('Ping timeout before restarting service'), - 'command' : _('Command to start service'), - 'reconnection_retries' : _('Number of times to attempt connection to Data Providers'), - - # [sssd] - 'services' : _('SSSD Services to start'), - 'domains' : _('SSSD Domains to start'), - 'sbus_timeout' : _('Timeout for messages sent over the SBUS'), - 're_expression' : _('Regex to parse username and domain'), - 'full_name_format' : _('Printf-compatible format for displaying fully-qualified names'), - - # [nss] - 'enum_cache_timeout' : _('Enumeration cache timeout length (seconds)'), - 'entry_cache_no_wait_timeout' : _('Entry cache background update timeout length (seconds)'), - 'entry_negative_timeout' : _('Negative cache timeout length (seconds)'), - 'filter_users' : _('Users that SSSD should explicitly ignore'), - 'filter_groups' : _('Groups that SSSD should explicitly ignore'), - 'filter_users_in_groups' : _('Should filtered users appear in groups'), - 'pwfield' : _('The value of the password field the NSS provider should return'), - - # [pam] - 'offline_credentials_expiration' : _('How long to allow cached logins between online logins (days)'), - 'offline_failed_login_attempts' : _('How many failed logins attempts are allowed when offline'), - 'offline_failed_login_delay' : _('How long (minutes) to deny login after offline_failed_login_attempts has been reached'), - - # [provider] - 'id_provider' : _('Identity provider'), - 'auth_provider' : _('Authentication provider'), - 'access_provider' : _('Access control provider'), - 'chpass_provider' : _('Password change provider'), - - # [domain] - 'min_id' : _('Minimum user ID'), - 'max_id' : _('Maximum user ID'), - 'timeout' : _('Ping timeout before restarting domain'), - 'enumerate' : _('Enable enumerating all users/groups'), - 'cache_credentials' : _('Cache credentials for offline login'), - 'store_legacy_passwords' : _('Store password hashes'), - 'use_fully_qualified_names' : _('Display users/groups in fully-qualified form'), - 'entry_cache_timeout' : _('Entry cache timeout length (seconds)'), - - # [provider/ipa] - 'ipa_domain' : _('IPA domain'), - 'ipa_server' : _('IPA server address'), - 'ipa_hostname' : _('IPA client hostname'), - - # [provider/krb5] - 'krb5_kdcip' : _('Kerberos server address'), - 'krb5_realm' : _('Kerberos realm'), - 'krb5_auth_timeout' : _('Authentication timeout'), - - # [provider/krb5/auth] - 'krb5_ccachedir' : _('Directory to store credential caches'), - 'krb5_ccname_template' : _("Location of the user's credential cache"), - 'krb5_keytab' : _("Location of the keytab to validate credentials"), - 'krb5_validate' : _("Enable credential validation"), - - # [provider/krb5/chpass] - 'krb5_changepw_principal' : _('The principal of the change password service'), - - # [provider/ldap] - 'ldap_uri' : _('ldap_uri, The URI of the LDAP server'), - 'ldap_search_base' : _('The default base DN'), - 'ldap_schema' : _('The Schema Type in use on the LDAP server, rfc2307'), - 'ldap_default_bind_dn' : _('The default bind DN'), - 'ldap_default_authtok_type' : _('The type of the authentication token of the default bind DN'), - 'ldap_default_authtok' : _('The authentication token of the default bind DN'), - 'ldap_network_timeout' : _('Length of time to attempt connection'), - 'ldap_opt_timeout' : _('Length of time to attempt synchronous LDAP operations'), - 'ldap_offline_timeout' : _('Length of time between attempts to reconnect while offline'), - 'ldap_tls_cacert' : _('file that contains CA certificates'), - 'ldap_tls_reqcert' : _('Require TLS certificate verification'), - 'ldap_sasl_mech' : _('Specify the sasl mechanism to use'), - 'ldap_sasl_authid' : _('Specify the sasl authorization id to use'), - 'krb5_kdcip' : _('Kerberos server address'), - 'krb5_realm' : _('Kerberos realm'), - 'ldap_krb5_keytab' : _('Kerberos service keytab'), - 'ldap_krb5_init_creds' : _('Use Kerberos auth for LDAP connection'), - 'ldap_referrals' : _('Follow LDAP referrals'), - - # [provider/ldap/id] - 'ldap_search_timeout' : _('Length of time to wait for a search request'), - 'ldap_enumeration_refresh_timeout' : _('Length of time between enumeration updates'), - 'ldap_id_use_start_tls' : _('Require TLS for ID lookups, false'), - 'ldap_user_search_base' : _('Base DN for user lookups'), - 'ldap_user_search_scope' : _('Scope of user lookups'), - 'ldap_user_search_filter' : _('Filter for user lookups'), - 'ldap_user_object_class' : _('Objectclass for users'), - 'ldap_user_name' : _('Username attribute'), - 'ldap_user_uid_number' : _('UID attribute'), - 'ldap_user_gid_number' : _('Primary GID attribute'), - 'ldap_user_gecos' : _('GECOS attribute'), - 'ldap_user_homedir' : _('Home directory attribute'), - 'ldap_user_shell' : _('Shell attribute'), - 'ldap_user_uuid' : _('UUID attribute'), - 'ldap_user_principal' : _('User principal attribute (for Kerberos)'), - 'ldap_user_fullname' : _('Full Name'), - 'ldap_user_member_of' : _('memberOf attribute'), - 'ldap_user_modify_timestamp' : _('Modification time attribute'), - - # [provider/ldap/auth] - 'ldap_pwd_policy' : _('Policy to evaluate the password expiration'), - - # [provider/local/id] - 'default_shell' : _('Default shell, /bin/bash'), - 'base_directory' : _('Base for home directories'), - - # [provider/proxy/id] - 'proxy_lib_name' : _('The name of the NSS library to use'), - - # [provider/proxy/auth] - 'proxy_pam_target' : _('PAM stack to use') -} - -def striplist(l): - return([x.strip() for x in l]) - -def options_overlap(options1, options2): - overlap = [] - for option in options1: - if option in options2: - overlap.append(option) - return overlap - -class SSSDConfigSchema(SSSDChangeConf): - def __init__(self, schemafile, schemaplugindir): - SSSDChangeConf.__init__(self) - #TODO: get these from a global setting - if not schemafile: - schemafile = '/etc/sssd/sssd.api.conf' - if not schemaplugindir: - schemaplugindir = '/etc/sssd/sssd.api.d' - - try: - #Read the primary config file - fd = open(schemafile, 'r') - self.readfp(fd) - fd.close() - # Read in the provider files - for file in os.listdir(schemaplugindir): - fd = open(schemaplugindir+ "/" + file) - self.readfp(fd) - fd.close() - except IOError: - raise - except SyntaxError: # can be raised with readfp - raise ParsingError - - # Set up lookup table for types - self.type_lookup = { - 'bool' : bool, - 'int' : int, - 'long' : long, - 'float': float, - 'str' : str, - 'list' : list, - 'None' : None - } - - # Lookup table for acceptable boolean values - self.bool_lookup = { - 'false' : False, - 'true' : True, - } - - def get_options(self, section): - if not self.has_section(section): - raise NoSectionError - options = self.options(section) - - # Indexes - PRIMARY_TYPE = 0 - SUBTYPE = 1 - MANDATORY = 2 - DEFAULT = 3 - - # Parse values - parsed_options = {} - for option in self.strip_comments_empty(options): - unparsed_option = option['value'] - split_option = striplist(unparsed_option.split(',')) - optionlen = len(split_option) - - primarytype = self.type_lookup[split_option[PRIMARY_TYPE]] - subtype = self.type_lookup[split_option[SUBTYPE]] - mandatory = self.bool_lookup[split_option[MANDATORY]] - - if option_strings.has_key(option['name']): - desc = option_strings[option['name']] - else: - desc = None - - if optionlen == 3: - # This option has no defaults - parsed_options[option['name']] = \ - (primarytype, - subtype, - mandatory, - desc, - None) - elif optionlen == 4: - if type(split_option[DEFAULT]) == primarytype: - parsed_options[option['name']] = \ - (primarytype, - subtype, - mandatory, - desc, - split_option[DEFAULT]) - elif primarytype == list: - if (type(split_option[DEFAULT]) == subtype): - parsed_options[option['name']] = \ - (primarytype, - subtype, - mandatory, - desc, - [split_option[DEFAULT]]) - else: - try: - if subtype == bool and \ - type(split_option[DEFAULT]) == str: - parsed_options[option['name']] = \ - (primarytype, - subtype, - mandatory, - desc, - [self.bool_lookup[split_option[DEFAULT].lower()]]) - else: - parsed_options[option['name']] = \ - (primarytype, - subtype, - mandatory, - desc, - [subtype(split_option[DEFAULT])]) - except ValueError, KeyError: - raise ParsingError - else: - try: - if primarytype == bool and \ - type(split_option[DEFAULT]) == str: - parsed_options[option['name']] = \ - (primarytype, - subtype, - mandatory, - desc, - self.bool_lookup[split_option[DEFAULT].lower()]) - else: - parsed_options[option['name']] = \ - (primarytype, - subtype, - mandatory, - desc, - primarytype(split_option[DEFAULT])) - except ValueError, KeyError: - raise ParsingError - - elif optionlen > 4: - if (primarytype != list): - raise ParsingError - fixed_options = [] - for x in split_option[DEFAULT:]: - if type(x) != subtype: - try: - if (subtype == bool and type(x) == str): - newvalue = self.bool_lookup[x.lower()] - else: - newvalue = subtype(x) - fixed_options.extend([newvalue]) - except ValueError, KeyError: - raise ParsingError - else: - fixed_options.extend([x]) - parsed_options[option['name']] = \ - (primarytype, - subtype, - mandatory, - desc, - fixed_options) - else: - # Bad config file - raise ParsingError - - return parsed_options - - def get_option(self, section, option): - if not self.has_section(section): - raise NoSectionError(section) - if not self.has_option(section, option): - raise NoOptionError("Section [%s] has no option [%s]" % - (section, option)) - - return self.get_options(section)[option] - - def get_defaults(self, section): - if not self.has_section(section): - raise NoSectionError(section) - - schema_options = self.get_options(section) - defaults = dict([(x,schema_options[x][4]) - for x in schema_options.keys() - if schema_options[x][4] != None]) - - return defaults - - def get_services(self): - service_list = [x['name'] for x in self.sections() - if x['name'] != 'service' and - not x['name'].startswith('domain') and - not x['name'].startswith('provider')] - return service_list - - def get_providers(self): - providers = {} - for section in self.sections(): - splitsection = section['name'].split('/') - if (splitsection[0] == 'provider'): - if(len(splitsection) == 3): - if not providers.has_key(splitsection[1]): - providers[splitsection[1]] = [] - providers[splitsection[1]].extend([splitsection[2]]) - for key in providers.keys(): - providers[key] = tuple(providers[key]) - return providers - -class SSSDConfigObject(object): - def __init__(self): - self.name = None - self.options = {} - - def get_name(self): - """ - Return the name of the this object - - === Returns === - The domain name - - === Errors === - No errors - """ - return self.name - - def get_option(self, optionname): - """ - Return the value of an service option - - optionname: - The option to get. - - === Returns === - The value for the requested option. - - === Errors === - NoOptionError: - The specified option was not listed in the service - """ - if optionname in self.options.keys(): - return self.options[optionname] - raise NoOptionError(optionname) - - def get_all_options(self): - """ - Return a dictionary of name/value pairs for this object - - === Returns === - A dictionary of name/value pairs currently in use for this object - - === Errors === - No errors - """ - return self.options - - def remove_option(self, optionname): - """ - Remove an option from the object. If the option does not exist, it is ignored. - - === Returns === - No return value. - - === Errors === - No errors - """ - if self.options.has_key(optionname): - del self.options[optionname] - -class SSSDService(SSSDConfigObject): - ''' - Object to manipulate SSSD service options - ''' - - def __init__(self, servicename, apischema): - """ - Create a new SSSDService, setting its defaults to those found in the - schema. This constructor should not be used directly. Use - SSSDConfig.new_service() instead. - - name: - The service name - apischema: - An SSSDConfigSchema? object created by SSSDConfig.__init__() - - === Returns === - The newly-created SSSDService object. - - === Errors === - TypeError: - The API schema passed in was unusable or the name was not a string. - ServiceNotRecognizedError: - The service was not listed in the schema - """ - SSSDConfigObject.__init__(self) - - if not isinstance(apischema, SSSDConfigSchema) or type(servicename) != str: - raise TypeError - - if not apischema.has_section(servicename): - raise ServiceNotRecognizedError(servicename) - - self.name = servicename - self.schema = apischema - - # Set up the service object with any known defaults - self.options = {} - - # Include a list of hidden options - self.hidden_options = [] - - # Set up default options for all services - self.options.update(self.schema.get_defaults('service')) - - # Set up default options for this service - self.options.update(self.schema.get_defaults(self.name)) - - # For the [sssd] service, force the config file version - if servicename == 'sssd': - self.options['config_file_version'] = 2 - self.hidden_options.append('config_file_version') - - def list_options_with_mandatory(self): - """ - List options for the service, including the mandatory flag. - - === Returns === - A dictionary of configurable options. This dictionary is keyed on the - option name with a tuple of the variable type, subtype ('None' if the - type is not a collection type), whether it is mandatory, the - translated option description, and the default value (or 'None') as - the value. - - Example: - { 'enumerate' : - (bool, None, False, u'Enable enumerating all users/groups', True) } - - === Errors === - No errors - """ - options = {} - - # Get the list of available options for all services - schema_options = self.schema.get_options('service') - options.update(schema_options) - - schema_options = self.schema.get_options(self.name) - options.update(schema_options) - - return options - - def list_options(self): - """ - List all options that apply to this service - - === Returns === - A dictionary of configurable options. This dictionary is keyed on the - option name with a tuple of the variable type, subtype ('None' if the - type is not a collection type), the translated option description, and - the default value (or 'None') as the value. - - Example: - { 'services' : - (list, str, u'SSSD Services to start', ['nss', 'pam']) } - - === Errors === - No Errors - """ - options = self.list_options_with_mandatory() - - # Filter out the mandatory field to maintain compatibility - # with older versions of the API - filtered_options = {} - for key in options.keys(): - filtered_options[key] = (options[key][0], options[key][1], options[key][3], options[key][4]) - - return filtered_options - - def list_mandatory_options(self): - """ - List all mandatory options that apply to this service - - === Returns === - A dictionary of configurable options. This dictionary is keyed on the - option name with a tuple of the variable type, subtype ('None' if the - type is not a collection type), the translated option description, and - the default value (or 'None') as the value. - - Example: - { 'services' : - (list, str, u'SSSD Services to start', ['nss', 'pam']) } - - === Errors === - No Errors - """ - options = self.list_options_with_mandatory() - - # Filter out the mandatory field to maintain compatibility - # with older versions of the API - filtered_options = {} - for key in options.keys(): - if options[key][2]: - filtered_options[key] = (options[key][0], options[key][1], options[key][3], options[key][4]) - - return filtered_options - - def set_option(self, optionname, value): - """ - Set a service option to the specified value (or values) - - optionname: - The option to change - value: - The value to set. This may be a single value or a list of values. If - it is set to None, it resets the option to its default. - - === Returns === - No return value - - === Errors === - NoOptionError: - The specified option is not listed in the schema - TypeError: - The value specified was not of the expected type - """ - if self.schema.has_option(self.name, optionname): - option_schema = self.schema.get_option(self.name, optionname) - elif self.schema.has_option('service', optionname): - option_schema = self.schema.get_option('service', optionname) - elif optionname in self.hidden_options: - # Set this option and do not add it to the list of changeable values - self.options[optionname] = value - return - else: - raise NoOptionError('Section [%s] has no option [%s]' % (self.name, optionname)) - - if value == None: - self.remove_option(optionname) - return - - raise_error = False - - # If we were expecting a list and didn't get one, - # Create a list with a single entry. If it's the - # wrong subtype, it will fail below - if option_schema[0] == list and type(value) != list: - if type(value) == str: - value = striplist(value.split(',')) - else: - value = [value] - - if type(value) != option_schema[0]: - # If it's possible to convert it, do so - try: - if option_schema[0] == bool and \ - type(value) == str: - value = self.schema.bool_lookup[value.lower()] - else: - value = option_schema[0](value) - except ValueError: - raise_error = True - except KeyError: - raise_error = True - - if raise_error: - raise TypeError('Expected %s for %s, received %s' % - (option_schema[0], optionname, type(value))) - - if type(value) == list: - # Iterate through the list an ensure that all members - # are of the appropriate subtype - try: - newvalue = [] - for x in value: - if option_schema[1] == bool and \ - type(x) == str: - newvalue.extend([self.schema.bool_lookup[x.lower()]]) - else: - newvalue.extend([option_schema[1](x)]) - except ValueError: - raise_error = True - except KeyError: - raise_error = True - - if raise_error: - raise TypeError('Expected %s' % option_schema[1]) - - value = newvalue - - self.options[optionname] = value - -class SSSDDomain(SSSDConfigObject): - """ - Object to manipulate SSSD domain options - """ - def __init__(self, domainname, apischema): - """ - Creates a new, empty SSSDDomain. This domain is inactive by default. - This constructor should not be used directly. Use - SSSDConfig.new_domain() instead. - - name: - The domain name. - apischema: - An SSSDConfigSchema object created by SSSDConfig.__init__() - - === Returns === - The newly-created SSSDDomain object. - - === Errors === - TypeError: - apischema was not an SSSDConfigSchema object or domainname was not - a string - """ - SSSDConfigObject.__init__(self) - - if not isinstance(apischema, SSSDConfigSchema) or type(domainname) != str: - raise TypeError - - self.name = domainname - self.schema = apischema - self.active = False - self.oldname = None - self.providers = [] - - # Set up the domain object with any known defaults - self.options = {} - - # Set up default options for all domains - self.options.update(self.schema.get_defaults('provider')) - self.options.update(self.schema.get_defaults('domain')) - - def set_active(self, active): - """ - Enable or disable this domain - - active: - Boolean value. If True, this domain will be added to the active - domains list when it is saved. If False, it will be removed from the - active domains list when it is saved. - - === Returns === - No return value - - === Errors === - No errors - """ - self.active = bool(active) - - def list_options_with_mandatory(self): - """ - List options for the currently-configured providers, including the - mandatory flag - - === Returns === - A dictionary of configurable options. This dictionary is keyed on the - option name with a tuple of the variable type, subtype ('None' if the - type is not a collection type), whether it is mandatory, the - translated option description, and the default value (or 'None') as - the value. - - Example: - { 'enumerate' : - (bool, None, False, u'Enable enumerating all users/groups', True) } - - === Errors === - No errors - """ - options = {} - # Get the list of available options for all domains - options.update(self.schema.get_options('provider')) - - options.update(self.schema.get_options('domain')) - - # Candidate for future optimization: will update primary type - # for each subtype - for (provider, providertype) in self.providers: - schema_options = self.schema.get_options('provider/%s' - % provider) - options.update(schema_options) - schema_options = self.schema.get_options('provider/%s/%s' - % (provider, providertype)) - options.update(schema_options) - return options - - def list_options(self): - """ - List options available for the currently-configured providers. - - === Returns === - A dictionary of configurable options. This dictionary is keyed on the - option name with a tuple of the variable type, subtype ('None' if the - type is not a collection type), the translated option description, and - the default value (or 'None') as the value. - - Example: - { 'enumerate' : - (bool, None, u'Enable enumerating all users/groups', True) } - - === Errors === - No errors - """ - options = self.list_options_with_mandatory() - - # Filter out the mandatory field to maintain compatibility - # with older versions of the API - filtered_options = {} - for key in options.keys(): - filtered_options[key] = (options[key][0], options[key][1], options[key][3], options[key][4]) - - return filtered_options - - def list_mandatory_options(self): - """ - List mandatory options for the currently-configured providers. - - === Returns === - A dictionary of configurable options. This dictionary is keyed on the - option name with a tuple of the variable type, subtype ('None' if the - type is not a collection type), the translated option description, and - the default value (or 'None') as the value. - - Example: - { 'enumerate' : - (bool, None, u'Enable enumerating all users/groups', True) } - - === Errors === - No errors - """ - options = self.list_options_with_mandatory() - - # Filter out the mandatory field to maintain compatibility - # with older versions of the API - filtered_options = {} - for key in options.keys(): - if options[key][2]: - filtered_options[key] = (options[key][0], options[key][1], options[key][3], options[key][4]) - - return filtered_options - - def list_provider_options(self, provider, provider_type=None): - """ - If provider_type is specified, list all options applicable to that - target, otherwise list all possible options available for a provider. - - type: - Provider backend type. (e.g. local, ldap, krb5, etc.) - provider_type: - Subtype of the backend type. (e.g. id, auth, access, chpass) - - === Returns === - - A dictionary of configurable options for the specified provider type. - This dictionary is keyed on the option name with a tuple of the - variable type, subtype ('None' if the type is not a collection type), - the translated option description, and the default value (or 'None') - as the value. - - === Errors === - - NoSuchProviderError: - The specified provider is not listed in the schema or plugins - NoSuchProviderSubtypeError: - The specified provider subtype is not listed in the schema - """ - #TODO section checking - - options = self.schema.get_options('provider/%s' % provider) - if(provider_type): - options.update(self.schema.get_options('provider/%s/%s' % - (provider, provider_type))) - else: - # Add options from all provider subtypes - known_providers = self.list_providers() - for provider_type in known_providers[provider]: - options.update(self.list_provider_options(provider, - provider_type)) - return options - - def list_providers(self): - """ - Return a dictionary of providers. - - === Returns === - Returns a dictionary of providers, keyed on the primary type, with the - value being a tuple of the subtypes it supports. - - Example: - { 'ldap' : ('id', 'auth', 'chpass') } - - === Errors === - No Errors - """ - return self.schema.get_providers() - - def set_option(self, option, value): - """ - Set a domain option to the specified value (or values) - - option: - The option to change. - value: - The value to set. This may be a single value or a list of values. - If it is set to None, it resets the option to its default. - - === Returns === - No return value. - - === Errors === - NoOptionError: - The specified option is not listed in the schema - TypeError: - The value specified was not of the expected type - """ - options = self.list_options() - if (option not in options.keys()): - raise NoOptionError('Section [%s] has no option [%s]' % - (self.name, option)) - - if value == None: - self.remove_option(option) - return - - option_schema = options[option] - raise_error = False - - # If we were expecting a list and didn't get one, - # Create a list with a single entry. If it's the - # wrong subtype, it will fail below - if option_schema[0] == list and type(value) != list: - if type(value) == str: - value = striplist(value.split(',')) - else: - value = [value] - - if type(value) != option_schema[0]: - # If it's possible to convert it, do so - try: - if option_schema[0] == bool and \ - type(value) == str: - value = self.schema.bool_lookup[value.lower()] - else: - value = option_schema[0](value) - except ValueError: - raise_error = True - except KeyError: - raise_error = True - - if raise_error: - raise TypeError('Expected %s for %s, received %s' % - (option_schema[0], option, type(value))) - - if type(value) == list: - # Iterate through the list an ensure that all members - # are of the appropriate subtype - try: - newvalue = [] - for x in value: - if option_schema[1] == bool and \ - type(x) == str: - newvalue.extend([self.schema.bool_lookup[x.lower()]]) - else: - newvalue.extend([option_schema[1](x)]) - except ValueError: - raise_error = True - except KeyError: - raise_error = True - - if raise_error: - raise TypeError('Expected %s' % option_schema[1]) - value = newvalue - - # Check whether we're adding a provider entry. - is_provider = option.rfind('_provider') - if (is_provider > 0): - provider = option[:is_provider] - self.add_provider(value, provider) - else: - self.options[option] = value - - def set_name(self, newname): - """ - Change the name of the domain - - newname: - New name for this domain - - === Returns === - No return value. - - === Errors === - TypeError: - newname was not a string - """ - - if type(newname) != str: - raise TypeError - - if not self.oldname: - # Only set the oldname once - self.oldname = self.name - self.name = newname - - def add_provider(self, provider, provider_type): - """ - Add a new provider type to the domain - - type: - Provider backend type. (e.g. local, ldap, krb5, etc.) - subtype: - Subtype of the backend type. (e.g. id, auth, chpass) - - === Returns === - No return value. - - === Errors === - ProviderSubtypeInUse: - Another backend is already providing this subtype - NoSuchProviderError: - The specified provider is not listed in the schema or plugins - NoSuchProviderSubtypeError: - The specified provider subtype is not listed in the schema - """ - # Check that provider and provider_type are valid - configured_providers = self.list_providers() - if provider in configured_providers.keys(): - if provider_type not in configured_providers[provider]: - raise NoSuchProviderSubtypeError(provider_type) - else: - raise NoSuchProviderError - - # Don't add a provider twice - with_this_type = [x for x in self.providers if x[1] == provider_type] - if len(with_this_type) > 1: - # This should never happen! - raise ProviderSubtypeInUse - if len(with_this_type) == 1: - if with_this_type[0][0] != provider: - raise ProviderSubtypeInUse(with_this_type[0][0]) - else: - self.providers.extend([(provider, provider_type)]) - - option_name = '%s_provider' % provider_type - self.options[option_name] = provider - - # Add defaults for this provider - self.options.update(self.schema.get_defaults('provider/%s' % - provider)) - self.options.update(self.schema.get_defaults('provider/%s/%s' % - (provider, - provider_type))) - - def remove_provider(self, provider_type): - """ - Remove a provider from the domain. If the provider is not present, it - is ignored. - - provider_type: - Subtype of the backend type. (e.g. id, auth, chpass) - - === Returns === - No return value. - - === Errors === - No Errors - """ - - provider = None - for (provider, ptype) in self.providers: - if ptype == provider_type: - break - provider = None - - # Check whether the provider_type was found - if not provider: - return - - # Remove any unused options when removing the provider. - options = self.list_provider_options(provider, provider_type) - - # Trim any options that are used by other providers, - # if that provider is in use - for (prov, ptype) in self.providers: - # Ignore the one being removed - if (prov, ptype) == (provider, provider_type): - continue - - provider_options = self.list_provider_options(prov, ptype) - overlap = options_overlap(options.keys(), provider_options.keys()) - for opt in overlap: - del options[opt] - - # We should now have a list of options used only by this - # provider. So we remove them. - for option in options: - if self.options.has_key(option): - del self.options[option] - - self.providers.remove((provider, provider_type)) - -class SSSDConfig(SSSDChangeConf): - """ - class SSSDConfig - Primary class for operating on SSSD configurations - """ - def __init__(self, schemafile=None, schemaplugindir=None): - """ - Initialize the SSSD config parser/editor. This constructor does not - open or create a config file. If the schemafile and schemaplugindir - are not passed, it will use the system defaults. - - schemafile: - The path to the api schema config file. Usually - /etc/sssd/sssd.api.conf - schemaplugindir: - The path the directory containing the provider schema config files. - Usually /etc/sssd/sssd.api.d - - === Returns === - The newly-created SSSDConfig object. - - === Errors === - IOError: - Exception raised when the schema file could not be opened for - reading. - ParsingError: - The main schema file or one of those in the plugin directory could - not be parsed. - """ - SSSDChangeConf.__init__(self) - self.schema = SSSDConfigSchema(schemafile, schemaplugindir) - self.configfile = None - self.initialized = False - self.API_VERSION = 2 - - def import_config(self,configfile=None): - """ - Read in a config file, populating all of the service and domain - objects with the read values. - - configfile: - The path to the SSSD config file. If not specified, use the system - default, usually /etc/sssd/sssd.conf - - === Returns === - No return value - - === Errors === - IOError: - Exception raised when the file could not be opened for reading - ParsingError: - Exception raised when errors occur attempting to parse a file. - AlreadyInitializedError: - This SSSDConfig object was already initialized by a call to - import_config() or new_config() - """ - if self.initialized: - raise AlreadyInitializedError - - if not configfile: - #TODO: get this from a global setting - configfile = '/etc/sssd/sssd.conf' - # open will raise an IOError if it fails - fd = open(configfile, 'r') - - try: - self.readfp(fd) - except: - raise ParsingError - - fd.close() - self.configfile = configfile - self.initialized = True - - try: - if int(self.get('sssd', 'config_file_version')) != self.API_VERSION: - raise ParsingError("Wrong config_file_version") - except: - # Either the 'sssd' section or the 'config_file_version' was not - # present in the config file - raise ParsingError("File contains no config_file_version") - - def new_config(self): - """ - Initialize the SSSDConfig object with the defaults from the schema. - - === Returns === - No return value - - === Errors === - AlreadyInitializedError: - This SSSDConfig object was already initialized by a call to - import_config() or new_config() - """ - if self.initialized: - raise AlreadyInitializedError - - self.initialized = True - - #Initialize all services - for servicename in self.schema.get_services(): - service = self.new_service(servicename) - - def write(self, outputfile=None): - """ - Write out the configuration to a file. - - outputfile: - The path to write the new config file. If it is not specified, it - will use the path specified by the import() call. - === Returns === - No return value - - === Errors === - IOError: - Exception raised when the file could not be opened for writing - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - NoOutputFileError: - No outputfile was specified and this SSSDConfig object was not - initialized by import() - """ - if not self.initialized: - raise NotInitializedError - - if outputfile == None: - if(self.configfile == None): - raise NoOutputFileError - - outputfile = self.configfile - - # open() will raise IOError if it fails - of = open(outputfile, "wb") - output = self.dump(self.opts) - of.write(output) - of.close() - - def list_services(self): - """ - Retrieve a list of known services. - - === Returns === - The list of known services. - - === Errors === - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - """ - if not self.initialized: - raise NotInitializedError - - service_list = [x['name'] for x in self.sections() - if not x['name'].startswith('domain') ] - return service_list - - def get_service(self, name): - """ - Get an SSSDService object to edit a service. - - name: - The name of the service to return. - - === Returns === - An SSSDService instance containing the current state of a service in - the SSSDConfig - - === Errors === - NoServiceError: - There is no such service with the specified name in the SSSDConfig. - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - """ - if not self.initialized: - raise NotInitializedError - if not self.has_section(name): - raise NoServiceError - - service = SSSDService(name, self.schema) - [service.set_option(opt['name'], opt['value']) - for opt in self.strip_comments_empty(self.options(name)) ] - - return service - - def new_service(self, name): - """ - Create a new service from the defaults and return the SSSDService - object for it. This function will also add this service to the list of - active services in the [SSSD] section. - - name: - The name of the service to create and return. - - === Returns === - The newly-created SSSDService object - - === Errors === - ServiceNotRecognizedError: - There is no such service in the schema. - ServiceAlreadyExistsError: - The service being created already exists in the SSSDConfig object. - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - """ - if not self.initialized: - raise NotInitializedError - if (self.has_section(name)): - raise ServiceAlreadyExists(name) - - service = SSSDService(name, self.schema) - self.save_service(service) - return service - - def delete_service(self, name): - """ - Remove a service from the SSSDConfig object. This function will also - remove this service from the list of active services in the [SSSD] - section. Has no effect if the service does not exist. - - === Returns === - No return value - - === Errors === - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - """ - if not self.initialized: - raise NotInitializedError - self.delete_option('section', name) - - def save_service(self, service): - """ - Save the changes made to the service object back to the SSSDConfig - object. - - service_object: - The SSSDService object to save to the configuration. - - === Returns === - No return value - === Errors === - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - TypeError: - service_object was not of the type SSSDService - """ - if not self.initialized: - raise NotInitializedError - if not isinstance(service, SSSDService): - raise TypeError - - name = service.get_name() - # Ensure that the existing section is removed - # This way we ensure that we are getting a - # complete copy of the service. - # delete_option() is a noop if the section - # does not exist. - index = self.delete_option('section', name) - - addkw = [] - for option,value in service.get_all_options().items(): - if (type(value) == list): - value = ', '.join(value) - addkw.append( { 'type' : 'option', - 'name' : option, - 'value' : str(value) } ) - - self.add_section(name, addkw, index) - - def list_active_domains(self): - """ - Return a list of all active domains. - - === Returns === - The list of configured, active domains. - - === Errors === - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - """ - if not self.initialized: - raise NotInitializedError - - if (self.has_option('sssd', 'domains')): - active_domains = striplist(self.get('sssd', 'domains').split(',')) - domain_dict = dict.fromkeys(active_domains) - if domain_dict.has_key(''): - del domain_dict[''] - - # Remove any entries in this list that don't - # correspond to an active domain, for integrity - configured_domains = self.list_domains() - for dom in domain_dict.keys(): - if dom not in configured_domains: - del domain_dict[dom] - - active_domains = domain_dict.keys() - else: - active_domains = [] - - return active_domains - - def list_inactive_domains(self): - """ - Return a list of all configured, but disabled domains. - - === Returns === - The list of configured, inactive domains. - - === Errors === - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - """ - if not self.initialized: - raise NotInitializedError - - if (self.has_option('sssd', 'domains')): - active_domains = striplist(self.get('sssd', 'domains').split(',')) - else: - active_domains = [] - - domains = [x for x in self.list_domains() - if x not in active_domains] - return domains - - def list_domains(self): - """ - Return a list of all configured domains, including inactive domains. - - === Returns === - The list of configured domains, both active and inactive. - - === Errors === - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - """ - if not self.initialized: - raise NotInitializedError - domains = [x['name'][7:] for x in self.sections() if x['name'].startswith('domain/')] - return domains - - def get_domain(self, name): - """ - Get an SSSDDomain object to edit a domain. - - name: - The name of the domain to return. - - === Returns === - An SSSDDomain instance containing the current state of a domain in the - SSSDConfig - - === Errors === - NoDomainError: - There is no such domain with the specified name in the SSSDConfig. - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - """ - if not self.initialized: - raise NotInitializedError - if not self.has_section('domain/%s' % name): - raise NoDomainError(name) - - domain = SSSDDomain(name, self.schema) - - # Read in the providers first or we may have type - # errors trying to read in their options - providers = [ (x['name'],x['value']) for x in self.strip_comments_empty(self.options('domain/%s' % name)) - if x['name'].rfind('_provider') > 0] - [domain.set_option(option, value) - for (option, value) in providers] - - [domain.set_option(opt['name'], opt['value']) - for opt in self.strip_comments_empty(self.options('domain/%s' % name)) - if opt not in providers] - - # Determine if this domain is currently active - domain.active = self.is_domain_active(name) - - return domain - - def new_domain(self, name): - """ - Create a new, empty domain and return the SSSDDomain object for it. - - name: - The name of the domain to create and return. - - === Returns === - The newly-created SSSDDomain object - - === Errors === - DomainAlreadyExistsError: - The service being created already exists in the SSSDConfig object. - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - """ - if not self.initialized: - raise NotInitializedError - if self.has_section('domain/%s' % name): - raise DomainAlreadyExistsError - - domain = SSSDDomain(name, self.schema) - self.save_domain(domain) - return domain - - def is_domain_active(self, name): - """ - Is a particular domain set active - - name: - The name of the configured domain to check - - === Returns === - True if the domain is active, False if it is inactive - - === Errors === - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - NoDomainError: - No domain by this name is configured - """ - - if not self.initialized: - raise NotInitializedError - - if name not in self.list_domains(): - raise NoDomainError - - return name in self.list_active_domains() - - def activate_domain(self, name): - """ - Activate a configured domain - - name: - The name of the configured domain to activate - - === Returns === - No return value - - === Errors === - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - NoDomainError: - No domain by this name is configured - """ - - if not self.initialized: - raise NotInitializedError - - if name not in self.list_domains(): - raise NoDomainError - - item = self.get_option_index('sssd', 'domains')[1] - if not item: - self.set('sssd','domains', name) - return - - # Turn the items into a set of dictionary keys - # This guarantees uniqueness and makes it easy - # to add a new value - domain_dict = dict.fromkeys(striplist(item['value'].split(','))) - if domain_dict.has_key(''): - del domain_dict[''] - - # Add a new key for the domain being activated - domain_dict[name] = None - - # Write out the joined keys - self.set('sssd','domains', ", ".join(domain_dict.keys())) - - def deactivate_domain(self, name): - """ - Deactivate a configured domain - - name: - The name of the configured domain to deactivate - - === Returns === - No return value - - === Errors === - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - NoDomainError: - No domain by this name is configured - """ - - if not self.initialized: - raise NotInitializedError - - if name not in self.list_domains(): - raise NoDomainError - item = self.get_option_index('sssd', 'domains')[1] - if not item: - self.set('sssd','domains', '') - return - - # Turn the items into a set of dictionary keys - # This guarantees uniqueness and makes it easy - # to remove the one unwanted value. - domain_dict = dict.fromkeys(striplist(item['value'].split(','))) - if domain_dict.has_key(''): - del domain_dict[''] - - # Remove the unwanted domain from the lest - if domain_dict.has_key(name): - del domain_dict[name] - - # Write out the joined keys - self.set('sssd','domains', ", ".join(domain_dict.keys())) - - def delete_domain(self, name): - """ - Remove a domain from the SSSDConfig object. This function will also - remove this domain from the list of active domains in the [SSSD] - section, if it is there. - - === Returns === - No return value - - === Errors === - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - """ - if not self.initialized: - raise NotInitializedError - - # Remove the domain from the active domains list if applicable - self.deactivate_domain(name) - self.delete_option('section', 'domain/%s' % name) - - def save_domain(self, domain): - """ - Save the changes made to the domain object back to the SSSDConfig - object. If this domain is marked active, ensure it is present in the - active domain list in the [SSSD] section - - domain_object: - The SSSDDomain object to save to the configuration. - - === Returns === - No return value - - === Errors === - NotInitializedError: - This SSSDConfig object has not had import_config() or new_config() - run on it yet. - TypeError: - domain_object was not of type SSSDDomain - """ - if not self.initialized: - raise NotInitializedError - if not isinstance(domain, SSSDDomain): - raise TypeError - - name = domain.get_name() - - oldindex = None - if domain.oldname and domain.oldname != name: - # We are renaming this domain - # Remove the old section - - self.deactivate_domain(domain.oldname) - oldindex = self.delete_option('section', 'domain/%s' % - domain.oldname) - - # Reset the oldname, in case we're not done with - # this domain object. - domain.oldname = None; - - sectionname = 'domain/%s' % name - # Ensure that the existing section is removed - # This way we ensure that we are getting a - # complete copy of the service. - # delete_option() is a noop if the section - # does not exist. - index = self.delete_option('section', sectionname) - addkw = [] - for option,value in domain.get_all_options().items(): - if (type(value) == list): - value = ', '.join(value) - addkw.append( { 'type' : 'option', - 'name' : option, - 'value' : str(value) } ) - if oldindex: - self.add_section(sectionname, addkw, oldindex) - else: - self.add_section(sectionname, addkw, index) - - if domain.active: - self.activate_domain(name) - else: - self.deactivate_domain(name) diff --git a/server/config/SSSDConfigTest.py b/server/config/SSSDConfigTest.py deleted file mode 100755 index 153146f8..00000000 --- a/server/config/SSSDConfigTest.py +++ /dev/null @@ -1,1521 +0,0 @@ -#!/usr/bin/python -''' -Created on Sep 18, 2009 - -@author: sgallagh -''' -import unittest - -import SSSDConfig - -class SSSDConfigTestValid(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - - def testServices(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - sssdconfig.import_config(srcdir + "/testconfigs/sssd-valid.conf") - - # Validate services - services = sssdconfig.list_services() - self.assertTrue('sssd' in services) - self.assertTrue('nss' in services) - self.assertTrue('pam' in services) - self.assertTrue('dp' in services) - - #Verify service attributes - sssd_service = sssdconfig.get_service('sssd') - service_opts = sssd_service.list_options() - - - self.assertTrue('services' in service_opts.keys()) - service_list = sssd_service.get_option('services') - self.assertTrue('nss' in service_list) - self.assertTrue('pam' in service_list) - - self.assertTrue('domains' in service_opts) - - self.assertTrue('reconnection_retries' in service_opts) - - del sssdconfig - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - sssdconfig.new_config() - sssdconfig.delete_service('sssd') - new_sssd_service = sssdconfig.new_service('sssd'); - new_options = new_sssd_service.list_options(); - - self.assertTrue('debug_level' in new_options) - self.assertEquals(new_options['debug_level'][0], int) - - self.assertTrue('command' in new_options) - self.assertEquals(new_options['command'][0], str) - - self.assertTrue('reconnection_retries' in new_options) - self.assertEquals(new_options['reconnection_retries'][0], int) - - self.assertTrue('services' in new_options) - self.assertEquals(new_options['debug_level'][0], int) - - self.assertTrue('domains' in new_options) - self.assertEquals(new_options['domains'][0], list) - self.assertEquals(new_options['domains'][1], str) - - self.assertTrue('sbus_timeout' in new_options) - self.assertEquals(new_options['sbus_timeout'][0], int) - - self.assertTrue('re_expression' in new_options) - self.assertEquals(new_options['re_expression'][0], str) - - self.assertTrue('full_name_format' in new_options) - self.assertEquals(new_options['full_name_format'][0], str) - - del sssdconfig - - def testDomains(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - sssdconfig.import_config(srcdir + "/testconfigs/sssd-valid.conf") - - #Validate domain list - domains = sssdconfig.list_domains() - self.assertTrue('LOCAL' in domains) - self.assertTrue('LDAP' in domains) - self.assertTrue('PROXY' in domains) - self.assertTrue('IPA' in domains) - - #Verify domain attributes - ipa_domain = sssdconfig.get_domain('IPA') - domain_opts = ipa_domain.list_options() - self.assertTrue('debug_level' in domain_opts.keys()) - self.assertTrue('id_provider' in domain_opts.keys()) - self.assertTrue('auth_provider' in domain_opts.keys()) - - del sssdconfig - - def testListProviders(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - sssdconfig.new_config() - junk_domain = sssdconfig.new_domain('junk') - providers = junk_domain.list_providers() - self.assertTrue('ldap' in providers.keys()) - - def testCreateNewLocalConfig(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - sssdconfig.new_config() - - local_domain = sssdconfig.new_domain('LOCAL') - local_domain.add_provider('local', 'id') - local_domain.set_option('debug_level', 1) - local_domain.set_option('default_shell', '/bin/tcsh') - local_domain.set_active(True) - sssdconfig.save_domain(local_domain) - - sssdconfig.write('/tmp/testCreateNewLocalConfig.conf') - - def testCreateNewLDAPConfig(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - sssdconfig.new_config() - - ldap_domain = sssdconfig.new_domain('LDAP') - ldap_domain.add_provider('ldap', 'id') - ldap_domain.set_option('debug_level', 1) - ldap_domain.set_active(True) - sssdconfig.save_domain(ldap_domain) - - sssdconfig.write('/tmp/testCreateNewLDAPConfig.conf') - - def testModifyExistingConfig(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - sssdconfig.import_config(srcdir + "/testconfigs/sssd-valid.conf") - - ldap_domain = sssdconfig.get_domain('LDAP') - ldap_domain.set_option('debug_level', 3) - - ldap_domain.remove_provider('auth') - ldap_domain.add_provider('krb5', 'auth') - ldap_domain.set_active(True) - sssdconfig.save_domain(ldap_domain) - - sssdconfig.write('/tmp/testModifyExistingConfig.conf') - - def testSpaces(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - sssdconfig.import_config(srcdir + "/testconfigs/sssd-valid.conf") - ldap_domain = sssdconfig.get_domain('LDAP') - self.assertEqual(ldap_domain.get_option('auth_provider'), 'ldap') - self.assertEqual(ldap_domain.get_option('id_provider'), 'ldap') - -class SSSDConfigTestInvalid(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - - def testBadBool(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - sssdconfig.import_config(srcdir + "/testconfigs/sssd-invalid-badbool.conf") - self.assertRaises(TypeError, - sssdconfig.get_domain,'IPA') - -class SSSDConfigTestSSSDService(unittest.TestCase): - def setUp(self): - self.schema = SSSDConfig.SSSDConfigSchema(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - def tearDown(self): - pass - - def testInit(self): - # Positive test - service = SSSDConfig.SSSDService('sssd', self.schema) - - # Type Error test - # Name is not a string - self.assertRaises(TypeError, SSSDConfig.SSSDService, 3, self.schema) - - # TypeError test - # schema is not an SSSDSchema - self.assertRaises(TypeError, SSSDConfig.SSSDService, '3', self) - - # ServiceNotRecognizedError test - self.assertRaises(SSSDConfig.ServiceNotRecognizedError, - SSSDConfig.SSSDService, 'ssd', self.schema) - - def testListOptions(self): - service = SSSDConfig.SSSDService('sssd', self.schema) - - options = service.list_options() - control_list = [ - 'services', - 'domains', - 'timeout', - 'sbus_timeout', - 're_expression', - 'full_name_format', - 'debug_level', - 'debug_timestamps', - 'debug_to_files', - 'command', - 'reconnection_retries'] - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - self.assertTrue(type(options['reconnection_retries']) == tuple, - "Option values should be a tuple") - - self.assertTrue(options['reconnection_retries'][0] == int, - "reconnection_retries should require an int. " + - "list_options is requiring a %s" % - options['reconnection_retries'][0]) - - self.assertTrue(options['reconnection_retries'][1] == None, - "reconnection_retries should not require a subtype. " + - "list_options is requiring a %s" % - options['reconnection_retries'][1]) - - self.assertTrue(options['reconnection_retries'][3] == None, - "reconnection_retries should have no default") - - self.assertTrue(type(options['services']) == tuple, - "Option values should be a tuple") - - self.assertTrue(options['services'][0] == list, - "services should require an list. " + - "list_options is requiring a %s" % - options['services'][0]) - - self.assertTrue(options['services'][1] == str, - "services should require a subtype of str. " + - "list_options is requiring a %s" % - options['services'][1]) - - def testListMandatoryOptions(self): - service = SSSDConfig.SSSDService('sssd', self.schema) - - options = service.list_mandatory_options() - control_list = [ - 'services', - 'domains'] - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - self.assertTrue(type(options['services']) == tuple, - "Option values should be a tuple") - - self.assertTrue(options['services'][0] == list, - "services should require an list. " + - "list_options is requiring a %s" % - options['services'][0]) - - self.assertTrue(options['services'][1] == str, - "services should require a subtype of str. " + - "list_options is requiring a %s" % - options['services'][1]) - - def testSetOption(self): - service = SSSDConfig.SSSDService('sssd', self.schema) - - # Positive test - Exactly right - service.set_option('debug_level', 2) - self.assertEqual(service.get_option('debug_level'), 2) - - # Positive test - Allow converting "safe" values - service.set_option('debug_level', '2') - self.assertEqual(service.get_option('debug_level'), 2) - - # Positive test - Remove option if value is None - service.set_option('debug_level', None) - self.assertTrue('debug_level' not in service.options.keys()) - - # Negative test - Nonexistent Option - self.assertRaises(SSSDConfig.NoOptionError, service.set_option, 'nosuchoption', 1) - - # Negative test - Incorrect type - self.assertRaises(TypeError, service.set_option, 'debug_level', 'two') - - def testGetOption(self): - service = SSSDConfig.SSSDService('sssd', self.schema) - - # Positive test - Single-valued - self.assertEqual(service.get_option('config_file_version'), 2) - - # Positive test - List of values - self.assertEqual(service.get_option('services'), ['nss', 'pam']) - - # Negative Test - Bad Option - self.assertRaises(SSSDConfig.NoOptionError, service.get_option, 'nosuchoption') - - def testGetAllOptions(self): - service = SSSDConfig.SSSDService('sssd', self.schema) - - #Positive test - options = service.get_all_options() - control_list = [ - 'config_file_version', - 'services'] - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - def testRemoveOption(self): - service = SSSDConfig.SSSDService('sssd', self.schema) - - # Positive test - Remove an option that exists - self.assertEqual(service.get_option('services'), ['nss', 'pam']) - service.remove_option('services') - self.assertRaises(SSSDConfig.NoOptionError, service.get_option, 'debug_level') - - # Positive test - Remove an option that doesn't exist - self.assertRaises(SSSDConfig.NoOptionError, service.get_option, 'nosuchentry') - service.remove_option('nosuchentry') - -class SSSDConfigTestSSSDDomain(unittest.TestCase): - def setUp(self): - self.schema = SSSDConfig.SSSDConfigSchema(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - def tearDown(self): - pass - - def testInit(self): - # Positive Test - domain = SSSDConfig.SSSDDomain('mydomain', self.schema) - - # Negative Test - Name not a string - self.assertRaises(TypeError, SSSDConfig.SSSDDomain, 2, self.schema) - - # Negative Test - Schema is not an SSSDSchema - self.assertRaises(TypeError, SSSDConfig.SSSDDomain, 'mydomain', self) - - def testGetName(self): - # Positive Test - domain = SSSDConfig.SSSDDomain('mydomain', self.schema) - - self.assertEqual(domain.get_name(), 'mydomain') - - def testSetActive(self): - #Positive Test - domain = SSSDConfig.SSSDDomain('mydomain', self.schema) - - # Should default to inactive - self.assertFalse(domain.active) - domain.set_active(True) - self.assertTrue(domain.active) - domain.set_active(False) - self.assertFalse(domain.active) - - def testListOptions(self): - domain = SSSDConfig.SSSDDomain('sssd', self.schema) - - # First test default options - options = domain.list_options() - control_list = [ - 'debug_level', - 'debug_timestamps', - 'min_id', - 'max_id', - 'timeout', - 'command', - 'enumerate', - 'cache_credentials', - 'store_legacy_passwords', - 'use_fully_qualified_names', - 'entry_cache_timeout', - 'id_provider', - 'auth_provider', - 'access_provider', - 'chpass_provider'] - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - self.assertTrue(type(options['max_id']) == tuple, - "Option values should be a tuple") - - self.assertTrue(options['max_id'][0] == int, - "max_id should require an int. " + - "list_options is requiring a %s" % - options['max_id'][0]) - - self.assertTrue(options['max_id'][1] == None, - "max_id should not require a subtype. " + - "list_options is requiring a %s" % - options['max_id'][1]) - - # Add a provider and verify that the new options appear - domain.add_provider('local', 'id') - control_list.extend( - ['default_shell', - 'base_directory']) - - options = domain.list_options() - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - # Add a provider that has global options and verify that - # The new options appear. - domain.add_provider('krb5', 'auth') - - backup_list = control_list[:] - control_list.extend( - ['krb5_kdcip', - 'krb5_realm', - 'krb5_ccachedir', - 'krb5_ccname_template', - 'krb5_keytab', - 'krb5_validate', - 'krb5_auth_timeout']) - - options = domain.list_options() - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - # Remove the auth domain and verify that the options - # revert to the backup_list - domain.remove_provider('auth') - options = domain.list_options() - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in backup_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in backup_list, - 'Option [%s] unexpectedly found' % - option) - - def testListMandatoryOptions(self): - domain = SSSDConfig.SSSDDomain('sssd', self.schema) - - # First test default options - options = domain.list_mandatory_options() - control_list = [ - 'cache_credentials', - 'min_id', - 'id_provider', - 'auth_provider'] - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - # Add a provider and verify that the new options appear - domain.add_provider('local', 'id') - control_list.extend( - ['default_shell', - 'base_directory']) - - options = domain.list_mandatory_options() - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - # Add a provider that has global options and verify that - # The new options appear. - domain.add_provider('krb5', 'auth') - - backup_list = control_list[:] - control_list.extend( - ['krb5_kdcip', - 'krb5_realm']) - - options = domain.list_mandatory_options() - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - # Remove the auth domain and verify that the options - # revert to the backup_list - domain.remove_provider('auth') - options = domain.list_mandatory_options() - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in backup_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in backup_list, - 'Option [%s] unexpectedly found' % - option) - - def testListProviders(self): - domain = SSSDConfig.SSSDDomain('sssd', self.schema) - - control_provider_dict = { - 'ipa': ['id', 'auth', 'access', 'chpass'], - 'local': ['id', 'auth', 'chpass'], - 'ldap': ['id', 'auth', 'chpass'], - 'krb5': ['auth', 'chpass'], - 'proxy': ['id', 'auth'], - 'permit': ['access'], - 'deny': ['access']} - - providers = domain.list_providers() - - # Ensure that all of the expected defaults are there - for provider in control_provider_dict.keys(): - for ptype in control_provider_dict[provider]: - self.assertTrue(providers.has_key(provider)) - self.assertTrue(ptype in providers[provider]) - - for provider in providers.keys(): - for ptype in providers[provider]: - self.assertTrue(control_provider_dict.has_key(provider)) - self.assertTrue(ptype in control_provider_dict[provider]) - - def testListProviderOptions(self): - domain = SSSDConfig.SSSDDomain('sssd', self.schema) - - # Test looking up a specific provider type - options = domain.list_provider_options('krb5', 'auth') - control_list = [ - 'krb5_kdcip', - 'krb5_realm', - 'krb5_ccachedir', - 'krb5_ccname_template', - 'krb5_keytab', - 'krb5_validate', - 'krb5_auth_timeout'] - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - #Test looking up all provider values - options = domain.list_provider_options('krb5') - control_list.extend(['krb5_changepw_principal']) - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - def testAddProvider(self): - domain = SSSDConfig.SSSDDomain('sssd', self.schema) - - # Positive Test - domain.add_provider('local', 'id') - - # Negative Test - No such backend type - self.assertRaises(SSSDConfig.NoSuchProviderError, - domain.add_provider, 'nosuchbackend', 'auth') - - # Negative Test - No such backend subtype - self.assertRaises(SSSDConfig.NoSuchProviderSubtypeError, - domain.add_provider, 'ldap', 'nosuchsubtype') - - # Negative Test - Try to add a second provider of the same type - self.assertRaises(SSSDConfig.ProviderSubtypeInUse, - domain.add_provider, 'ldap', 'id') - - def testRemoveProvider(self): - domain = SSSDConfig.SSSDDomain('sssd', self.schema) - - # First test default options - options = domain.list_options() - control_list = [ - 'debug_level', - 'debug_timestamps', - 'min_id', - 'max_id', - 'timeout', - 'command', - 'enumerate', - 'cache_credentials', - 'store_legacy_passwords', - 'use_fully_qualified_names', - 'entry_cache_timeout', - 'id_provider', - 'auth_provider', - 'access_provider', - 'chpass_provider'] - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - self.assertTrue(type(options['max_id']) == tuple, - "Option values should be a tuple") - - self.assertTrue(options['max_id'][0] == int, - "config_file_version should require an int. " + - "list_options is requiring a %s" % - options['max_id'][0]) - - self.assertTrue(options['max_id'][1] == None, - "config_file_version should not require a subtype. " + - "list_options is requiring a %s" % - options['max_id'][1]) - - # Add a provider and verify that the new options appear - domain.add_provider('local', 'id') - control_list.extend( - ['default_shell', - 'base_directory']) - - options = domain.list_options() - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - # Add a provider that has global options and verify that - # The new options appear. - domain.add_provider('krb5', 'auth') - - backup_list = control_list[:] - control_list.extend( - ['krb5_kdcip', - 'krb5_realm', - 'krb5_ccachedir', - 'krb5_ccname_template', - 'krb5_keytab', - 'krb5_validate', - 'krb5_auth_timeout']) - - options = domain.list_options() - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in control_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in control_list, - 'Option [%s] unexpectedly found' % - option) - - # Remove the local ID provider and add an LDAP one - # LDAP ID providers can also use the krb5_realm - domain.remove_provider('id') - - domain.add_provider('ldap', 'id') - - # Set the krb5_realm option and the ldap_uri option - domain.set_option('krb5_realm', 'EXAMPLE.COM') - domain.set_option('ldap_uri', 'ldap://ldap.example.com') - - self.assertEquals(domain.get_option('krb5_realm'), - 'EXAMPLE.COM') - self.assertEquals(domain.get_option('ldap_uri'), - 'ldap://ldap.example.com') - - # Remove the LDAP provider and verify that krb5_realm remains - domain.remove_provider('id') - self.assertEquals(domain.get_option('krb5_realm'), - 'EXAMPLE.COM') - self.assertFalse(domain.options.has_key('ldap_uri')) - - # Put the LOCAL provider back - domain.add_provider('local', 'id') - - # Remove the auth domain and verify that the options - # revert to the backup_list - domain.remove_provider('auth') - options = domain.list_options() - - self.assertTrue(type(options) == dict, - "Options should be a dictionary") - - # Ensure that all of the expected defaults are there - for option in backup_list: - self.assertTrue(option in options.keys(), - "Option [%s] missing" % - option) - - # Ensure that there aren't any unexpected options listed - for option in options.keys(): - self.assertTrue(option in backup_list, - 'Option [%s] unexpectedly found' % - option) - - # Ensure that the krb5_realm option is now gone - self.assertFalse(domain.options.has_key('krb5_realm')) - - # Test removing nonexistent provider - Real - domain.remove_provider('id') - - # Test removing nonexistent provider - Bad backend type - # Should pass without complaint - domain.remove_provider('id') - - # Test removing nonexistent provider - Bad provider type - # Should pass without complaint - domain.remove_provider('nosuchprovider') - - def testGetOption(self): - domain = SSSDConfig.SSSDDomain('sssd', self.schema) - - # Positive Test - Ensure that we can get a valid option - self.assertEqual(domain.get_option('debug_level'), 0) - - # Negative Test - Try to get valid option that is not set - self.assertRaises(SSSDConfig.NoOptionError, domain.get_option, 'max_id') - - # Positive Test - Set the above option and get it - domain.set_option('max_id', 10000) - self.assertEqual(domain.get_option('max_id'), 10000) - - # Negative Test - Try yo get invalid option - self.assertRaises(SSSDConfig.NoOptionError, domain.get_option, 'nosuchoption') - - def testSetOption(self): - domain = SSSDConfig.SSSDDomain('sssd', self.schema) - - # Positive Test - domain.set_option('max_id', 10000) - self.assertEqual(domain.get_option('max_id'), 10000) - - # Positive Test - Remove option if value is None - domain.set_option('max_id', None) - self.assertTrue('max_id' not in domain.get_all_options().keys()) - - # Negative Test - invalid option - self.assertRaises(SSSDConfig.NoOptionError, domain.set_option, 'nosuchoption', 1) - - # Negative Test - incorrect type - self.assertRaises(TypeError, domain.set_option, 'max_id', 'a string') - - # Positive Test - Coax options to appropriate type - domain.set_option('max_id', '10000') - self.assertEqual(domain.get_option('max_id'), 10000) - - domain.set_option('max_id', 30.2) - self.assertEqual(domain.get_option('max_id'), 30) - - def testRemoveOption(self): - domain = SSSDConfig.SSSDDomain('sssd', self.schema) - - # Positive test - Remove existing option - self.assertTrue('min_id' in domain.get_all_options().keys()) - domain.remove_option('min_id') - self.assertFalse('min_id' in domain.get_all_options().keys()) - - # Positive test - Remove unset but valid option - self.assertFalse('max_id' in domain.get_all_options().keys()) - domain.remove_option('max_id') - self.assertFalse('max_id' in domain.get_all_options().keys()) - - # Positive test - Remove unset and unknown option - self.assertFalse('nosuchoption' in domain.get_all_options().keys()) - domain.remove_option('nosuchoption') - self.assertFalse('nosuchoption' in domain.get_all_options().keys()) - - def testSetName(self): - domain = SSSDConfig.SSSDDomain('sssd', self.schema) - - # Positive test - Change the name once - domain.set_name('sssd2'); - self.assertEqual(domain.get_name(), 'sssd2') - self.assertEqual(domain.oldname, 'sssd') - - # Positive test - Change the name a second time - domain.set_name('sssd3') - self.assertEqual(domain.get_name(), 'sssd3') - self.assertEqual(domain.oldname, 'sssd') - - # Negative test - try setting the name to a non-string - self.assertRaises(TypeError, - domain.set_name, 4) - -class SSSDConfigTestSSSDConfig(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - - def testInit(self): - # Positive test - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - # Negative Test - No Such File - self.assertRaises(IOError, - SSSDConfig.SSSDConfig, "nosuchfile.api.conf", srcdir + "/etc/sssd.api.d") - - # Negative Test - Schema is not parsable - self.assertRaises(SSSDConfig.ParsingError, - SSSDConfig.SSSDConfig, srcdir + "/testconfigs/noparse.api.conf", srcdir + "/etc/sssd.api.d") - - def testImportConfig(self): - # Positive Test - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - sssdconfig.import_config(srcdir + "/testconfigs/sssd-valid.conf") - - # Verify that all sections were imported - control_list = [ - 'sssd', - 'nss', - 'pam', - 'dp', - 'domain/PROXY', - 'domain/IPA', - 'domain/LOCAL', - 'domain/LDAP', - ] - - for section in control_list: - self.assertTrue(sssdconfig.has_section(section), - "Section [%s] missing" % - section) - for section in sssdconfig.sections(): - self.assertTrue(section['name'] in control_list) - - # Verify that all options were imported for a section - control_list = [ - 'services', - 'reconnection_retries', - 'domains', - 'debug_timestamps', - 'config_file_version'] - - for option in control_list: - self.assertTrue(sssdconfig.has_option('sssd', option), - "Option [%s] missing from [sssd]" % - option) - for option in sssdconfig.options('sssd'): - if option['type'] in ('empty', 'comment'): - continue - self.assertTrue(option['name'] in control_list, - "Option [%s] unexpectedly found" % - option) - - #TODO: Check the types and values of the settings - - # Negative Test - Missing config file - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - self.assertRaises(IOError, sssdconfig.import_config, "nosuchfile.conf") - - # Negative Test - Invalid config file - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - self.assertRaises(SSSDConfig.ParsingError, sssdconfig.import_config, srcdir + "/testconfigs/sssd-invalid.conf") - - # Negative Test - Invalid config file version - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - self.assertRaises(SSSDConfig.ParsingError, sssdconfig.import_config, srcdir + "/testconfigs/sssd-badversion.conf") - - # Negative Test - No config file version - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - self.assertRaises(SSSDConfig.ParsingError, sssdconfig.import_config, srcdir + "/testconfigs/sssd-noversion.conf") - - # Negative Test - Already initialized - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - sssdconfig.import_config(srcdir + "/testconfigs/sssd-valid.conf") - self.assertRaises(SSSDConfig.AlreadyInitializedError, - sssdconfig.import_config, srcdir + "/testconfigs/sssd-valid.conf") - - def testNewConfig(self): - # Positive Test - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - sssdconfig.new_config() - - # Check that the defaults were set - control_list = [ - 'sssd', - 'nss', - 'pam'] - for section in control_list: - self.assertTrue(sssdconfig.has_section(section), - "Section [%s] missing" % - section) - for section in sssdconfig.sections(): - self.assertTrue(section['name'] in control_list) - - control_list = [ - 'config_file_version', - 'services'] - for option in control_list: - self.assertTrue(sssdconfig.has_option('sssd', option), - "Option [%s] missing from [sssd]" % - option) - for option in sssdconfig.options('sssd'): - if option['type'] in ('empty', 'comment'): - continue - self.assertTrue(option['name'] in control_list, - "Option [%s] unexpectedly found" % - option) - - # Negative Test - Already Initialized - self.assertRaises(SSSDConfig.AlreadyInitializedError, sssdconfig.new_config) - - def testWrite(self): - #TODO Write tests to compare output files - pass - - def testListServices(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - # Negative Test - sssdconfig not initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.list_services) - - sssdconfig.new_config() - - control_list = [ - 'sssd', - 'pam', - 'nss'] - service_list = sssdconfig.list_services() - for service in control_list: - self.assertTrue(service in service_list, - "Service [%s] missing" % - service) - for service in service_list: - self.assertTrue(service in control_list, - "Service [%s] unexpectedly found" % - service) - - def testGetService(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - # Negative Test - Not initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.get_service, 'sssd') - - sssdconfig.import_config(srcdir + '/testconfigs/sssd-valid.conf') - - service = sssdconfig.get_service('sssd') - self.assertTrue(isinstance(service, SSSDConfig.SSSDService)) - - # Verify the contents of this service - self.assertEqual(type(service.get_option('debug_timestamps')), bool) - self.assertFalse(service.get_option('debug_timestamps')) - - # Negative Test - No such service - self.assertRaises(SSSDConfig.NoServiceError, sssdconfig.get_service, 'nosuchservice') - - def testNewService(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - # Negative Test - Not initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.new_service, 'sssd') - - sssdconfig.new_config() - - # Positive Test - # First need to remove the existing service - sssdconfig.delete_service('sssd') - service = sssdconfig.new_service('sssd') - self.failUnless(service.get_name() in sssdconfig.list_services()) - - # TODO: check that the values of this new service - # are set to the defaults from the schema - - def testDeleteService(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - # Negative Test - Not initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.delete_service, 'sssd') - - sssdconfig.new_config() - - # Positive Test - service = sssdconfig.delete_service('sssd') - - def testSaveService(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - new_service = SSSDConfig.SSSDService('sssd', sssdconfig.schema) - - # Negative Test - Not initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.save_service, new_service) - - # Positive Test - sssdconfig.new_config() - sssdconfig.save_service(new_service) - - # TODO: check that all entries were saved correctly (change a few) - - # Negative Test - Type Error - self.assertRaises(TypeError, sssdconfig.save_service, self) - - def testListActiveDomains(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - # Negative Test - Not Initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.list_active_domains) - - # Positive Test - sssdconfig.import_config(srcdir + '/testconfigs/sssd-valid.conf') - - control_list = [ - 'IPA', - 'LOCAL'] - active_domains = sssdconfig.list_active_domains() - - for domain in control_list: - self.assertTrue(domain in active_domains, - "Domain [%s] missing" % - domain) - for domain in active_domains: - self.assertTrue(domain in control_list, - "Domain [%s] unexpectedly found" % - domain) - - def testListInactiveDomains(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - # Negative Test - Not Initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.list_inactive_domains) - - # Positive Test - sssdconfig.import_config(srcdir + '/testconfigs/sssd-valid.conf') - - control_list = [ - 'PROXY', - 'LDAP'] - inactive_domains = sssdconfig.list_inactive_domains() - - for domain in control_list: - self.assertTrue(domain in inactive_domains, - "Domain [%s] missing" % - domain) - for domain in inactive_domains: - self.assertTrue(domain in control_list, - "Domain [%s] unexpectedly found" % - domain) - - def testListDomains(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - # Negative Test - Not Initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.list_domains) - - # Positive Test - sssdconfig.import_config(srcdir + '/testconfigs/sssd-valid.conf') - - control_list = [ - 'IPA', - 'LOCAL', - 'PROXY', - 'LDAP'] - domains = sssdconfig.list_domains() - - for domain in control_list: - self.assertTrue(domain in domains, - "Domain [%s] missing" % - domain) - for domain in domains: - self.assertTrue(domain in control_list, - "Domain [%s] unexpectedly found" % - domain) - - def testGetDomain(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - # Negative Test - Not initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.get_domain, 'sssd') - - sssdconfig.import_config(srcdir + '/testconfigs/sssd-valid.conf') - - domain = sssdconfig.get_domain('IPA') - self.assertTrue(isinstance(domain, SSSDConfig.SSSDDomain)) - self.assertTrue(domain.active) - - # TODO verify the contents of this domain - - # Negative Test - No such domain - self.assertRaises(SSSDConfig.NoDomainError, sssdconfig.get_domain, 'nosuchdomain') - - def testNewDomain(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - # Negative Test - Not initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.new_domain, 'example.com') - - sssdconfig.new_config() - - # Positive Test - domain = sssdconfig.new_domain('example.com') - self.assertTrue(isinstance(domain, SSSDConfig.SSSDDomain)) - self.failUnless(domain.get_name() in sssdconfig.list_domains()) - self.failUnless(domain.get_name() in sssdconfig.list_inactive_domains()) - - # TODO: check that the values of this new domain - # are set to the defaults from the schema - - def testDeleteDomain(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - # Negative Test - Not initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.delete_domain, 'IPA') - - # Positive Test - sssdconfig.import_config(srcdir + '/testconfigs/sssd-valid.conf') - - self.assertTrue('IPA' in sssdconfig.list_domains()) - self.assertTrue('IPA' in sssdconfig.list_active_domains()) - self.assertTrue(sssdconfig.has_section('domain/IPA')) - sssdconfig.delete_domain('IPA') - self.assertFalse('IPA' in sssdconfig.list_domains()) - self.assertFalse('IPA' in sssdconfig.list_active_domains()) - self.assertFalse(sssdconfig.has_section('domain/IPA')) - - def testSaveDomain(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - # Negative Test - Not initialized - self.assertRaises(SSSDConfig.NotInitializedError, sssdconfig.save_domain, 'IPA') - - # Positive Test - sssdconfig.new_config() - domain = sssdconfig.new_domain('example.com') - domain.add_provider('ldap', 'id') - domain.set_option('ldap_uri', 'ldap://ldap.example.com') - domain.set_active(True) - sssdconfig.save_domain(domain) - - self.assertTrue('example.com' in sssdconfig.list_domains()) - self.assertTrue('example.com' in sssdconfig.list_active_domains()) - self.assertEqual(sssdconfig.get('domain/example.com', 'ldap_uri'), - 'ldap://ldap.example.com') - - # Negative Test - Type Error - self.assertRaises(TypeError, sssdconfig.save_domain, self) - - # Positive test - Change the domain name and save it - domain.set_name('example.com2') - self.assertEqual(domain.name,'example.com2') - self.assertEqual(domain.oldname,'example.com') - sssdconfig.save_domain(domain) - - self.assertTrue('example.com2' in sssdconfig.list_domains()) - self.assertTrue('example.com2' in sssdconfig.list_active_domains()) - self.assertTrue(sssdconfig.has_section('domain/example.com2')) - self.assertEqual(sssdconfig.get('domain/example.com2', - 'ldap_uri'), - 'ldap://ldap.example.com') - self.assertFalse('example.com' in sssdconfig.list_domains()) - self.assertFalse('example.com' in sssdconfig.list_active_domains()) - self.assertFalse('example.com' in sssdconfig.list_inactive_domains()) - self.assertFalse(sssdconfig.has_section('domain/example.com')) - self.assertEquals(domain.oldname, None) - - # Positive test - Set the domain inactive and save it - activelist = sssdconfig.list_active_domains() - inactivelist = sssdconfig.list_inactive_domains() - - domain.set_active(False) - sssdconfig.save_domain(domain) - - self.assertFalse('example.com2' in sssdconfig.list_active_domains()) - self.assertTrue('example.com2' in sssdconfig.list_inactive_domains()) - - self.assertEquals(len(sssdconfig.list_active_domains()), - len(activelist)-1) - self.assertEquals(len(sssdconfig.list_inactive_domains()), - len(inactivelist)+1) - - # Positive test - Set the domain active and save it - activelist = sssdconfig.list_active_domains() - inactivelist = sssdconfig.list_inactive_domains() - domain.set_active(True) - sssdconfig.save_domain(domain) - - self.assertTrue('example.com2' in sssdconfig.list_active_domains()) - self.assertFalse('example.com2' in sssdconfig.list_inactive_domains()) - - self.assertEquals(len(sssdconfig.list_active_domains()), - len(activelist)+1) - self.assertEquals(len(sssdconfig.list_inactive_domains()), - len(inactivelist)-1) - - # Positive test - Set the domain inactive and save it - activelist = sssdconfig.list_active_domains() - inactivelist = sssdconfig.list_inactive_domains() - - sssdconfig.deactivate_domain(domain.get_name()) - - self.assertFalse('example.com2' in sssdconfig.list_active_domains()) - self.assertTrue('example.com2' in sssdconfig.list_inactive_domains()) - - self.assertEquals(len(sssdconfig.list_active_domains()), - len(activelist)-1) - self.assertEquals(len(sssdconfig.list_inactive_domains()), - len(inactivelist)+1) - - # Positive test - Set the domain active and save it - activelist = sssdconfig.list_active_domains() - inactivelist = sssdconfig.list_inactive_domains() - - sssdconfig.activate_domain(domain.get_name()) - - self.assertTrue('example.com2' in sssdconfig.list_active_domains()) - self.assertFalse('example.com2' in sssdconfig.list_inactive_domains()) - - self.assertEquals(len(sssdconfig.list_active_domains()), - len(activelist)+1) - self.assertEquals(len(sssdconfig.list_inactive_domains()), - len(inactivelist)-1) - - # Positive test - Ensure that saved domains retain values - domain.set_option('ldap_krb5_init_creds', True) - domain.set_option('ldap_id_use_start_tls', False) - domain.set_option('ldap_user_search_base', - 'cn=accounts, dc=example, dc=com') - self.assertTrue(domain.get_option('ldap_krb5_init_creds')) - self.assertFalse(domain.get_option('ldap_id_use_start_tls')) - self.assertEqual(domain.get_option('ldap_user_search_base'), - 'cn=accounts, dc=example, dc=com') - - sssdconfig.save_domain(domain) - sssdconfig.write('/tmp/testSaveDomain.out') - - domain2 = sssdconfig.get_domain('example.com2') - self.assertTrue(domain2.get_option('ldap_krb5_init_creds')) - self.assertFalse(domain2.get_option('ldap_id_use_start_tls')) - - def testActivateDomain(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - domain_name = 'PROXY' - - # Negative test - Not initialized - self.assertRaises(SSSDConfig.NotInitializedError, - sssdconfig.activate_domain, domain_name) - - sssdconfig.import_config(srcdir + "/testconfigs/sssd-valid.conf") - - # Positive test - Activate an inactive domain - self.assertTrue(domain_name in sssdconfig.list_domains()) - self.assertFalse(domain_name in sssdconfig.list_active_domains()) - self.assertTrue(domain_name in sssdconfig.list_inactive_domains()) - - sssdconfig.activate_domain('PROXY') - self.assertTrue(domain_name in sssdconfig.list_domains()) - self.assertTrue(domain_name in sssdconfig.list_active_domains()) - self.assertFalse(domain_name in sssdconfig.list_inactive_domains()) - - # Positive test - Activate an active domain - # This should succeed - sssdconfig.activate_domain('PROXY') - self.assertTrue(domain_name in sssdconfig.list_domains()) - self.assertTrue(domain_name in sssdconfig.list_active_domains()) - self.assertFalse(domain_name in sssdconfig.list_inactive_domains()) - - # Negative test - Invalid domain name - self.assertRaises(SSSDConfig.NoDomainError, - sssdconfig.activate_domain, 'nosuchdomain') - - # Negative test - Invalid domain name type - self.assertRaises(SSSDConfig.NoDomainError, - sssdconfig.activate_domain, self) - - def testDeactivateDomain(self): - sssdconfig = SSSDConfig.SSSDConfig(srcdir + "/etc/sssd.api.conf", - srcdir + "/etc/sssd.api.d") - - domain_name = 'IPA' - - # Negative test - Not initialized - self.assertRaises(SSSDConfig.NotInitializedError, - sssdconfig.activate_domain, domain_name) - - sssdconfig.import_config(srcdir + "/testconfigs/sssd-valid.conf") - - # Positive test -Deactivate an active domain - self.assertTrue(domain_name in sssdconfig.list_domains()) - self.assertTrue(domain_name in sssdconfig.list_active_domains()) - self.assertFalse(domain_name in sssdconfig.list_inactive_domains()) - - sssdconfig.deactivate_domain(domain_name) - self.assertTrue(domain_name in sssdconfig.list_domains()) - self.assertFalse(domain_name in sssdconfig.list_active_domains()) - self.assertTrue(domain_name in sssdconfig.list_inactive_domains()) - - # Positive test - Deactivate an inactive domain - # This should succeed - sssdconfig.deactivate_domain(domain_name) - self.assertTrue(domain_name in sssdconfig.list_domains()) - self.assertFalse(domain_name in sssdconfig.list_active_domains()) - self.assertTrue(domain_name in sssdconfig.list_inactive_domains()) - - # Negative test - Invalid domain name - self.assertRaises(SSSDConfig.NoDomainError, - sssdconfig.activate_domain, 'nosuchdomain') - - # Negative test - Invalid domain name type - self.assertRaises(SSSDConfig.NoDomainError, - sssdconfig.activate_domain, self) - -if __name__ == "__main__": - error = 0 - - import os - import sys - srcdir = os.getenv('srcdir') - if srcdir: - srcdir = srcdir + "/config" - else: - srcdir = "." - - suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestSSSDService) - res = unittest.TextTestRunner().run(suite) - if not res.wasSuccessful(): - error |= 0x1 - - suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestSSSDDomain) - res = unittest.TextTestRunner().run(suite) - if not res.wasSuccessful(): - error |= 0x2 - - suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestSSSDConfig) - res = unittest.TextTestRunner().run(suite) - if not res.wasSuccessful(): - error |= 0x4 - - suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestValid) - res = unittest.TextTestRunner().run(suite) - if not res.wasSuccessful(): - error |= 0x8 - - suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestInvalid) - res = unittest.TextTestRunner().run(suite) - if not res.wasSuccessful(): - error |= 0x10 - - sys.exit(error) diff --git a/server/config/etc/sssd.api.conf b/server/config/etc/sssd.api.conf deleted file mode 100644 index 19053538..00000000 --- a/server/config/etc/sssd.api.conf +++ /dev/null @@ -1,66 +0,0 @@ -# Format: -# option = type, subtype, mandatory[, default] - -[service] -# Options available to all services -debug_level = int, None, false -debug_timestamps = bool, None, false -debug_to_files = bool, None, false -command = str, None, false -reconnection_retries = int, None, false - -[sssd] -# Monitor service -services = list, str, true, nss, pam -domains = list, str, true -timeout = int, None, false -sbus_timeout = int, None, false -re_expression = str, None, false -full_name_format = str, None, false - -[nss] -# Name service -enum_cache_timeout = int, None, false -entry_cache_no_wait_percentage = int, None, false -entry_negative_timeout = int, None, false -filter_users = list, str, false -filter_groups = list, str, false -filter_users_in_groups = bool, None, false -pwfield = str, None, false - -[pam] -# Authentication service -offline_credentials_expiration = int, None, false -offline_failed_login_attempts = int, None, false -offline_failed_login_delay = int, None, false - -[provider] -#Available provider types -id_provider = str, None, true -auth_provider = str, None, true -access_provider = str, None, false -chpass_provider = str, None, false - -[domain] -# Options available to all domains -debug_level = int, None, false, 0 -debug_timestamps = bool, None, false -command = str, None, false -min_id = int, None, true, 1000 -max_id = int, None, false -timeout = int, None, false -enumerate = bool, None, false -cache_credentials = bool, None, true, false -store_legacy_passwords = bool, None, false -use_fully_qualified_names = bool, None, false -entry_cache_timeout = int, None, false - -# Special providers -[provider/permit] - -[provider/permit/access] - -[provider/deny] - -[provider/deny/access] - diff --git a/server/config/etc/sssd.api.d/sssd-ipa.conf b/server/config/etc/sssd.api.d/sssd-ipa.conf deleted file mode 100644 index c2a12d5a..00000000 --- a/server/config/etc/sssd.api.d/sssd-ipa.conf +++ /dev/null @@ -1,77 +0,0 @@ -[provider/ipa] -ipa_domain = str, None, true -ipa_server = str, None, true -ipa_hostname = str, None, false -ldap_uri = str, None, false -ldap_search_base = str, None, false -ldap_schema = str, None, false -ldap_default_bind_dn = str, None, false -ldap_default_authtok_type = str, None, false -ldap_default_authtok = str, None, false -ldap_network_timeout = int, None, false -ldap_opt_timeout = int, None, false -ldap_offline_timeout = int, None, false -ldap_tls_cacert = str, None, false -ldap_tls_reqcert = str, None, false -ldap_sasl_mech = str, None, false -ldap_sasl_authid = str, None, false -krb5_kdcip = str, None, false -krb5_realm = str, None, false -krb5_auth_timeout = int, None, false -ldap_krb5_keytab = str, None, false -ldap_krb5_init_creds = bool, None, false -ldap_entry_usn = str, None, false -ldap_rootdse_last_usn = str, None, false -ldap_referrals = bool, None, false - -[provider/ipa/id] -ldap_search_timeout = int, None, false -ldap_enumeration_refresh_timeout = int, None, false -ldap_purge_cache_timeout = int, None, false -ldap_id_use_start_tls = bool, None, false -ldap_user_search_base = str, None, false -ldap_user_search_scope = str, None, false -ldap_user_search_filter = str, None, false -ldap_user_object_class = str, None, false -ldap_user_name = str, None, false -ldap_user_uid_number = str, None, false -ldap_user_gid_number = str, None, false -ldap_user_gecos = str, None, false -ldap_user_homedir = str, None, false -ldap_user_shell = str, None, false -ldap_user_uuid = str, None, false -ldap_user_principal = str, None, false -ldap_user_fullname = str, None, false -ldap_user_member_of = str, None, false -ldap_user_modify_timestamp = str, None, false -ldap_user_shadow_last_change = str, None, false -ldap_user_shadow_min = str, None, false -ldap_user_shadow_max = str, None, false -ldap_user_shadow_warning = str, None, false -ldap_user_shadow_inactive = str, None, false -ldap_user_shadow_expire = str, None, false -ldap_user_shadow_flag = str, None, false -ldap_user_krb_last_pwd_change = str, None, false -ldap_user_krb_password_expiration = str, None, false -ldap_pwd_attribute = str, None, false -ldap_group_search_base = str, None, false -ldap_group_search_scope = str, None, false -ldap_group_search_filter = str, None, false -ldap_group_object_class = str, None, false -ldap_group_name = str, None, false -ldap_group_gid_number = str, None, false -ldap_group_member = str, None, false -ldap_group_uuid = str, None, false -ldap_group_modify_timestamp = str, None, false -ldap_force_upper_case_realm = bool, None, false - -[provider/ipa/auth] -krb5_ccachedir = str, None, false -krb5_ccname_template = str, None, false -krb5_keytab = str, None, false -krb5_validate = bool, None, false - -[provider/ipa/access] - -[provider/ipa/chpass] -krb5_changepw_principal = str, None, false diff --git a/server/config/etc/sssd.api.d/sssd-krb5.conf b/server/config/etc/sssd.api.d/sssd-krb5.conf deleted file mode 100644 index 7ba0ab32..00000000 --- a/server/config/etc/sssd.api.d/sssd-krb5.conf +++ /dev/null @@ -1,13 +0,0 @@ -[provider/krb5] -krb5_kdcip = str, None, true -krb5_realm = str, None, true -krb5_auth_timeout = int, None, false - -[provider/krb5/auth] -krb5_ccachedir = str, None, false -krb5_ccname_template = str, None, false -krb5_keytab = str, None, false -krb5_validate = bool, None, false - -[provider/krb5/chpass] -krb5_changepw_principal = str, None, false diff --git a/server/config/etc/sssd.api.d/sssd-ldap.conf b/server/config/etc/sssd.api.d/sssd-ldap.conf deleted file mode 100644 index 6758ab49..00000000 --- a/server/config/etc/sssd.api.d/sssd-ldap.conf +++ /dev/null @@ -1,68 +0,0 @@ -[provider/ldap] -ldap_uri = str, None, true -ldap_search_base = str, None, true -ldap_schema = str, None, true, rfc2307 -ldap_default_bind_dn = str, None, false -ldap_default_authtok_type = str, None, false -ldap_default_authtok = str, None, false -ldap_network_timeout = int, None, false -ldap_opt_timeout = int, None, false -ldap_offline_timeout = int, None, false -ldap_tls_cacert = str, None, false -ldap_tls_reqcert = str, None, false -ldap_sasl_mech = str, None, false -ldap_sasl_authid = str, None, false -krb5_kdcip = str, None, false -krb5_realm = str, None, false -ldap_krb5_keytab = str, None, false -ldap_krb5_init_creds = bool, None, false -ldap_entry_usn = str, None, false -ldap_rootdse_last_usn = str, None, false -ldap_referrals = bool, None, false - -[provider/ldap/id] -ldap_search_timeout = int, None, false -ldap_enumeration_refresh_timeout = int, None, false -ldap_purge_cache_timeout = int, None, false -ldap_id_use_start_tls = bool, None, true, false -ldap_user_search_base = str, None, false -ldap_user_search_scope = str, None, false -ldap_user_search_filter = str, None, false -ldap_user_object_class = str, None, false -ldap_user_name = str, None, false -ldap_user_uid_number = str, None, false -ldap_user_gid_number = str, None, false -ldap_user_gecos = str, None, false -ldap_user_homedir = str, None, false -ldap_user_shell = str, None, false -ldap_user_uuid = str, None, false -ldap_user_principal = str, None, false -ldap_user_fullname = str, None, false -ldap_user_member_of = str, None, false -ldap_user_modify_timestamp = str, None, false -ldap_user_shadow_last_change = str, None, false -ldap_user_shadow_min = str, None, false -ldap_user_shadow_max = str, None, false -ldap_user_shadow_warning = str, None, false -ldap_user_shadow_inactive = str, None, false -ldap_user_shadow_expire = str, None, false -ldap_user_shadow_flag = str, None, false -ldap_user_krb_last_pwd_change = str, None, false -ldap_user_krb_password_expiration = str, None, false -ldap_pwd_attribute = str, None, false -ldap_group_search_base = str, None, false -ldap_group_search_scope = str, None, false -ldap_group_search_filter = str, None, false -ldap_group_object_class = str, None, false -ldap_group_name = str, None, false -ldap_group_gid_number = str, None, false -ldap_group_member = str, None, false -ldap_group_uuid = str, None, false -ldap_group_modify_timestamp = str, None, false -ldap_force_upper_case_realm = bool, None, false - -[provider/ldap/auth] -ldap_pwd_policy = str, None, false - -[provider/ldap/chpass] - diff --git a/server/config/etc/sssd.api.d/sssd-local.conf b/server/config/etc/sssd.api.d/sssd-local.conf deleted file mode 100644 index 0686f082..00000000 --- a/server/config/etc/sssd.api.d/sssd-local.conf +++ /dev/null @@ -1,10 +0,0 @@ -[provider/local] - -[provider/local/id] -default_shell = str, None, true, /bin/bash -base_directory = str, None, true, /home - -[provider/local/auth] - -[provider/local/chpass] - diff --git a/server/config/etc/sssd.api.d/sssd-proxy.conf b/server/config/etc/sssd.api.d/sssd-proxy.conf deleted file mode 100644 index 7ecf6b33..00000000 --- a/server/config/etc/sssd.api.d/sssd-proxy.conf +++ /dev/null @@ -1,7 +0,0 @@ -[provider/proxy] - -[provider/proxy/id] -proxy_lib_name = str, None, true - -[provider/proxy/auth] -proxy_pam_target = str, None, true diff --git a/server/config/ipachangeconf.py b/server/config/ipachangeconf.py deleted file mode 100644 index ea73a9b9..00000000 --- a/server/config/ipachangeconf.py +++ /dev/null @@ -1,588 +0,0 @@ -# -# ipachangeconf - configuration file manipulation classes and functions -# partially based on authconfig code -# Copyright (c) 1999-2007 Red Hat, Inc. -# Author: Simo Sorce <ssorce@redhat.com> -# -# This is free software; you can redistribute it and/or modify it -# under the terms of the GNU General Public License as published by -# the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. -# - -import fcntl -import os -import string -import time -import shutil -import re - -def openLocked(filename, perms, create = True): - fd = -1 - - flags = os.O_RDWR - if create: - flags = flags | os.O_CREAT - - try: - fd = os.open(filename, flags, perms) - fcntl.lockf(fd, fcntl.LOCK_EX) - except OSError, (errno, strerr): - if fd != -1: - try: - os.close(fd) - except OSError: - pass - raise IOError(errno, strerr) - return os.fdopen(fd, "r+") - - - #TODO: add subsection as a concept - # (ex. REALM.NAME = { foo = x bar = y } ) - #TODO: put section delimiters as separating element of the list - # so that we can process multiple sections in one go - #TODO: add a comment all but provided options as a section option -class IPAChangeConf: - - def __init__(self, name): - self.progname = name - self.indent = ("","","") - self.assign = (" = ","=") - self.dassign = self.assign[0] - self.comment = ("#",) - self.dcomment = self.comment[0] - self.eol = ("\n",) - self.deol = self.eol[0] - self.sectnamdel = ("[","]") - self.subsectdel = ("{","}") - self.backup_suffix = ".ipabkp" - - def setProgName(self, name): - self.progname = name - - def setIndent(self, indent): - if type(indent) is tuple: - self.indent = indent - elif type(indent) is str: - self.indent = (indent, ) - else: - raise ValueError, 'Indent must be a list of strings' - - def setOptionAssignment(self, assign): - if type(assign) is tuple: - self.assign = assign - else: - self.assign = (assign, ) - self.dassign = self.assign[0] - - def setCommentPrefix(self, comment): - if type(comment) is tuple: - self.comment = comment - else: - self.comment = (comment, ) - self.dcomment = self.comment[0] - - def setEndLine(self, eol): - if type(eol) is tuple: - self.eol = eol - else: - self.eol = (eol, ) - self.deol = self.eol[0] - - def setSectionNameDelimiters(self, delims): - self.sectnamdel = delims - - def setSubSectionDelimiters(self, delims): - self.subsectdel = delims - - def matchComment(self, line): - for v in self.comment: - if line.lstrip().startswith(v): - return line.lstrip()[len(v):] - return False - - def matchEmpty(self, line): - if line.strip() == "": - return True - return False - - def matchSection(self, line): - cl = "".join(line.strip().split()) - if len(self.sectnamdel) != 2: - return False - if not cl.startswith(self.sectnamdel[0]): - return False - if not cl.endswith(self.sectnamdel[1]): - return False - return cl[len(self.sectnamdel[0]):-len(self.sectnamdel[1])] - - def matchSubSection(self, line): - if self.matchComment(line): - return False - - parts = line.split(self.dassign, 1) - if len(parts) < 2: - return False - - if parts[1].strip() == self.subsectdel[0]: - return parts[0].strip() - - return False - - def matchSubSectionEnd(self, line): - if self.matchComment(line): - return False - - if line.strip() == self.subsectdel[1]: - return True - - return False - - def getSectionLine(self, section): - if len(self.sectnamdel) != 2: - return section - return self.sectnamdel[0]+section+self.sectnamdel[1]+self.deol - - def dump(self, options, level=0): - output = "" - if level >= len(self.indent): - level = len(self.indent)-1 - - for o in options: - if o['type'] == "section": - output += self.sectnamdel[0]+o['name']+self.sectnamdel[1]+self.deol - output += self.dump(o['value'], level+1) - continue - if o['type'] == "subsection": - output += self.indent[level]+o['name']+self.dassign+self.subsectdel[0]+self.deol - output += self.dump(o['value'], level+1) - output += self.indent[level]+self.subsectdel[1]+self.deol - continue - if o['type'] == "option": - output += self.indent[level]+o['name']+self.dassign+o['value']+self.deol - continue - if o['type'] == "comment": - output += self.dcomment+o['value']+self.deol - continue - if o['type'] == "empty": - output += self.deol - continue - raise SyntaxError, 'Unknown type: ['+o['type']+']' - - return output - - def parseLine(self, line): - - if self.matchEmpty(line): - return {'name':'empty', 'type':'empty'} - - value = self.matchComment(line) - if value: - return {'name':'comment', 'type':'comment', 'value':value.rstrip()} - - parts = line.split(self.dassign, 1) - if len(parts) < 2: - raise SyntaxError, 'Syntax Error: Unknown line format' - - return {'name':parts[0].strip(), 'type':'option', 'value':parts[1].rstrip()} - - def findOpts(self, opts, type, name, exclude_sections=False): - - num = 0 - for o in opts: - if o['type'] == type and o['name'] == name: - return (num, o) - if exclude_sections and (o['type'] == "section" or o['type'] == "subsection"): - return (num, None) - num += 1 - return (num, None) - - def commentOpts(self, inopts, level = 0): - - opts = [] - - if level >= len(self.indent): - level = len(self.indent)-1 - - for o in inopts: - if o['type'] == 'section': - no = self.commentOpts(o['value'], level+1) - val = self.dcomment+self.sectnamdel[0]+o['name']+self.sectnamdel[1] - opts.append({'name':'comment', 'type':'comment', 'value':val}) - for n in no: - opts.append(n) - continue - if o['type'] == 'subsection': - no = self.commentOpts(o['value'], level+1) - val = self.indent[level]+o['name']+self.dassign+self.subsectdel[0] - opts.append({'name':'comment', 'type':'comment', 'value':val}) - for n in no: - opts.append(n) - val = self.indent[level]+self.subsectdel[1] - opts.append({'name':'comment', 'type':'comment', 'value':val}) - continue - if o['type'] == 'option': - val = self.indent[level]+o['name']+self.dassign+o['value'] - opts.append({'name':'comment', 'type':'comment', 'value':val}) - continue - if o['type'] == 'comment': - opts.append(o) - continue - if o['type'] == 'empty': - opts.append({'name':'comment', 'type':'comment', 'value':''}) - continue - raise SyntaxError, 'Unknown type: ['+o['type']+']' - - return opts - - def mergeOld(self, oldopts, newopts): - - opts = [] - - for o in oldopts: - if o['type'] == "section" or o['type'] == "subsection": - (num, no) = self.findOpts(newopts, o['type'], o['name']) - if not no: - opts.append(o) - continue - if no['action'] == "set": - mo = self.mergeOld(o['value'], no['value']) - opts.append({'name':o['name'], 'type':o['type'], 'value':mo}) - continue - if no['action'] == "comment": - co = self.commentOpts(o['value']) - for c in co: - opts.append(c) - continue - if no['action'] == "remove": - continue - raise SyntaxError, 'Unknown action: ['+no['action']+']' - - if o['type'] == "comment" or o['type'] == "empty": - opts.append(o) - continue - - if o['type'] == "option": - (num, no) = self.findOpts(newopts, 'option', o['name'], True) - if not no: - opts.append(o) - continue - if no['action'] == 'comment' or no['action'] == 'remove': - if no['value'] != None and o['value'] != no['value']: - opts.append(o) - continue - if no['action'] == 'comment': - opts.append({'name':'comment', 'type':'comment', - 'value':self.dcomment+o['name']+self.dassign+o['value']}) - continue - if no['action'] == 'set': - opts.append(no) - continue - raise SyntaxError, 'Unknown action: ['+o['action']+']' - - raise SyntaxError, 'Unknown type: ['+o['type']+']' - - return opts - - def mergeNew(self, opts, newopts): - - cline = 0 - - for no in newopts: - - if no['type'] == "section" or no['type'] == "subsection": - (num, o) = self.findOpts(opts, no['type'], no['name']) - if not o: - if no['action'] == 'set': - opts.append(no) - continue - if no['action'] == "set": - self.mergeNew(o['value'], no['value']) - continue - cline = num+1 - continue - - if no['type'] == "option": - (num, o) = self.findOpts(opts, no['type'], no['name'], True) - if not o: - if no['action'] == 'set': - opts.append(no) - continue - cline = num+1 - continue - - if no['type'] == "comment" or no['type'] == "empty": - opts.insert(cline, no) - cline += 1 - continue - - raise SyntaxError, 'Unknown type: ['+no['type']+']' - - - def merge(self, oldopts, newopts): - - #Use a two pass strategy - #First we create a new opts tree from oldopts removing/commenting - # the options as indicated by the contents of newopts - #Second we fill in the new opts tree with options as indicated - # in the newopts tree (this is becaus eentire (sub)sections may - # exist in the newopts that do not exist in oldopts) - - opts = self.mergeOld(oldopts, newopts) - self.mergeNew(opts, newopts) - return opts - - #TODO: Make parse() recursive? - def parse(self, f): - - opts = [] - sectopts = [] - section = None - subsectopts = [] - subsection = None - curopts = opts - fatheropts = opts - - # Read in the old file. - for line in f: - - # It's a section start. - value = self.matchSection(line) - if value: - if section is not None: - opts.append({'name':section, 'type':'section', 'value':sectopts}) - sectopts = [] - curopts = sectopts - fatheropts = sectopts - section = value - continue - - # It's a subsection start. - value = self.matchSubSection(line) - if value: - if subsection is not None: - raise SyntaxError, 'nested subsections are not supported yet' - subsectopts = [] - curopts = subsectopts - subsection = value - continue - - value = self.matchSubSectionEnd(line) - if value: - if subsection is None: - raise SyntaxError, 'Unmatched end subsection terminator found' - fatheropts.append({'name':subsection, 'type':'subsection', 'value':subsectopts}) - subsection = None - curopts = fatheropts - continue - - # Copy anything else as is. - curopts.append(self.parseLine(line)) - - #Add last section if any - if len(sectopts) is not 0: - opts.append({'name':section, 'type':'section', 'value':sectopts}) - - return opts - - # Write settings to configuration file - # file is a path - # options is a set of dictionaries in the form: - # [{'name': 'foo', 'value': 'bar', 'action': 'set/comment'}] - # section is a section name like 'global' - def changeConf(self, file, newopts): - autosection = False - savedsection = None - done = False - output = "" - f = None - try: - #Do not catch an unexisting file error, we want to fail in that case - shutil.copy2(file, file+self.backup_suffix) - - f = openLocked(file, 0644) - - oldopts = self.parse(f) - - options = self.merge(oldopts, newopts) - - output = self.dump(options) - - # Write it out and close it. - f.seek(0) - f.truncate(0) - f.write(output) - finally: - try: - if f: - f.close() - except IOError: - pass - return True - - # Write settings to new file, backup old - # file is a path - # options is a set of dictionaries in the form: - # [{'name': 'foo', 'value': 'bar', 'action': 'set/comment'}] - # section is a section name like 'global' - def newConf(self, file, options): - autosection = False - savedsection = None - done = False - output = "" - f = None - try: - try: - shutil.copy2(file, file+self.backup_suffix) - except IOError, err: - if err.errno == 2: - # The orign file did not exist - pass - - f = openLocked(file, 0644) - - # Trunkate - f.seek(0) - f.truncate(0) - - output = self.dump(options) - - f.write(output) - finally: - try: - if f: - f.close() - except IOError: - pass - return True - -# A SSSD-specific subclass of IPAChangeConf -class SSSDChangeConf(IPAChangeConf): - OPTCRE = re.compile( - r'(?P<option>[^:=\s][^:=]*)' # very permissive! - r'\s*=\s*' # any number of space/tab, - # followed by separator - # followed by any # space/tab - r'(?P<value>.*)$' # everything up to eol - ) - - def __init__(self): - IPAChangeConf.__init__(self, "SSSD") - self.comment = ("#",";") - self.backup_suffix = ".bak" - self.opts = [] - - def parseLine(self, line): - """ - Overrides IPAChangeConf parseLine so that lines are splitted - using any separator in self.assign, not just the default one - """ - - if self.matchEmpty(line): - return {'name':'empty', 'type':'empty'} - - value = self.matchComment(line) - if value: - return {'name':'comment', 'type':'comment', 'value':value.rstrip()} - - mo = self.OPTCRE.match(line) - if not mo: - raise SyntaxError, 'Syntax Error: Unknown line format' - - try: - name, value = mo.group('option', 'value') - except IndexError: - raise SyntaxError, 'Syntax Error: Unknown line format' - - return {'name':name.strip(), 'type':'option', 'value':value.strip()} - - def readfp(self, fd): - self.opts.extend(self.parse(fd)) - - def read(self, filename): - fd = open(filename, 'r') - self.readfp(fd) - fd.close() - - def get(self, section, name): - index, item = self.get_option_index(section, name) - if item: - return item['value'] - - def set(self, section, name, value): - modkw = { 'type' : 'section', - 'name' : section, - 'value' : [{ - 'type' : 'option', - 'name' : name, - 'value' : value, - 'action': 'set', - }], - 'action': 'set', - } - self.opts = self.merge(self.opts, [ modkw ]) - - def add_section(self, name, optkw, index=0): - optkw.append({'type':'empty', 'value':'empty'}) - addkw = { 'type' : 'section', - 'name' : name, - 'value' : optkw, - } - self.opts.insert(index, addkw) - - def delete_section(self, name): - self.delete_option('section', name) - - def sections(self): - return [ o for o in self.opts if o['type'] == 'section' ] - - def has_section(self, section): - return len([ o for o in self.opts if o['type'] == 'section' if o['name'] == section ]) > 0 - - def options(self, section): - for opt in self.opts: - if opt['type'] == 'section' and opt['name'] == section: - return opt['value'] - - def delete_option(self, type, name, exclude_sections=False): - return self.delete_option_subtree(self.opts, type, name) - - def delete_option_subtree(self, subtree, type, name, exclude_sections=False): - index, item = self.findOpts(subtree, type, name, exclude_sections) - if item: - del subtree[index] - return index - - def has_option(self, section, name): - index, item = self.get_option_index(section, name) - if index != -1 and item != None: - return True - return False - - def strip_comments_empty(self, optlist): - retlist = [] - for opt in optlist: - if opt['type'] in ('comment', 'empty'): - continue - retlist.append(opt) - return retlist - - def get_option_index(self, parent_name, name, type='option'): - subtree = None - if parent_name: - pindex, pdata = self.findOpts(self.opts, 'section', parent_name) - if not pdata: - return (-1, None) - subtree = pdata['value'] - else: - subtree = self.opts - return self.findOpts(subtree, type, name) - diff --git a/server/config/setup.py b/server/config/setup.py deleted file mode 100644 index 46a81060..00000000 --- a/server/config/setup.py +++ /dev/null @@ -1,35 +0,0 @@ -# Authors: -# Stephen Gallagher <sgallagh@redhat.com> -# -# Copyright (C) 2009 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -""" -Python-level packaging using distutils. -""" - -from distutils.core import setup - -setup( - name='SSSDConfig', - version='1', - license='GPLv3+', - url='http://fedorahosted.org/sssd', - py_modules=[ - 'SSSDConfig', - 'ipachangeconf', - ], -) diff --git a/server/config/testconfigs/noparse.api.conf b/server/config/testconfigs/noparse.api.conf deleted file mode 100644 index 50651001..00000000 --- a/server/config/testconfigs/noparse.api.conf +++ /dev/null @@ -1,7 +0,0 @@ -# Format: -# option = type, subtype[, default] - -[service] -# Options available to all services -debug_level = int, None, 0 -command
\ No newline at end of file diff --git a/server/config/testconfigs/sssd-badversion.conf b/server/config/testconfigs/sssd-badversion.conf deleted file mode 100644 index 75d8c484..00000000 --- a/server/config/testconfigs/sssd-badversion.conf +++ /dev/null @@ -1,42 +0,0 @@ -[nss] -nss_filter_groups = root -nss_entry_negative_timeout = 15 -debug_level = 0 -nss_filter_users_in_groups = true -nss_filter_users = root -nss_entry_cache_no_wait_timeout = 60 -nss_entry_cache_timeout = 600 -nss_enum_cache_timeout = 120 - -[sssd] -services = nss, pam -reconnection_retries = 3 -domains = LOCAL, IPA -config_file_version = 1 - -[domain/PROXY] -id_provider = proxy -auth_provider = proxy -debug_level = 0 - -[domain/IPA] -id_provider = ldap -auth_provider = krb5 -debug_level = 0 - -[domain/LOCAL] -id_provider = local -auth_provider = local -debug_level = 0 - -[domain/LDAP] -id_provider = ldap -auth_provider = ldap -debug_level = 0 - -[pam] -debug_level = 0 - -[dp] -debug_level = 0 - diff --git a/server/config/testconfigs/sssd-invalid-badbool.conf b/server/config/testconfigs/sssd-invalid-badbool.conf deleted file mode 100644 index 25c27f49..00000000 --- a/server/config/testconfigs/sssd-invalid-badbool.conf +++ /dev/null @@ -1,43 +0,0 @@ -[nss] -nss_filter_groups = root -nss_entry_negative_timeout = 15 -debug_level = 0 -nss_filter_users_in_groups = true -nss_filter_users = root -nss_entry_cache_no_wait_timeout = 60 -nss_entry_cache_timeout = 600 -nss_enum_cache_timeout = 120 - -[sssd] -services = nss, pam -reconnection_retries = 3 -domains = LOCAL, IPA -config_file_version = 2 - -[domain/PROXY] -id_provider = proxy -auth_provider = proxy -debug_level = 0 - -[domain/IPA] -id_provider = ldap -ldap_id_use_start_tls = Fal -auth_provider = krb5 -debug_level = 0 - -[domain/LOCAL] -id_provider = local -auth_provider = local -debug_level = 0 - -[domain/LDAP] -id_provider = ldap -auth_provider=ldap -debug_level = 0 - -[pam] -debug_level = 0 - -[dp] -debug_level = 0 - diff --git a/server/config/testconfigs/sssd-invalid.conf b/server/config/testconfigs/sssd-invalid.conf deleted file mode 100644 index 3a84ae11..00000000 --- a/server/config/testconfigs/sssd-invalid.conf +++ /dev/null @@ -1,3 +0,0 @@ -[sssd] -services -config_file_version = 2 diff --git a/server/config/testconfigs/sssd-noversion.conf b/server/config/testconfigs/sssd-noversion.conf deleted file mode 100644 index 71af85cc..00000000 --- a/server/config/testconfigs/sssd-noversion.conf +++ /dev/null @@ -1,41 +0,0 @@ -[nss] -nss_filter_groups = root -nss_entry_negative_timeout = 15 -debug_level = 0 -nss_filter_users_in_groups = true -nss_filter_users = root -nss_entry_cache_no_wait_timeout = 60 -nss_entry_cache_timeout = 600 -nss_enum_cache_timeout = 120 - -[sssd] -services = nss, pam -reconnection_retries = 3 -domains = LOCAL, IPA - -[domain/PROXY] -id_provider = proxy -auth_provider = proxy -debug_level = 0 - -[domain/IPA] -id_provider = ldap -auth_provider = krb5 -debug_level = 0 - -[domain/LOCAL] -id_provider = local -auth_provider = local -debug_level = 0 - -[domain/LDAP] -id_provider = ldap -auth_provider = ldap -debug_level = 0 - -[pam] -debug_level = 0 - -[dp] -debug_level = 0 - diff --git a/server/config/testconfigs/sssd-valid.conf b/server/config/testconfigs/sssd-valid.conf deleted file mode 100644 index 79016eb4..00000000 --- a/server/config/testconfigs/sssd-valid.conf +++ /dev/null @@ -1,43 +0,0 @@ -[nss] -nss_filter_groups = root -nss_entry_negative_timeout = 15 -debug_level = 0 -nss_filter_users_in_groups = true -nss_filter_users = root -nss_entry_cache_no_wait_timeout = 60 -nss_entry_cache_timeout = 600 -nss_enum_cache_timeout = 120 - -[sssd] -services = nss, pam -reconnection_retries = 3 -domains = LOCAL, IPA -config_file_version = 2 -debug_timestamps = False - -[domain/PROXY] -id_provider = proxy -auth_provider = proxy -debug_level = 0 - -[domain/IPA] -id_provider = ldap -auth_provider = krb5 -debug_level = 0 - -[domain/LOCAL] -id_provider = local -auth_provider = local -debug_level = 0 - -[domain/LDAP] -id_provider = ldap -auth_provider=ldap -debug_level = 0 - -[pam] -debug_level = 0 - -[dp] -debug_level = 0 - diff --git a/server/config/upgrade_config.py b/server/config/upgrade_config.py deleted file mode 100644 index d47fcd38..00000000 --- a/server/config/upgrade_config.py +++ /dev/null @@ -1,405 +0,0 @@ -#!/usr/bin/python -#coding=utf-8 - -# SSSD -# -# upgrade_config.py -# -# Copyright (C) Jakub Hrozek <jhrozek@redhat.com> 2009 -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import sys -import shutil -import traceback -from optparse import OptionParser - -from ipachangeconf import openLocked -from ipachangeconf import SSSDChangeConf - -class SSSDConfigFile(SSSDChangeConf): - def __init__(self, filename): - SSSDChangeConf.__init__(self) - self.filename = filename - - f = openLocked(self.filename, 0600, False) - self.opts = self.parse(f) - f.close() - - def _backup_file(self, file_name): - " Copy the file we operate on to a backup location " - shutil.copy(file_name, file_name + self.backup_suffix) - # make sure we don't leak data, force permissions on the backup - os.chmod(file_name + self.backup_suffix, 0600) - - def get_version(self): - ver = self.get_option_index('sssd', 'config_file_version')[1] - if not ver: - return 1 - try: - return int(ver['value']) - except ValueError: - raise SyntaxError, 'config_file_version not an integer' - - def rename_opts(self, parent_name, rename_kw, type='option'): - for new_name, old_name in rename_kw.items(): - index, item = self.get_option_index(parent_name, old_name, type) - if item: - item['name'] = new_name - - def _do_v2_changes(self): - # remove Data Provider - srvlist = self.get_option_index('sssd', 'services')[1] - if srvlist: - services = [ srv.strip() for srv in srvlist['value'].split(',') ] - if 'dp' in services: - services.remove('dp') - srvlist['value'] = ", ".join([srv for srv in services]) - self.delete_option('section', 'dp') - - # remove magic_private_groups from all domains - for domain in [ s for s in self.sections() if s['name'].startswith("domain/") ]: - self.delete_option_subtree(domain['value'], 'option', 'magic_private_groups') - - def _update_option(self, to_section_name, from_section_name, opts): - to_section = [ s for s in self.sections() if s['name'].strip() == to_section_name ] - from_section = [ s for s in self.sections() if s['name'].strip() == from_section_name ] - - if len(to_section) > 0 and len(from_section) > 0: - vals = to_section[0]['value'] - for o in [one_opt for one_opt in from_section[0]['value'] if one_opt['name'] in opts]: - updated = False - for v in vals: - if v['type'] == 'empty': - continue - # if already in list, just update - if o['name'] == v['name']: - o['value'] = v['value'] - updated = True - # not in list, add there - if not updated: - vals.insert(0, { 'name' : o['name'], 'type' : o['type'], 'value' : o['value'] }) - - def _migrate_enumerate(self, domain): - " Enumerate was special as it turned into bool from (0,1,2,3) enum " - enum = self.findOpts(domain, 'option', 'enumerate')[1] - if enum: - if enum['value'].upper() not in ['TRUE', 'FALSE']: - try: - enum['value'] = int(enum['value']) - except ValueError: - raise ValueError('Cannot convert value %s in domain %s' % (enum['value'], domain['name'])) - - if enum['value'] == 0: - enum['value'] = 'FALSE' - elif enum['value'] > 0: - enum['value'] = 'TRUE' - else: - raise ValueError('Cannot convert value %s in domain %s' % (enum['value'], domain['name'])) - - def _migrate_domain(self, domain): - # rename the section - domain['name'] = domain['name'].strip().replace('domains', 'domain') - - # Generic options - new:old - generic_kw = { 'min_id' : 'minId', - 'max_id': 'maxId', - 'timeout': 'timeout', - 'magic_private_groups' : 'magicPrivateGroups', - 'cache_credentials' : 'cache-credentials', - 'id_provider' : 'provider', - 'auth_provider' : 'auth-module', - 'access_provider' : 'access-module', - 'chpass_provider' : 'chpass-module', - 'use_fully_qualified_names' : 'useFullyQualifiedNames', - 'store_legacy_passwords' : 'store-legacy-passwords', - } - # Proxy options - proxy_kw = { 'proxy_pam_target' : 'pam-target', - 'proxy_lib_name' : 'libName', - } - # LDAP options - new:old - ldap_kw = { 'ldap_uri' : 'ldapUri', - 'ldap_schema' : 'ldapSchema', - 'ldap_default_bind_dn' : 'defaultBindDn', - 'ldap_default_authtok_type' : 'defaultAuthtokType', - 'ldap_default_authtok' : 'defaultAuthtok', - 'ldap_user_search_base' : 'userSearchBase', - 'ldap_user_search_scope' : 'userSearchScope', - 'ldap_user_search_filter' : 'userSearchFilter', - 'ldap_user_object_class' : 'userObjectClass', - 'ldap_user_name' : 'userName', - 'ldap_user_pwd' : 'userPassword', - 'ldap_user_uid_number' : 'userUidNumber', - 'ldap_user_gid_number' : 'userGidNumber', - 'ldap_user_gecos' : 'userGecos', - 'ldap_user_home_directory' : 'userHomeDirectory', - 'ldap_user_shell' : 'userShell', - 'ldap_user_uuid' : 'userUUID', - 'ldap_user_principal' : 'userPrincipal', - 'ldap_force_upper_case_realm' : 'force_upper_case_realm', - 'ldap_user_fullname' : 'userFullname', - 'ldap_user_member_of' : 'userMemberOf', - 'ldap_user_modify_timestamp' : 'modifyTimestamp', - 'ldap_group_search_base' : 'groupSearchBase', - 'ldap_group_search_scope' : 'groupSearchScope', - 'ldap_group_search_filter' : 'groupSearchFilter', - 'ldap_group_object_class' : 'groupObjectClass', - 'ldap_group_name' : 'groupName', - 'ldap_group_pwd' : 'userPassword', - 'ldap_group_gid_number' : 'groupGidNumber', - 'ldap_group_member' : 'groupMember', - 'ldap_group_uuid' : 'groupUUID', - 'ldap_group_modify_timestamp' : 'modifyTimestamp', - 'ldap_network_timeout' : 'network_timeout', - 'ldap_offline_timeout' : 'offline_timeout', - 'ldap_enumeration_refresh_timeout' : 'enumeration_refresh_timeout', - 'ldap_stale_time' : 'stale_time', - 'ldap_opt_timeout' : 'opt_timeout', - 'ldap_tls_reqcert' : 'tls_reqcert', - } - krb5_kw = { 'krb5_kdcip' : 'krb5KDCIP', - 'krb5_realm' : 'krb5REALM', - 'krb5_try_simple_upn' : 'krb5try_simple_upn', - 'krb5_changepw_principal' : 'krb5changepw_principle', - 'krb5_ccachedir' : 'krb5ccache_dir', - 'krb5_auth_timeout' : 'krb5auth_timeout', - 'krb5_ccname_template' : 'krb5ccname_template', - } - user_defaults_kw = { 'default_shell' : 'defaultShell', - 'base_directory' : 'baseDirectory', - } - - self._migrate_enumerate(domain['value']) - self.rename_opts(domain['name'], generic_kw) - self.rename_opts(domain['name'], proxy_kw) - self.rename_opts(domain['name'], ldap_kw) - self.rename_opts(domain['name'], krb5_kw) - - # remove obsolete libPath option - self.delete_option_subtree(domain['value'], 'option', 'libPath') - - # configuration files before 0.5.0 did not enforce provider= in local domains - # it did special-case by domain name (LOCAL) - prvindex, prv = self.findOpts(domain['value'], 'option', 'id_provider') - if not prv and domain['name'] == 'domain/LOCAL': - prv = { 'type' : 'option', - 'name' : 'id_provider', - 'value' : 'local', - } - domain['value'].insert(0, prv) - - # if domain was local, update with parameters from [user_defaults] - if prv['value'] == 'local': - self._update_option(domain['name'], 'user_defaults', user_defaults_kw.values()) - self.delete_option('section', 'user_defaults') - self.rename_opts(domain['name'], user_defaults_kw) - - # if domain had provider = files, unroll that into provider=proxy, proxy_lib_name=files - if prv['value'] == 'files': - prv['value'] = 'proxy' - libkw = { 'type' : 'option', - 'name' : 'proxy_lib_name', - 'value' : 'files', - } - domain['value'].insert(prvindex+1, libkw) - - def _migrate_domains(self): - for domain in [ s for s in self.sections() if s['name'].startswith("domains/") ]: - self._migrate_domain(domain) - - def _update_if_exists(self, opt, to_name, from_section, from_name): - index, item = self.get_option_index(from_section, from_name) - if item: - item['name'] = to_name - opt.append(item) - - def _migrate_services(self): - # [service] - options common to all services, no section as in v1 - service_kw = { 'reconnection_retries' : 'reconnection_retries', - 'debug_level' : 'debug-level', - 'debug_timestamps' : 'debug-timestamps', - 'command' : 'command', - 'timeout' : 'timeout', - } - - # rename services sections - names_kw = { 'nss' : 'services/nss', - 'pam' : 'services/pam', - 'dp' : 'services/dp', - } - self.rename_opts(None, names_kw, 'section') - - # [sssd] - monitor service - sssd_kw = [ - { 'type' : 'option', - 'name' : 'config_file_version', - 'value' : '2', - 'action': 'set', - } - ] - self._update_if_exists(sssd_kw, 'domains', - 'domains', 'domains') - self._update_if_exists(sssd_kw, 'services', - 'services', 'activeServices') - self._update_if_exists(sssd_kw, 'sbus_timeout', - 'services/monitor', 'sbusTimeout') - self._update_if_exists(sssd_kw, 're_expression', - 'names', 're-expression') - self._update_if_exists(sssd_kw, 're_expression', - 'names', 'full-name-format') - self.add_section('sssd', sssd_kw) - # update from general services section and monitor - self._update_option('sssd', 'services', service_kw.values()) - self._update_option('sssd', 'services/monitor', service_kw.values()) - - # [nss] - Name service - nss_kw = { 'enum_cache_timeout' : 'EnumCacheTimeout', - 'entry_cache_timeout' : 'EntryCacheTimeout', - 'entry_cache_nowait_timeout' : 'EntryCacheNoWaitRefreshTimeout', - 'entry_negative_timeout ' : 'EntryNegativeTimeout', - 'filter_users' : 'filterUsers', - 'filter_groups' : 'filterGroups', - 'filter_users_in_groups' : 'filterUsersInGroups', - } - nss_kw.update(service_kw) - self._update_option('nss', 'services', service_kw.values()) - self.rename_opts('nss', nss_kw) - - # [pam] - Authentication service - pam_kw = {} - pam_kw.update(service_kw) - self._update_option('pam', 'services', service_kw.values()) - self.rename_opts('pam', pam_kw) - - # remove obsolete sections - self.delete_option('section', 'services') - self.delete_option('section', 'names') - self.delete_option('section', 'domains') - self.delete_option('section', 'services/monitor') - - def v2_changes(self, out_file_name, backup=True): - # read in the old file, make backup if needed - if backup: - self._backup_file(self.filename) - - self._do_v2_changes() - - # all done, write the file - of = open(out_file_name, "wb") - output = self.dump(self.opts) - of.write(output) - of.close() - # make sure it has the right permissions too - os.chmod(out_file_name, 0600) - - def upgrade_v2(self, out_file_name, backup=True): - # read in the old file, make backup if needed - if backup: - self._backup_file(self.filename) - - # do the migration to v2 format - # do the upgrade - self._migrate_services() - self._migrate_domains() - # also include any changes in the v2 format - self._do_v2_changes() - - # all done, write the file - of = open(out_file_name, "wb") - output = self.dump(self.opts) - of.write(output) - of.close() - # make sure it has the right permissions too - os.chmod(out_file_name, 0600) - -def parse_options(): - parser = OptionParser() - parser.add_option("-f", "--file", - dest="filename", default="/etc/sssd/sssd.conf", - help="Set input file to FILE", metavar="FILE") - parser.add_option("-o", "--outfile", - dest="outfile", default=None, - help="Set output file to OUTFILE", metavar="OUTFILE") - parser.add_option("", "--no-backup", action="store_false", - dest="backup", default=True, - help="""Do not provide backup file after conversion. -The script copies the original file with the suffix .bak -by default""") - parser.add_option("-v", "--verbose", action="store_true", - dest="verbose", default=False, - help="Be verbose") - (options, args) = parser.parse_args() - if len(args) > 0: - print >>sys.stderr, "Stray arguments: %s" % ' '.join([a for a in args]) - return None - - # do the conversion in place by default - if not options.outfile: - options.outfile = options.filename - - return options - -def verbose(msg, verbose): - if verbose: - print msg - -def main(): - options = parse_options() - if not options: - print >>sys.stderr, "Cannot parse options" - return 1 - - try: - config = SSSDConfigFile(options.filename) - except SyntaxError: - verbose(traceback.format_exc(), options.verbose) - print >>sys.stderr, "Cannot parse config file %s" % options.filename - return 1 - except Exception, e: - print "ERROR: %s" % e - verbose(traceback.format_exc(), options.verbose) - return 1 - - # make sure we keep strict settings when creating new files - os.umask(0077) - - version = config.get_version() - if version == 2: - verbose("Looks like v2, only checking changes", options.verbose) - try: - config.v2_changes(options.outfile, options.backup) - except Exception, e: - print "ERROR: %s" % e - verbose(traceback.format_exc(), options.verbose) - return 1 - elif version == 1: - verbose("Looks like v1, performing full upgrade", options.verbose) - try: - config.upgrade_v2(options.outfile, options.backup) - except Exception, e: - print "ERROR: %s" % e - verbose(traceback.format_exc(), options.verbose) - return 1 - else: - print >>sys.stderr, "Can only upgrade from v1 to v2, file %s looks like version %d" % (options.filename, config.get_version()) - return 1 - - return 0 - -if __name__ == "__main__": - ret = main() - sys.exit(ret) - |