summaryrefslogtreecommitdiff
path: root/source4/scripting/python/samba/samba3/__init__.py
diff options
context:
space:
mode:
authorAmitay Isaacs <amitay@gmail.com>2011-08-25 17:10:23 +1000
committerAndrew Bartlett <abartlet@samba.org>2011-08-26 10:06:33 +1000
commitd2536b31200106ad0bbea60abd3e654c23540e6d (patch)
treee040dc9a7b5d150b34b9fe17f2c3296a0569e1d4 /source4/scripting/python/samba/samba3/__init__.py
parent17c74e5dfd02df7e935171422477dea3f1bce057 (diff)
downloadsamba-d2536b31200106ad0bbea60abd3e654c23540e6d.tar.gz
samba-d2536b31200106ad0bbea60abd3e654c23540e6d.tar.bz2
samba-d2536b31200106ad0bbea60abd3e654c23540e6d.zip
py-samba3: Use passdb/param wrapper for samba3 module
Instead of parsing samba3 database files (password, group mapping, account policy, secrets), use passdb python wrapper. Similarly for parsing configuration, use samba3 param python wrapper. Other databases (idmap, registry, wins) are still parsed in python. Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'source4/scripting/python/samba/samba3/__init__.py')
-rw-r--r--source4/scripting/python/samba/samba3/__init__.py454
1 files changed, 23 insertions, 431 deletions
diff --git a/source4/scripting/python/samba/samba3/__init__.py b/source4/scripting/python/samba/samba3/__init__.py
index 385d9331ec..dd2f927aa4 100644
--- a/source4/scripting/python/samba/samba3/__init__.py
+++ b/source4/scripting/python/samba/samba3/__init__.py
@@ -26,6 +26,9 @@ import os
import struct
import tdb
+import passdb
+import param as s3param
+
def fetch_uint32(tdb, key):
try:
@@ -125,74 +128,6 @@ class Registry(TdbDatabase):
return ret
-class PolicyDatabase(TdbDatabase):
- """Samba 3 Account Policy database reader."""
- def __init__(self, file):
- """Open a policy database
-
- :param file: Path to the file to open.
- """
- super(PolicyDatabase, self).__init__(file)
- self.min_password_length = fetch_uint32(self.tdb, "min password length\x00")
- self.password_history = fetch_uint32(self.tdb, "password history\x00")
- self.user_must_logon_to_change_password = fetch_uint32(self.tdb, "user must logon to change pasword\x00")
- self.maximum_password_age = fetch_uint32(self.tdb, "maximum password age\x00")
- self.minimum_password_age = fetch_uint32(self.tdb, "minimum password age\x00")
- self.lockout_duration = fetch_uint32(self.tdb, "lockout duration\x00")
- self.reset_count_minutes = fetch_uint32(self.tdb, "reset count minutes\x00")
- self.bad_lockout_minutes = fetch_uint32(self.tdb, "bad lockout minutes\x00")
- self.disconnect_time = fetch_int32(self.tdb, "disconnect time\x00")
- self.refuse_machine_password_change = fetch_uint32(self.tdb, "refuse machine password change\x00")
-
- # FIXME: Read privileges as well
-
-
-GROUPDB_DATABASE_VERSION_V1 = 1 # native byte format.
-GROUPDB_DATABASE_VERSION_V2 = 2 # le format.
-
-GROUP_PREFIX = "UNIXGROUP/"
-
-# Alias memberships are stored reverse, as memberships. The performance
-# critical operation is to determine the aliases a SID is member of, not
-# listing alias members. So we store a list of alias SIDs a SID is member of
-# hanging of the member as key.
-MEMBEROF_PREFIX = "MEMBEROF/"
-
-class GroupMappingDatabase(TdbDatabase):
- """Samba 3 group mapping database reader."""
- def _check_version(self):
- assert fetch_int32(self.tdb, "INFO/version\x00") in (GROUPDB_DATABASE_VERSION_V1, GROUPDB_DATABASE_VERSION_V2)
-
- def groupsids(self):
- """Retrieve the SIDs for the groups in this database.
-
- :return: List with sids as strings.
- """
- for k in self.tdb.iterkeys():
- if k.startswith(GROUP_PREFIX):
- yield k[len(GROUP_PREFIX):].rstrip("\0")
-
- def get_group(self, sid):
- """Retrieve the group mapping information for a particular group.
-
- :param sid: SID of the group
- :return: None if the group can not be found, otherwise
- a tuple with gid, sid_name_use, the NT name and comment.
- """
- data = self.tdb.get("%s%s\0" % (GROUP_PREFIX, sid))
- if data is None:
- return data
- (gid, sid_name_use) = struct.unpack("<lL", data[0:8])
- (nt_name, comment, _) = data[8:].split("\0")
- return (gid, sid_name_use, nt_name, comment)
-
- def aliases(self):
- """Retrieve the aliases in this database."""
- for k in self.tdb.iterkeys():
- if k.startswith(MEMBEROF_PREFIX):
- yield k[len(MEMBEROF_PREFIX):].rstrip("\0")
-
-
# High water mark keys
IDMAP_HWM_GROUP = "GROUP HWM\0"
IDMAP_HWM_USER = "USER HWM\0"
@@ -229,6 +164,11 @@ class IdmapDatabase(TdbDatabase):
yield int(k[len(IDMAP_GROUP_PREFIX):].rstrip("\0"))
def get_sid(self, xid, id_type):
+ """Retrive SID associated with a particular id and type.
+
+ :param xid: UID or GID to retrive SID for.
+ :param id_type: Type of id specified - 'UID' or 'GID'
+ """
data = self.tdb.get("%s %s\0" % (id_type, str(xid)))
if data is None:
return data
@@ -354,266 +294,6 @@ class Shares(object):
return self.lp.__iter__()
-ACB_DISABLED = 0x00000001
-ACB_HOMDIRREQ = 0x00000002
-ACB_PWNOTREQ = 0x00000004
-ACB_TEMPDUP = 0x00000008
-ACB_NORMAL = 0x00000010
-ACB_MNS = 0x00000020
-ACB_DOMTRUST = 0x00000040
-ACB_WSTRUST = 0x00000080
-ACB_SVRTRUST = 0x00000100
-ACB_PWNOEXP = 0x00000200
-ACB_AUTOLOCK = 0x00000400
-ACB_ENC_TXT_PWD_ALLOWED = 0x00000800
-ACB_SMARTCARD_REQUIRED = 0x00001000
-ACB_TRUSTED_FOR_DELEGATION = 0x00002000
-ACB_NOT_DELEGATED = 0x00004000
-ACB_USE_DES_KEY_ONLY = 0x00008000
-ACB_DONT_REQUIRE_PREAUTH = 0x00010000
-ACB_PW_EXPIRED = 0x00020000
-ACB_NO_AUTH_DATA_REQD = 0x00080000
-
-acb_info_mapping = {
- 'N': ACB_PWNOTREQ, # 'N'o password.
- 'D': ACB_DISABLED, # 'D'isabled.
- 'H': ACB_HOMDIRREQ, # 'H'omedir required.
- 'T': ACB_TEMPDUP, # 'T'emp account.
- 'U': ACB_NORMAL, # 'U'ser account (normal).
- 'M': ACB_MNS, # 'M'NS logon user account. What is this ?
- 'W': ACB_WSTRUST, # 'W'orkstation account.
- 'S': ACB_SVRTRUST, # 'S'erver account.
- 'L': ACB_AUTOLOCK, # 'L'ocked account.
- 'X': ACB_PWNOEXP, # No 'X'piry on password
- 'I': ACB_DOMTRUST, # 'I'nterdomain trust account.
- ' ': 0
- }
-
-def decode_acb(text):
- """Decode a ACB field.
-
- :param text: ACB text
- :return: integer with flags set.
- """
- assert not "[" in text and not "]" in text
- ret = 0
- for x in text:
- ret |= acb_info_mapping[x]
- return ret
-
-
-class SAMUser(object):
- """Samba 3 SAM User.
-
- :note: Unknown or unset fields are set to None.
- """
- def __init__(self, name, uid=None, lm_password=None, nt_password=None, acct_ctrl=None,
- last_change_time=None, nt_username=None, fullname=None, logon_time=None, logoff_time=None,
- acct_desc=None, group_rid=None, bad_password_count=None, logon_count=None,
- domain=None, dir_drive=None, munged_dial=None, homedir=None, logon_script=None,
- profile_path=None, workstations=None, kickoff_time=None, bad_password_time=None,
- pass_last_set_time=None, pass_can_change_time=None, pass_must_change_time=None,
- user_rid=None, unknown_6=None, nt_password_history=None,
- unknown_str=None, hours=None, logon_divs=None):
- self.username = name
- self.uid = uid
- self.lm_password = lm_password
- self.nt_password = nt_password
- self.acct_ctrl = acct_ctrl
- self.pass_last_set_time = last_change_time
- self.nt_username = nt_username
- self.fullname = fullname
- self.logon_time = logon_time
- self.logoff_time = logoff_time
- self.acct_desc = acct_desc
- self.group_rid = group_rid
- self.bad_password_count = bad_password_count
- self.logon_count = logon_count
- self.domain = domain
- self.dir_drive = dir_drive
- self.munged_dial = munged_dial
- self.homedir = homedir
- self.logon_script = logon_script
- self.profile_path = profile_path
- self.workstations = workstations
- self.kickoff_time = kickoff_time
- self.bad_password_time = bad_password_time
- self.pass_can_change_time = pass_can_change_time
- self.pass_must_change_time = pass_must_change_time
- self.user_rid = user_rid
- self.unknown_6 = unknown_6
- self.nt_password_history = nt_password_history
- self.unknown_str = unknown_str
- self.hours = hours
- self.logon_divs = logon_divs
-
- def __eq__(self, other):
- if not isinstance(other, SAMUser):
- return False
- return self.__dict__ == other.__dict__
-
-
-class SmbpasswdFile(object):
- """Samba 3 smbpasswd file reader."""
- def __init__(self, file):
- self.users = {}
- f = open(file, 'r')
- for l in f.readlines():
- if len(l) == 0 or l[0] == "#":
- continue # Skip comments and blank lines
- parts = l.split(":")
- username = parts[0]
- uid = int(parts[1])
- acct_ctrl = 0
- last_change_time = None
- if parts[2] == "NO PASSWORD":
- acct_ctrl |= ACB_PWNOTREQ
- lm_password = None
- elif parts[2][0] in ("*", "X"):
- # No password set
- lm_password = None
- else:
- lm_password = parts[2]
-
- if parts[3][0] in ("*", "X"):
- # No password set
- nt_password = None
- else:
- nt_password = parts[3]
-
- if parts[4][0] == '[':
- assert "]" in parts[4]
- acct_ctrl |= decode_acb(parts[4][1:-1])
- if parts[5].startswith("LCT-"):
- last_change_time = int(parts[5][len("LCT-"):], 16)
- else: # old style file
- if username[-1] == "$":
- acct_ctrl &= ~ACB_NORMAL
- acct_ctrl |= ACB_WSTRUST
-
- self.users[username] = SAMUser(username, uid, lm_password, nt_password, acct_ctrl, last_change_time)
-
- f.close()
-
- def __len__(self):
- return len(self.users)
-
- def __getitem__(self, name):
- return self.users[name]
-
- def __iter__(self):
- return iter(self.users)
-
- def close(self): # For consistency
- pass
-
-
-TDBSAM_FORMAT_STRING_V0 = "ddddddBBBBBBBBBBBBddBBwdwdBwwd"
-TDBSAM_FORMAT_STRING_V1 = "dddddddBBBBBBBBBBBBddBBwdwdBwwd"
-TDBSAM_FORMAT_STRING_V2 = "dddddddBBBBBBBBBBBBddBBBwwdBwwd"
-TDBSAM_USER_PREFIX = "USER_"
-
-
-class LdapSam(object):
- """Samba 3 LDAP passdb backend reader."""
- def __init__(self, url):
- self.ldap_url = url
-
-
-class TdbSam(TdbDatabase):
- """Samba 3 TDB passdb backend reader."""
- def _check_version(self):
- self.version = fetch_uint32(self.tdb, "INFO/version\0") or 0
- assert self.version in (0, 1, 2)
-
- def usernames(self):
- """Iterate over the usernames in this Tdb database."""
- for k in self.tdb.iterkeys():
- if k.startswith(TDBSAM_USER_PREFIX):
- yield k[len(TDBSAM_USER_PREFIX):].rstrip("\0")
-
- __iter__ = usernames
-
- def __getitem__(self, name):
- data = self.tdb["%s%s\0" % (TDBSAM_USER_PREFIX, name)]
- user = SAMUser(name)
-
- def unpack_string(data):
- (length, ) = struct.unpack("<L", data[:4])
- data = data[4:]
- if length == 0:
- return (None, data)
- return (data[:length].rstrip("\0"), data[length:])
-
- def unpack_int32(data):
- (value, ) = struct.unpack("<l", data[:4])
- return (value, data[4:])
-
- def unpack_uint32(data):
- (value, ) = struct.unpack("<L", data[:4])
- return (value, data[4:])
-
- def unpack_uint16(data):
- (value, ) = struct.unpack("<H", data[:2])
- return (value, data[2:])
-
- (logon_time, data) = unpack_int32(data)
- (logoff_time, data) = unpack_int32(data)
- (kickoff_time, data) = unpack_int32(data)
-
- if self.version > 0:
- (bad_password_time, data) = unpack_int32(data)
- if bad_password_time != 0:
- user.bad_password_time = bad_password_time
- (pass_last_set_time, data) = unpack_int32(data)
- (pass_can_change_time, data) = unpack_int32(data)
- (pass_must_change_time, data) = unpack_int32(data)
-
- if logon_time != 0:
- user.logon_time = logon_time
- user.logoff_time = logoff_time
- user.kickoff_time = kickoff_time
- if pass_last_set_time != 0:
- user.pass_last_set_time = pass_last_set_time
- user.pass_can_change_time = pass_can_change_time
-
- (user.username, data) = unpack_string(data)
- (user.domain, data) = unpack_string(data)
- (user.nt_username, data) = unpack_string(data)
- (user.fullname, data) = unpack_string(data)
- (user.homedir, data) = unpack_string(data)
- (user.dir_drive, data) = unpack_string(data)
- (user.logon_script, data) = unpack_string(data)
- (user.profile_path, data) = unpack_string(data)
- (user.acct_desc, data) = unpack_string(data)
- (user.workstations, data) = unpack_string(data)
- (user.unknown_str, data) = unpack_string(data)
- (user.munged_dial, data) = unpack_string(data)
-
- (user.user_rid, data) = unpack_int32(data)
- (user.group_rid, data) = unpack_int32(data)
-
- (user.lm_password, data) = unpack_string(data)
- (user.nt_password, data) = unpack_string(data)
-
- if self.version > 1:
- (user.nt_password_history, data) = unpack_string(data)
-
- (user.acct_ctrl, data) = unpack_uint16(data)
- (_, data) = unpack_uint32(data) # remove_me field
- (user.logon_divs, data) = unpack_uint16(data)
- (hours, data) = unpack_string(data)
- user.hours = []
- for entry in hours:
- for i in range(8):
- user.hours.append(ord(entry) & (2 ** i) == (2 ** i))
- (user.bad_password_count, data) = unpack_uint16(data)
- (user.logon_count, data) = unpack_uint16(data)
- (user.unknown_6, data) = unpack_uint32(data)
- assert len(data) == 0
- return user
-
-
def shellsplit(text):
"""Very simple shell-like line splitting.
@@ -675,139 +355,51 @@ class WinsDatabase(object):
pass
-class ParamFile(object):
- """Simple smb.conf-compatible file parser
-
- Does not use a parameter table, unlike the "normal".
- """
-
- def __init__(self, sections=None):
- self._sections = sections or {}
-
- def _sanitize_name(self, name):
- return name.strip().lower().replace(" ","")
-
- def __repr__(self):
- return "ParamFile(%r)" % self._sections
-
- def read(self, filename):
- """Read a file.
-
- :param filename: Path to the file
- """
- section = None
- for i, l in enumerate(open(filename, 'r').xreadlines()):
- l = l.strip()
- if not l or l[0] == '#' or l[0] == ';':
- continue
- if l[0] == "[" and l[-1] == "]":
- section = self._sanitize_name(l[1:-1])
- self._sections.setdefault(section, {})
- elif "=" in l:
- (k, v) = l.split("=", 1)
- self._sections[section][self._sanitize_name(k)] = v
- else:
- raise Exception("Unable to parser line %d: %r" % (i+1,l))
-
- def get(self, param, section=None):
- """Return the value of a parameter.
-
- :param param: Parameter name
- :param section: Section name, defaults to "global"
- :return: parameter value as string if found, None otherwise.
- """
- if section is None:
- section = "global"
- section = self._sanitize_name(section)
- if not section in self._sections:
- return None
- param = self._sanitize_name(param)
- if not param in self._sections[section]:
- return None
- return self._sections[section][param].strip()
-
- def __getitem__(self, section):
- return self._sections[section]
-
- def get_section(self, section):
- return self._sections.get(section)
-
- def add_section(self, section):
- self._sections[self._sanitize_name(section)] = {}
-
- def set_string(self, name, value):
- self._sections["global"][name] = value
-
- def get_string(self, name):
- return self._sections["global"].get(name)
-
-
class Samba3(object):
"""Samba 3 configuration and state data reader."""
- def __init__(self, libdir, smbconfpath):
+ def __init__(self, smbconfpath, s3_lp_ctx=None):
"""Open the configuration and data for a Samba 3 installation.
- :param libdir: Library directory
:param smbconfpath: Path to the smb.conf file.
+ :param s3_lp_ctx: Samba3 Loadparm context
"""
self.smbconfpath = smbconfpath
- self.libdir = libdir
- self.lp = ParamFile()
- self.lp.read(self.smbconfpath)
- self.privatedir = self.lp.get("private dir") or libdir
+ if s3_lp_ctx:
+ self.lp = s3_lp_ctx
+ else:
+ self.lp = s3param.get_context()
+ self.lp.load(smbconfpath)
- def libdir_path(self, path):
+ def statedir_path(self, path):
if path[0] == "/" or path[0] == ".":
return path
- return os.path.join(self.libdir, path)
+ return os.path.join(self.lp.get("state directory"), path)
def privatedir_path(self, path):
if path[0] == "/" or path[0] == ".":
return path
- return os.path.join(self.privatedir, path)
+ return os.path.join(self.lp.get("private dir"), path)
def get_conf(self):
return self.lp
def get_sam_db(self):
- lp = self.get_conf()
- backends = (lp.get("passdb backend") or "").split(" ")
- if ":" in backends[0]:
- (name, location) = backends[0].split(":", 2)
- else:
- name = backends[0]
- location = None
- if name == "smbpasswd":
- return SmbpasswdFile(self.libdir_path(location or "smbpasswd"))
- elif name == "tdbsam":
- return TdbSam(self.libdir_path(location or "passdb.tdb"))
- elif name == "ldapsam":
- if location is not None:
- return LdapSam("ldap:%s" % location)
- return LdapSam(lp.get("ldap server"))
- else:
- raise NotImplementedError("unsupported passdb backend %s" % backends[0])
-
- def get_policy_db(self):
- return PolicyDatabase(self.libdir_path("account_policy.tdb"))
+ return passdb.PDB(self.lp.get('passdb backend'))
def get_registry(self):
- return Registry(self.libdir_path("registry.tdb"))
+ return Registry(self.statedir_path("registry.tdb"))
def get_secrets_db(self):
return SecretsDatabase(self.privatedir_path("secrets.tdb"))
def get_shareinfo_db(self):
- return ShareInfoDatabase(self.libdir_path("share_info.tdb"))
+ return ShareInfoDatabase(self.statedir_path("share_info.tdb"))
def get_idmap_db(self):
- return IdmapDatabase(self.libdir_path("winbindd_idmap.tdb"))
+ return IdmapDatabase(self.statedir_path("winbindd_idmap.tdb"))
def get_wins_db(self):
- return WinsDatabase(self.libdir_path("wins.dat"))
+ return WinsDatabase(self.statedir_path("wins.dat"))
def get_shares(self):
return Shares(self.get_conf(), self.get_shareinfo_db())
-
- def get_groupmapping_db(self):
- return GroupMappingDatabase(self.libdir_path("group_mapping.tdb"))