From e5e2b64449a476552bbad49792b13f7501ecdc0c Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij <jelmer@samba.org> Date: Wed, 21 Nov 2007 12:31:43 +0100 Subject: r26079: Some cleanups in the old SWIG wrappers: - remove old torture tests for LDB (replaced by a much more extensive test) - moved tool to bin directory (This used to be commit d6d3b0ad7a789441c82cf30a640033a052921c37) --- source4/scripting/bin/rpcclient | 301 ++++++++++++++++++++++++++ source4/scripting/swig/README | 23 +- source4/scripting/swig/config.mk | 17 +- source4/scripting/swig/rpcclient | 301 -------------------------- source4/scripting/swig/torture/torture_ldb.py | 83 ------- 5 files changed, 314 insertions(+), 411 deletions(-) create mode 100755 source4/scripting/bin/rpcclient delete mode 100755 source4/scripting/swig/rpcclient delete mode 100755 source4/scripting/swig/torture/torture_ldb.py diff --git a/source4/scripting/bin/rpcclient b/source4/scripting/bin/rpcclient new file mode 100755 index 0000000000..34efafdf73 --- /dev/null +++ b/source4/scripting/bin/rpcclient @@ -0,0 +1,301 @@ +#!/usr/bin/python + +import sys, os, string +from cmd import Cmd +from optparse import OptionParser +from pprint import pprint + +import dcerpc, samr + +def swig2dict(obj): + """Convert a swig object to a dictionary.""" + + result = {} + + for attr in filter(lambda x: type(x) == str, dir(obj)): + + if attr[:2] == '__' and attr[-2:] == '__': + continue + + if attr == 'this' or attr == 'thisown': + continue + + result[attr] = getattr(obj, attr) + + return result + +class rpcclient(Cmd): + + prompt = 'rpcclient$ ' + + def __init__(self, server, cred): + Cmd.__init__(self) + self.server = server + self.cred = cred + + def emptyline(self): + + # Default for empty line is to repeat last command - yuck + + pass + + def onecmd(self, line): + + # Override the onecmd() method so we can trap error returns + + try: + Cmd.onecmd(self, line) + except dcerpc.NTSTATUS, arg: + print 'The command returned an error: %s' % arg[1] + + # Command handlers + + def do_help(self, line): + """Displays on-line help for rpcclient commands.""" + Cmd.do_help(self, line) + + def do_shell(self, line): + + status = os.system(line) + + if os.WIFEXITED(status): + if os.WEXITSTATUS(status) != 0: + print 'Command exited with code %d' % os.WEXITSTATUS(status) + else: + print 'Command exited with signal %d' % os.WTERMSIG(status) + + def do_EOF(self, line): + """Exits rpcclient.""" + print + sys.exit(0) + + # SAMR pipe commands + + def do_SamrEnumDomains(self, line): + """Enumerate domain names.""" + + usage = 'usage: SamrEnumDomains' + + if line != '': + print usage + return + + pipe = dcerpc.pipe_connect( + 'ncacn_np:%s' % self.server, + dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), + self.cred) + + connect_handle = samr.Connect(pipe) + + for i in connect_handle.EnumDomains(): + print i + + def do_SamrLookupDomain(self, line): + """Return the SID for a domain.""" + + usage = 'SamrLookupDomain DOMAIN' + + parser = OptionParser(usage) + options, args = parser.parse_args(string.split(line)) + + if len(args) != 1: + print 'usage:', usage + return + + pipe = dcerpc.pipe_connect( + 'ncacn_np:%s' % self.server, + dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), + self.cred) + + connect_handle = samr.Connect(pipe) + + print connect_handle.LookupDomain(args[0]) + + def do_SamrQueryDomInfo(self, line): + """Return information about a domain designated by its SID.""" + + usage = 'SamrQueryDomInfo DOMAIN_SID [info_level]' + + parser = OptionParser(usage) + options, args = parser.parse_args(string.split(line)) + + if (len(args) == 0) or (len(args) > 2): + print 'usage:', usage + return + + pipe = dcerpc.pipe_connect( + 'ncacn_np:%s' % self.server, + dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), + self.cred) + + connect_handle = samr.Connect(pipe) + domain_handle = connect_handle.OpenDomain(args[0]) + + if (len(args) == 2): + result = domain_handle.QueryDomainInfo(int(args[1])) + else: + result = domain_handle.QueryDomainInfo() + + pprint(swig2dict(result)) + + def do_SamrQueryDomInfo2(self, line): + """Return information about a domain designated by its SID. + (Windows 2000 and >)""" + + usage = 'SamrQueryDomInfo2 DOMAIN_SID [info_level] (Windows 2000 and >)' + parser = OptionParser(usage) + options, args = parser.parse_args(string.split(line)) + + if len(args) == 0 or len(args) > 2: + print 'usage:', usage + return + + pipe = dcerpc.pipe_connect( + 'ncacn_np:%s' % self.server, + dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), + self.cred) + + connect_handle = samr.Connect(pipe) + domain_handle = connect_handle.OpenDomain(args[0]) + + if (len(args) == 2): + result = domain_handle.QueryDomainInfo2(int(args[1])) + else: + result = domain_handle.QueryDomainInfo2() + + pprint(swig2dict(result)) + + def do_SamrEnumDomainGroups(self, line): + """Return the list of groups of a domain designated by its SID.""" + + usage = 'SamrEnumDomainGroups DOMAIN_SID' + + parser = OptionParser(usage) + options, args = parser.parse_args(string.split(line)) + + if len(args) != 1: + print 'usage:', usage + return + + pipe = dcerpc.pipe_connect( + 'ncacn_np:%s' % self.server, + dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), + self.cred) + + connect_handle = samr.Connect(pipe) + domain_handle = connect_handle.OpenDomain(args[0]) + + result = domain_handle.EnumDomainGroups() + + pprint(result) + + def do_SamrEnumDomainAliases(self, line): + """Return the list of aliases (local groups) of a domain designated + by its SID.""" + + usage = 'SamrEnumDomainAliases DOMAIN_SID' + + parser = OptionParser(usage) + options, args = parser.parse_args(string.split(line)) + + if len(args) != 1: + print 'usage:', usage + return + + pipe = dcerpc.pipe_connect( + 'ncacn_np:%s' % self.server, + dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), + self.cred) + + connect_handle = samr.Connect(pipe) + domain_handle = connect_handle.OpenDomain(args[0]) + + result = domain_handle.EnumDomainAliases() + + pprint(result) + + def do_SamrEnumDomainUsers(self, line): + """Return the list of users of a domain designated by its SID.""" + + usage = 'SamrEnumDomainUsers DOMAIN_SID [user_account_flags]' + + parser = OptionParser(usage) + options, args = parser.parse_args(string.split(line)) + + if (len(args) == 0) or (len(args) > 2): + print 'usage:', usage + return + + pipe = dcerpc.pipe_connect( + 'ncacn_np:%s' % self.server, + dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), + self.cred) + + connect_handle = samr.Connect(pipe) + domain_handle = connect_handle.OpenDomain(args[0]) + + if (len(args) == 2): + result = domain_handle.EnumDomainUsers(int(args[1])) + else: + result = domain_handle.EnumDomainUsers() + + pprint(result) + +if __name__ == '__main__': + + # Parse command line + + usage = 'rpcclient SERVER [options]' + + if len(sys.argv) == 1: + print usage + sys.exit(1) + + server = sys.argv[1] + del(sys.argv[1]) + + parser = OptionParser(usage) + + parser.add_option('-U', '--username', action='store', type='string', + help='Use given credentials when connecting', + metavar='DOMAIN\\username%password', + dest='username') + + parser.add_option('-c', '--command', action='store', type='string', + help='Execute COMMAND', dest='command') + + options, args = parser.parse_args() + + # Break --username up into domain, username and password + + cred = None + + if not options.username: + options.username = '%' + + domain = '' + if string.find(options.username, '\\') != -1: + domain, options.username = string.split(options.username, '\\') + + password = '' + if string.find(options.username, '%') != -1: + options.username, password = string.split(options.username, '%') + + username = options.username + + if username != '': + cred = (domain, username, password) + + # Run command loop + + c = rpcclient(server, cred) + + if options.command: + c.onecmd(options.command) + sys.exit(0) + + while 1: + try: + c.cmdloop() + except KeyboardInterrupt: + print 'KeyboardInterrupt' diff --git a/source4/scripting/swig/README b/source4/scripting/swig/README index 561c5ab279..38b95b28f4 100644 --- a/source4/scripting/swig/README +++ b/source4/scripting/swig/README @@ -6,26 +6,9 @@ Instructions for building: 1. Run configure with the --with-python option to enable python extensions. -2. Edit the script/build_idl.sh script to pass the --swig option to - pidl. Here's a patch: +2. Run 'make idl_full swig' to build extensions. -Index: script/build_idl.sh -=================================================================== ---- script/build_idl.sh (revision 2413) -+++ script/build_idl.sh (working copy) -@@ -4,7 +4,7 @@ - - [ -d librpc/gen_ndr ] || mkdir -p librpc/gen_ndr || exit 1 - --PIDL="$PERL ./build/pidl/pidl.pl --output librpc/gen_ndr/ndr_ --parse --header --parser --server" -+PIDL="$PERL ./build/pidl/pidl.pl --output librpc/gen_ndr/ndr_ --parse --header --parser --server --swig" - TABLES="$PERL ./build/pidl/tables.pl --output librpc/gen_ndr/tables" - - if [ x$FULLBUILD = xFULL ]; then - -3. Run 'make idl_full swig' to build extensions. - -4. At some stage there will be a proper system for installing the +3. At some stage there will be a proper system for installing the extensions, but right now it's easier to run them in place. Set your PYTHONPATH to include the modules. From the Samba source directory, run: @@ -34,4 +17,4 @@ Index: script/build_idl.sh Now you can go nuts and use the extensions. Check the scripting/swig/torture directory for a testsuite. There will -hopefully be a bunch of usage examples somewhere. \ No newline at end of file +hopefully be a bunch of usage examples somewhere. diff --git a/source4/scripting/swig/config.mk b/source4/scripting/swig/config.mk index 944e33b9b4..193e84b255 100644 --- a/source4/scripting/swig/config.mk +++ b/source4/scripting/swig/config.mk @@ -1,16 +1,19 @@ # Swig extensions -swig: lib/tdb/swig/_tdb.$(SHLIBEXT) lib/ldb/swig/_ldb.$(SHLIBEXT) \ - libcli/swig/_libcli_nbt.$(SHLIBEXT) libcli/swig/_libcli_smb.$(SHLIBEXT) +swig: pythonmods +pythonmods: $(PYTHON_DSOS) + .SUFFIXES: _wrap.c .i .i_wrap.c: - swig -I$(srcdir)/scripting/swig -python $< + swig -Wall -I$(srcdir)/scripting/swig -python $< clean:: @echo "Removing SWIG output files" - @-rm -f scripting/swig/tdb.pyc scripting/swig/tdb.py + @-rm -f bin/python/* + # FIXME: Remove _wrap.c files -# Swig testing -swigtest: swig - ./selftest/test_swig.sh +PYDOCTOR_MODULES=bin/python/ldb.py bin/python/auth.py bin/python/credentials.py bin/python/registry.py + +pydoctor:: + LD_LIBRARY_PATH=bin/shared PYTHONPATH=bin/python pydoctor --make-html --docformat=restructedtext --add-package scripting/python/samba/ $(addprefix --add-module , $(PYDOCTOR_MODULES)) diff --git a/source4/scripting/swig/rpcclient b/source4/scripting/swig/rpcclient deleted file mode 100755 index 34efafdf73..0000000000 --- a/source4/scripting/swig/rpcclient +++ /dev/null @@ -1,301 +0,0 @@ -#!/usr/bin/python - -import sys, os, string -from cmd import Cmd -from optparse import OptionParser -from pprint import pprint - -import dcerpc, samr - -def swig2dict(obj): - """Convert a swig object to a dictionary.""" - - result = {} - - for attr in filter(lambda x: type(x) == str, dir(obj)): - - if attr[:2] == '__' and attr[-2:] == '__': - continue - - if attr == 'this' or attr == 'thisown': - continue - - result[attr] = getattr(obj, attr) - - return result - -class rpcclient(Cmd): - - prompt = 'rpcclient$ ' - - def __init__(self, server, cred): - Cmd.__init__(self) - self.server = server - self.cred = cred - - def emptyline(self): - - # Default for empty line is to repeat last command - yuck - - pass - - def onecmd(self, line): - - # Override the onecmd() method so we can trap error returns - - try: - Cmd.onecmd(self, line) - except dcerpc.NTSTATUS, arg: - print 'The command returned an error: %s' % arg[1] - - # Command handlers - - def do_help(self, line): - """Displays on-line help for rpcclient commands.""" - Cmd.do_help(self, line) - - def do_shell(self, line): - - status = os.system(line) - - if os.WIFEXITED(status): - if os.WEXITSTATUS(status) != 0: - print 'Command exited with code %d' % os.WEXITSTATUS(status) - else: - print 'Command exited with signal %d' % os.WTERMSIG(status) - - def do_EOF(self, line): - """Exits rpcclient.""" - print - sys.exit(0) - - # SAMR pipe commands - - def do_SamrEnumDomains(self, line): - """Enumerate domain names.""" - - usage = 'usage: SamrEnumDomains' - - if line != '': - print usage - return - - pipe = dcerpc.pipe_connect( - 'ncacn_np:%s' % self.server, - dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), - self.cred) - - connect_handle = samr.Connect(pipe) - - for i in connect_handle.EnumDomains(): - print i - - def do_SamrLookupDomain(self, line): - """Return the SID for a domain.""" - - usage = 'SamrLookupDomain DOMAIN' - - parser = OptionParser(usage) - options, args = parser.parse_args(string.split(line)) - - if len(args) != 1: - print 'usage:', usage - return - - pipe = dcerpc.pipe_connect( - 'ncacn_np:%s' % self.server, - dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), - self.cred) - - connect_handle = samr.Connect(pipe) - - print connect_handle.LookupDomain(args[0]) - - def do_SamrQueryDomInfo(self, line): - """Return information about a domain designated by its SID.""" - - usage = 'SamrQueryDomInfo DOMAIN_SID [info_level]' - - parser = OptionParser(usage) - options, args = parser.parse_args(string.split(line)) - - if (len(args) == 0) or (len(args) > 2): - print 'usage:', usage - return - - pipe = dcerpc.pipe_connect( - 'ncacn_np:%s' % self.server, - dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), - self.cred) - - connect_handle = samr.Connect(pipe) - domain_handle = connect_handle.OpenDomain(args[0]) - - if (len(args) == 2): - result = domain_handle.QueryDomainInfo(int(args[1])) - else: - result = domain_handle.QueryDomainInfo() - - pprint(swig2dict(result)) - - def do_SamrQueryDomInfo2(self, line): - """Return information about a domain designated by its SID. - (Windows 2000 and >)""" - - usage = 'SamrQueryDomInfo2 DOMAIN_SID [info_level] (Windows 2000 and >)' - parser = OptionParser(usage) - options, args = parser.parse_args(string.split(line)) - - if len(args) == 0 or len(args) > 2: - print 'usage:', usage - return - - pipe = dcerpc.pipe_connect( - 'ncacn_np:%s' % self.server, - dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), - self.cred) - - connect_handle = samr.Connect(pipe) - domain_handle = connect_handle.OpenDomain(args[0]) - - if (len(args) == 2): - result = domain_handle.QueryDomainInfo2(int(args[1])) - else: - result = domain_handle.QueryDomainInfo2() - - pprint(swig2dict(result)) - - def do_SamrEnumDomainGroups(self, line): - """Return the list of groups of a domain designated by its SID.""" - - usage = 'SamrEnumDomainGroups DOMAIN_SID' - - parser = OptionParser(usage) - options, args = parser.parse_args(string.split(line)) - - if len(args) != 1: - print 'usage:', usage - return - - pipe = dcerpc.pipe_connect( - 'ncacn_np:%s' % self.server, - dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), - self.cred) - - connect_handle = samr.Connect(pipe) - domain_handle = connect_handle.OpenDomain(args[0]) - - result = domain_handle.EnumDomainGroups() - - pprint(result) - - def do_SamrEnumDomainAliases(self, line): - """Return the list of aliases (local groups) of a domain designated - by its SID.""" - - usage = 'SamrEnumDomainAliases DOMAIN_SID' - - parser = OptionParser(usage) - options, args = parser.parse_args(string.split(line)) - - if len(args) != 1: - print 'usage:', usage - return - - pipe = dcerpc.pipe_connect( - 'ncacn_np:%s' % self.server, - dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), - self.cred) - - connect_handle = samr.Connect(pipe) - domain_handle = connect_handle.OpenDomain(args[0]) - - result = domain_handle.EnumDomainAliases() - - pprint(result) - - def do_SamrEnumDomainUsers(self, line): - """Return the list of users of a domain designated by its SID.""" - - usage = 'SamrEnumDomainUsers DOMAIN_SID [user_account_flags]' - - parser = OptionParser(usage) - options, args = parser.parse_args(string.split(line)) - - if (len(args) == 0) or (len(args) > 2): - print 'usage:', usage - return - - pipe = dcerpc.pipe_connect( - 'ncacn_np:%s' % self.server, - dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), - self.cred) - - connect_handle = samr.Connect(pipe) - domain_handle = connect_handle.OpenDomain(args[0]) - - if (len(args) == 2): - result = domain_handle.EnumDomainUsers(int(args[1])) - else: - result = domain_handle.EnumDomainUsers() - - pprint(result) - -if __name__ == '__main__': - - # Parse command line - - usage = 'rpcclient SERVER [options]' - - if len(sys.argv) == 1: - print usage - sys.exit(1) - - server = sys.argv[1] - del(sys.argv[1]) - - parser = OptionParser(usage) - - parser.add_option('-U', '--username', action='store', type='string', - help='Use given credentials when connecting', - metavar='DOMAIN\\username%password', - dest='username') - - parser.add_option('-c', '--command', action='store', type='string', - help='Execute COMMAND', dest='command') - - options, args = parser.parse_args() - - # Break --username up into domain, username and password - - cred = None - - if not options.username: - options.username = '%' - - domain = '' - if string.find(options.username, '\\') != -1: - domain, options.username = string.split(options.username, '\\') - - password = '' - if string.find(options.username, '%') != -1: - options.username, password = string.split(options.username, '%') - - username = options.username - - if username != '': - cred = (domain, username, password) - - # Run command loop - - c = rpcclient(server, cred) - - if options.command: - c.onecmd(options.command) - sys.exit(0) - - while 1: - try: - c.cmdloop() - except KeyboardInterrupt: - print 'KeyboardInterrupt' diff --git a/source4/scripting/swig/torture/torture_ldb.py b/source4/scripting/swig/torture/torture_ldb.py deleted file mode 100755 index fe79f4f843..0000000000 --- a/source4/scripting/swig/torture/torture_ldb.py +++ /dev/null @@ -1,83 +0,0 @@ -#!/usr/bin/python -# -# A torture test for the Python Ldb bindings. Also a short guide on -# how the API works. -# - -from Ldb import * - -# Helpers - -def t(cond, msg): - """Test a condition.""" - if not cond: - raise RuntimeError('FAILED: %s' % msg) - -# -# Torture LdbMessage -# - -m = LdbMessage() - -# Empty message - -t(m.keys() == [], 'empty msg') -t(m.dn == None, 'empty dn') - -t(m.sanity_check() == LDB_ERR_INVALID_DN_SYNTAX, 'sanity check') - -# Test invalid dn - -try: - m.dn = 'invalid dn' -except LdbError, arg: - if arg[0] != LDB_ERR_INVALID_DN_SYNTAX: - raise -else: - t(False, 'LdbError not raised') - -# Test valid dn - -m.dn = 'name=spotty' -t(m.dn == 'name=spotty', 'specified dn') - -t(m.sanity_check() == LDB_SUCCESS, 'sanity check') - -# Test some single-valued attributes - -m['animal'] = 'dog' -m['name'] = 'spotty' - -t(m.keys() == ['animal', 'name'], 'keys() test failed') -t(m.values() == [['dog'], ['spotty']], 'values() test failed') -t(m.items() == [('animal', ['dog']), ('name', ['spotty'])], - 'items() test failed') - -t(m.sanity_check() == LDB_SUCCESS, 'sanity check') - -m['animal'] = 'canine' -t(m['animal'] == ['canine'], 'replace value failed') - -# Test a multi-valued attribute - -names = ['spotty', 'foot'] -m['name'] = names - -t(m['name'] == names, 'multi-valued attr failed') - -t(m.sanity_check() == LDB_SUCCESS, 'sanity check') - -# Test non-string attributes - -try: - m['foo'] = 42 -except TypeError: - pass -else: - t(False, 'TypeError not raised') - -# -# Torture Ldb -# - -l = Ldb('foo.ldb') -- cgit