# Unix SMB/CIFS implementation. # Copyright (C) Jelmer Vernooij 2007 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # """Support for reading Samba 3 data files.""" __docformat__ = "restructuredText" REGISTRY_VALUE_PREFIX = "SAMBA_REGVAL" REGISTRY_DB_VERSION = 1 import os import struct import tdb def fetch_uint32(tdb, key): try: data = tdb[key] except KeyError: return None assert len(data) == 4 return struct.unpack(" 0: (bad_password_time, data) = unpack_int32(data) if bad_password_time != 0: user.bad_password_time = bad_password_time (pass_last_set_time, data) = unpack_int32(data) (pass_can_change_time, data) = unpack_int32(data) (pass_must_change_time, data) = unpack_int32(data) if logon_time != 0: user.logon_time = logon_time user.logoff_time = logoff_time user.kickoff_time = kickoff_time if pass_last_set_time != 0: user.pass_last_set_time = pass_last_set_time user.pass_can_change_time = pass_can_change_time (user.username, data) = unpack_string(data) (user.domain, data) = unpack_string(data) (user.nt_username, data) = unpack_string(data) (user.fullname, data) = unpack_string(data) (user.homedir, data) = unpack_string(data) (user.dir_drive, data) = unpack_string(data) (user.logon_script, data) = unpack_string(data) (user.profile_path, data) = unpack_string(data) (user.acct_desc, data) = unpack_string(data) (user.workstations, data) = unpack_string(data) (user.unknown_str, data) = unpack_string(data) (user.munged_dial, data) = unpack_string(data) (user.user_rid, data) = unpack_int32(data) (user.group_rid, data) = unpack_int32(data) (user.lm_password, data) = unpack_string(data) (user.nt_password, data) = unpack_string(data) if self.version > 1: (user.nt_password_history, data) = unpack_string(data) (user.acct_ctrl, data) = unpack_uint16(data) (_, data) = unpack_uint32(data) # remove_me field (user.logon_divs, data) = unpack_uint16(data) (hours, data) = unpack_string(data) user.hours = [] for entry in hours: for i in range(8): user.hours.append(ord(entry) & (2 ** i) == (2 ** i)) (user.bad_password_count, data) = unpack_uint16(data) (user.logon_count, data) = unpack_uint16(data) (user.unknown_6, data) = unpack_uint32(data) assert len(data) == 0 return user def shellsplit(text): """Very simple shell-like line splitting. :param text: Text to split. :return: List with parts of the line as strings. """ ret = list() inquotes = False current = "" for c in text: if c == "\"": inquotes = not inquotes elif c in ("\t", "\n", " ") and not inquotes: ret.append(current) current = "" else: current += c if current != "": ret.append(current) return ret class WinsDatabase(object): """Samba 3 WINS database reader.""" def __init__(self, file): self.entries = {} f = open(file, 'r') assert f.readline().rstrip("\n") == "VERSION 1 0" for l in f.readlines(): if l[0] == "#": # skip comments continue entries = shellsplit(l.rstrip("\n")) name = entries[0] ttl = int(entries[1]) i = 2 ips = [] while "." in entries[i]: ips.append(entries[i]) i+=1 nb_flags = int(entries[i][:-1], 16) assert not name in self.entries, "Name %s exists twice" % name self.entries[name] = (ttl, ips, nb_flags) f.close() def __getitem__(self, name): return self.entries[name] def __len__(self): return len(self.entries) def __iter__(self): return iter(self.entries) def items(self): """Return the entries in this WINS database.""" return self.entries.items() def close(self): # for consistency pass class ParamFile(object): """Simple smb.conf-compatible file parser Does not use a parameter table, unlike the "normal". """ def __init__(self, sections=None): self._sections = sections or {} def _sanitize_name(self, name): return name.strip().lower().replace(" ","") def __repr__(self): return "ParamFile(%r)" % self._sections def read(self, filename): """Read a file. :param filename: Path to the file """ section = None for i, l in enumerate(open(filename, 'r').xreadlines()): l = l.strip() if not l or l[0] == '#' or l[0] == ';': continue if l[0] == "[" and l[-1] == "]": section = self._sanitize_name(l[1:-1]) self._sections.setdefault(section, {}) elif "=" in l: (k, v) = l.split("=", 1) self._sections[section][self._sanitize_name(k)] = v else: raise Exception("Unable to parser line %d: %r" % (i+1,l)) def get(self, param, section=None): """Return the value of a parameter. :param param: Parameter name :param section: Section name, defaults to "global" :return: parameter value as string if found, None otherwise. """ if section is None: section = "global" section = self._sanitize_name(section) if not section in self._sections: return None param = self._sanitize_name(param) if not param in self._sections[section]: return None return self._sections[section][param].strip() def __getitem__(self, section): return self._sections[section] def get_section(self, section): return self._sections.get(section) def add_section(self, section): self._sections[self._sanitize_name(section)] = {} def set_string(self, name, value): self._sections["global"][name] = value def get_string(self, name): return self._sections["global"].get(name) class Samba3(object): """Samba 3 configuration and state data reader.""" def __init__(self, libdir, smbconfpath): """Open the configuration and data for a Samba 3 installation. :param libdir: Library directory :param smbconfpath: Path to the smb.conf file. """ self.smbconfpath = smbconfpath self.libdir = libdir self.lp = ParamFile() self.lp.read(self.smbconfpath) def libdir_path(self, path): if path[0] == "/" or path[0] == ".": return path return os.path.join(self.libdir, path) def get_conf(self): return self.lp def get_sam_db(self): lp = self.get_conf() backends = (lp.get("passdb backend") or "").split(" ") if ":" in backends[0]: (name, location) = backends[0].split(":", 2) else: name = backends[0] location = None if name == "smbpasswd": return SmbpasswdFile(self.libdir_path(location or "smbpasswd")) elif name == "tdbsam": return TdbSam(self.libdir_path(location or "passdb.tdb")) elif name == "ldapsam": if location is not None: return LdapSam("ldap:%s" % location) return LdapSam(lp.get("ldap server")) else: raise NotImplementedError("unsupported passdb backend %s" % backends[0]) def get_policy_db(self): return PolicyDatabase(self.libdir_path("account_policy.tdb")) def get_registry(self): return Registry(self.libdir_path("registry.tdb")) def get_secrets_db(self): return SecretsDatabase(self.libdir_path("secrets.tdb")) def get_shareinfo_db(self): return ShareInfoDatabase(self.libdir_path("share_info.tdb")) def get_idmap_db(self): return IdmapDatabase(self.libdir_path("winbindd_idmap.tdb")) def get_wins_db(self): return WinsDatabase(self.libdir_path("wins.dat")) def get_shares(self): return Shares(self.get_conf(), self.get_shareinfo_db()) def get_groupmapping_db(self): return GroupMappingDatabase(self.libdir_path("group_mapping.tdb"))