From 9a4a2b0f0d1411c98194a838390862bff8c184dd Mon Sep 17 00:00:00 2001 From: Amitay Isaacs Date: Fri, 12 Aug 2011 16:19:06 +1000 Subject: py-samba3: Create samba3 python package to hold other modules This will include passdb and param. --- source4/scripting/python/samba/samba3.py | 793 ---------------------- source4/scripting/python/samba/samba3/__init__.py | 793 ++++++++++++++++++++++ 2 files changed, 793 insertions(+), 793 deletions(-) delete mode 100644 source4/scripting/python/samba/samba3.py create mode 100644 source4/scripting/python/samba/samba3/__init__.py (limited to 'source4') diff --git a/source4/scripting/python/samba/samba3.py b/source4/scripting/python/samba/samba3.py deleted file mode 100644 index ae5b20edd2..0000000000 --- a/source4/scripting/python/samba/samba3.py +++ /dev/null @@ -1,793 +0,0 @@ -# Unix SMB/CIFS implementation. -# Copyright (C) Jelmer Vernooij 2007 -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# - -"""Support for reading Samba 3 data files.""" - -__docformat__ = "restructuredText" - -REGISTRY_VALUE_PREFIX = "SAMBA_REGVAL" -REGISTRY_DB_VERSION = 1 - -import os -import struct -import tdb - - -def fetch_uint32(tdb, key): - try: - data = tdb[key] - except KeyError: - return None - assert len(data) == 4 - return struct.unpack(" 0: - (bad_password_time, data) = unpack_int32(data) - if bad_password_time != 0: - user.bad_password_time = bad_password_time - (pass_last_set_time, data) = unpack_int32(data) - (pass_can_change_time, data) = unpack_int32(data) - (pass_must_change_time, data) = unpack_int32(data) - - if logon_time != 0: - user.logon_time = logon_time - user.logoff_time = logoff_time - user.kickoff_time = kickoff_time - if pass_last_set_time != 0: - user.pass_last_set_time = pass_last_set_time - user.pass_can_change_time = pass_can_change_time - - (user.username, data) = unpack_string(data) - (user.domain, data) = unpack_string(data) - (user.nt_username, data) = unpack_string(data) - (user.fullname, data) = unpack_string(data) - (user.homedir, data) = unpack_string(data) - (user.dir_drive, data) = unpack_string(data) - (user.logon_script, data) = unpack_string(data) - (user.profile_path, data) = unpack_string(data) - (user.acct_desc, data) = unpack_string(data) - (user.workstations, data) = unpack_string(data) - (user.unknown_str, data) = unpack_string(data) - (user.munged_dial, data) = unpack_string(data) - - (user.user_rid, data) = unpack_int32(data) - (user.group_rid, data) = unpack_int32(data) - - (user.lm_password, data) = unpack_string(data) - (user.nt_password, data) = unpack_string(data) - - if self.version > 1: - (user.nt_password_history, data) = unpack_string(data) - - (user.acct_ctrl, data) = unpack_uint16(data) - (_, data) = unpack_uint32(data) # remove_me field - (user.logon_divs, data) = unpack_uint16(data) - (hours, data) = unpack_string(data) - user.hours = [] - for entry in hours: - for i in range(8): - user.hours.append(ord(entry) & (2 ** i) == (2 ** i)) - (user.bad_password_count, data) = unpack_uint16(data) - (user.logon_count, data) = unpack_uint16(data) - (user.unknown_6, data) = unpack_uint32(data) - assert len(data) == 0 - return user - - -def shellsplit(text): - """Very simple shell-like line splitting. - - :param text: Text to split. - :return: List with parts of the line as strings. - """ - ret = list() - inquotes = False - current = "" - for c in text: - if c == "\"": - inquotes = not inquotes - elif c in ("\t", "\n", " ") and not inquotes: - ret.append(current) - current = "" - else: - current += c - if current != "": - ret.append(current) - return ret - - -class WinsDatabase(object): - """Samba 3 WINS database reader.""" - def __init__(self, file): - self.entries = {} - f = open(file, 'r') - assert f.readline().rstrip("\n") == "VERSION 1 0" - for l in f.readlines(): - if l[0] == "#": # skip comments - continue - entries = shellsplit(l.rstrip("\n")) - name = entries[0] - ttl = int(entries[1]) - i = 2 - ips = [] - while "." in entries[i]: - ips.append(entries[i]) - i+=1 - nb_flags = int(entries[i][:-1], 16) - assert not name in self.entries, "Name %s exists twice" % name - self.entries[name] = (ttl, ips, nb_flags) - f.close() - - def __getitem__(self, name): - return self.entries[name] - - def __len__(self): - return len(self.entries) - - def __iter__(self): - return iter(self.entries) - - def items(self): - """Return the entries in this WINS database.""" - return self.entries.items() - - def close(self): # for consistency - pass - - -class ParamFile(object): - """Simple smb.conf-compatible file parser - - Does not use a parameter table, unlike the "normal". - """ - - def __init__(self, sections=None): - self._sections = sections or {} - - def _sanitize_name(self, name): - return name.strip().lower().replace(" ","") - - def __repr__(self): - return "ParamFile(%r)" % self._sections - - def read(self, filename): - """Read a file. - - :param filename: Path to the file - """ - section = None - for i, l in enumerate(open(filename, 'r').xreadlines()): - l = l.strip() - if not l or l[0] == '#' or l[0] == ';': - continue - if l[0] == "[" and l[-1] == "]": - section = self._sanitize_name(l[1:-1]) - self._sections.setdefault(section, {}) - elif "=" in l: - (k, v) = l.split("=", 1) - self._sections[section][self._sanitize_name(k)] = v - else: - raise Exception("Unable to parser line %d: %r" % (i+1,l)) - - def get(self, param, section=None): - """Return the value of a parameter. - - :param param: Parameter name - :param section: Section name, defaults to "global" - :return: parameter value as string if found, None otherwise. - """ - if section is None: - section = "global" - section = self._sanitize_name(section) - if not section in self._sections: - return None - param = self._sanitize_name(param) - if not param in self._sections[section]: - return None - return self._sections[section][param].strip() - - def __getitem__(self, section): - return self._sections[section] - - def get_section(self, section): - return self._sections.get(section) - - def add_section(self, section): - self._sections[self._sanitize_name(section)] = {} - - def set_string(self, name, value): - self._sections["global"][name] = value - - def get_string(self, name): - return self._sections["global"].get(name) - - -class Samba3(object): - """Samba 3 configuration and state data reader.""" - def __init__(self, libdir, smbconfpath): - """Open the configuration and data for a Samba 3 installation. - - :param libdir: Library directory - :param smbconfpath: Path to the smb.conf file. - """ - self.smbconfpath = smbconfpath - self.libdir = libdir - self.lp = ParamFile() - self.lp.read(self.smbconfpath) - - def libdir_path(self, path): - if path[0] == "/" or path[0] == ".": - return path - return os.path.join(self.libdir, path) - - def get_conf(self): - return self.lp - - def get_sam_db(self): - lp = self.get_conf() - backends = (lp.get("passdb backend") or "").split(" ") - if ":" in backends[0]: - (name, location) = backends[0].split(":", 2) - else: - name = backends[0] - location = None - if name == "smbpasswd": - return SmbpasswdFile(self.libdir_path(location or "smbpasswd")) - elif name == "tdbsam": - return TdbSam(self.libdir_path(location or "passdb.tdb")) - elif name == "ldapsam": - if location is not None: - return LdapSam("ldap:%s" % location) - return LdapSam(lp.get("ldap server")) - else: - raise NotImplementedError("unsupported passdb backend %s" % backends[0]) - - def get_policy_db(self): - return PolicyDatabase(self.libdir_path("account_policy.tdb")) - - def get_registry(self): - return Registry(self.libdir_path("registry.tdb")) - - def get_secrets_db(self): - return SecretsDatabase(self.libdir_path("secrets.tdb")) - - def get_shareinfo_db(self): - return ShareInfoDatabase(self.libdir_path("share_info.tdb")) - - def get_idmap_db(self): - return IdmapDatabase(self.libdir_path("winbindd_idmap.tdb")) - - def get_wins_db(self): - return WinsDatabase(self.libdir_path("wins.dat")) - - def get_shares(self): - return Shares(self.get_conf(), self.get_shareinfo_db()) - - def get_groupmapping_db(self): - return GroupMappingDatabase(self.libdir_path("group_mapping.tdb")) diff --git a/source4/scripting/python/samba/samba3/__init__.py b/source4/scripting/python/samba/samba3/__init__.py new file mode 100644 index 0000000000..55e9b18f24 --- /dev/null +++ b/source4/scripting/python/samba/samba3/__init__.py @@ -0,0 +1,793 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij 2007 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +"""Support for reading Samba 3 data files.""" + +__docformat__ = "restructuredText" + +REGISTRY_VALUE_PREFIX = "SAMBA_REGVAL" +REGISTRY_DB_VERSION = 1 + +import os +import struct +import tdb + + +def fetch_uint32(tdb, key): + try: + data = tdb[key] + except KeyError: + return None + assert len(data) == 4 + return struct.unpack(" 0: + (bad_password_time, data) = unpack_int32(data) + if bad_password_time != 0: + user.bad_password_time = bad_password_time + (pass_last_set_time, data) = unpack_int32(data) + (pass_can_change_time, data) = unpack_int32(data) + (pass_must_change_time, data) = unpack_int32(data) + + if logon_time != 0: + user.logon_time = logon_time + user.logoff_time = logoff_time + user.kickoff_time = kickoff_time + if pass_last_set_time != 0: + user.pass_last_set_time = pass_last_set_time + user.pass_can_change_time = pass_can_change_time + + (user.username, data) = unpack_string(data) + (user.domain, data) = unpack_string(data) + (user.nt_username, data) = unpack_string(data) + (user.fullname, data) = unpack_string(data) + (user.homedir, data) = unpack_string(data) + (user.dir_drive, data) = unpack_string(data) + (user.logon_script, data) = unpack_string(data) + (user.profile_path, data) = unpack_string(data) + (user.acct_desc, data) = unpack_string(data) + (user.workstations, data) = unpack_string(data) + (user.unknown_str, data) = unpack_string(data) + (user.munged_dial, data) = unpack_string(data) + + (user.user_rid, data) = unpack_int32(data) + (user.group_rid, data) = unpack_int32(data) + + (user.lm_password, data) = unpack_string(data) + (user.nt_password, data) = unpack_string(data) + + if self.version > 1: + (user.nt_password_history, data) = unpack_string(data) + + (user.acct_ctrl, data) = unpack_uint16(data) + (_, data) = unpack_uint32(data) # remove_me field + (user.logon_divs, data) = unpack_uint16(data) + (hours, data) = unpack_string(data) + user.hours = [] + for entry in hours: + for i in range(8): + user.hours.append(ord(entry) & (2 ** i) == (2 ** i)) + (user.bad_password_count, data) = unpack_uint16(data) + (user.logon_count, data) = unpack_uint16(data) + (user.unknown_6, data) = unpack_uint32(data) + assert len(data) == 0 + return user + + +def shellsplit(text): + """Very simple shell-like line splitting. + + :param text: Text to split. + :return: List with parts of the line as strings. + """ + ret = list() + inquotes = False + current = "" + for c in text: + if c == "\"": + inquotes = not inquotes + elif c in ("\t", "\n", " ") and not inquotes: + ret.append(current) + current = "" + else: + current += c + if current != "": + ret.append(current) + return ret + + +class WinsDatabase(object): + """Samba 3 WINS database reader.""" + def __init__(self, file): + self.entries = {} + f = open(file, 'r') + assert f.readline().rstrip("\n") == "VERSION 1 0" + for l in f.readlines(): + if l[0] == "#": # skip comments + continue + entries = shellsplit(l.rstrip("\n")) + name = entries[0] + ttl = int(entries[1]) + i = 2 + ips = [] + while "." in entries[i]: + ips.append(entries[i]) + i+=1 + nb_flags = int(entries[i][:-1], 16) + assert not name in self.entries, "Name %s exists twice" % name + self.entries[name] = (ttl, ips, nb_flags) + f.close() + + def __getitem__(self, name): + return self.entries[name] + + def __len__(self): + return len(self.entries) + + def __iter__(self): + return iter(self.entries) + + def items(self): + """Return the entries in this WINS database.""" + return self.entries.items() + + def close(self): # for consistency + pass + + +class ParamFile(object): + """Simple smb.conf-compatible file parser + + Does not use a parameter table, unlike the "normal". + """ + + def __init__(self, sections=None): + self._sections = sections or {} + + def _sanitize_name(self, name): + return name.strip().lower().replace(" ","") + + def __repr__(self): + return "ParamFile(%r)" % self._sections + + def read(self, filename): + """Read a file. + + :param filename: Path to the file + """ + section = None + for i, l in enumerate(open(filename, 'r').xreadlines()): + l = l.strip() + if not l or l[0] == '#' or l[0] == ';': + continue + if l[0] == "[" and l[-1] == "]": + section = self._sanitize_name(l[1:-1]) + self._sections.setdefault(section, {}) + elif "=" in l: + (k, v) = l.split("=", 1) + self._sections[section][self._sanitize_name(k)] = v + else: + raise Exception("Unable to parser line %d: %r" % (i+1,l)) + + def get(self, param, section=None): + """Return the value of a parameter. + + :param param: Parameter name + :param section: Section name, defaults to "global" + :return: parameter value as string if found, None otherwise. + """ + if section is None: + section = "global" + section = self._sanitize_name(section) + if not section in self._sections: + return None + param = self._sanitize_name(param) + if not param in self._sections[section]: + return None + return self._sections[section][param].strip() + + def __getitem__(self, section): + return self._sections[section] + + def get_section(self, section): + return self._sections.get(section) + + def add_section(self, section): + self._sections[self._sanitize_name(section)] = {} + + def set_string(self, name, value): + self._sections["global"][name] = value + + def get_string(self, name): + return self._sections["global"].get(name) + + +class Samba3(object): + """Samba 3 configuration and state data reader.""" + def __init__(self, libdir, smbconfpath): + """Open the configuration and data for a Samba 3 installation. + + :param libdir: Library directory + :param smbconfpath: Path to the smb.conf file. + """ + self.smbconfpath = smbconfpath + self.libdir = libdir + self.lp = ParamFile() + self.lp.read(self.smbconfpath) + + def libdir_path(self, path): + if path[0] == "/" or path[0] == ".": + return path + return os.path.join(self.libdir, path) + + def get_conf(self): + return self.lp + + def get_sam_db(self): + lp = self.get_conf() + backends = (lp.get("passdb backend") or "").split(" ") + if ":" in backends[0]: + (name, location) = backends[0].split(":", 2) + else: + name = backends[0] + location = None + if name == "smbpasswd": + return SmbpasswdFile(self.libdir_path(location or "smbpasswd")) + elif name == "tdbsam": + return TdbSam(self.libdir_path(location or "passdb.tdb")) + elif name == "ldapsam": + if location is not None: + return LdapSam("ldap:%s" % location) + return LdapSam(lp.get("ldap server")) + else: + raise NotImplementedError("unsupported passdb backend %s" % backends[0]) + + def get_policy_db(self): + return PolicyDatabase(self.libdir_path("account_policy.tdb")) + + def get_registry(self): + return Registry(self.libdir_path("registry.tdb")) + + def get_secrets_db(self): + return SecretsDatabase(self.libdir_path("secrets.tdb")) + + def get_shareinfo_db(self): + return ShareInfoDatabase(self.libdir_path("share_info.tdb")) + + def get_idmap_db(self): + return IdmapDatabase(self.libdir_path("winbindd_idmap.tdb")) + + def get_wins_db(self): + return WinsDatabase(self.libdir_path("wins.dat")) + + def get_shares(self): + return Shares(self.get_conf(), self.get_shareinfo_db()) + + def get_groupmapping_db(self): + return GroupMappingDatabase(self.libdir_path("group_mapping.tdb")) -- cgit