From be4b68817544b87d12a1dcd7d8b5c5d778872418 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 8 Apr 2010 18:53:14 +0200 Subject: s4-python: Remove obsolete and broken torture modules. The functionality of these modules is already present in a more current form in other modules. --- source4/scripting/python/samba/torture/pytorture | 51 --- source4/scripting/python/samba/torture/spoolss.py | 437 --------------------- .../scripting/python/samba/torture/torture_samr.py | 220 ----------- .../scripting/python/samba/torture/torture_tdb.py | 90 ----- source4/scripting/python/samba/torture/winreg.py | 165 -------- 5 files changed, 963 deletions(-) delete mode 100755 source4/scripting/python/samba/torture/pytorture delete mode 100644 source4/scripting/python/samba/torture/spoolss.py delete mode 100755 source4/scripting/python/samba/torture/torture_samr.py delete mode 100755 source4/scripting/python/samba/torture/torture_tdb.py delete mode 100755 source4/scripting/python/samba/torture/winreg.py (limited to 'source4/scripting/python') diff --git a/source4/scripting/python/samba/torture/pytorture b/source4/scripting/python/samba/torture/pytorture deleted file mode 100755 index e0123447e8..0000000000 --- a/source4/scripting/python/samba/torture/pytorture +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/python - -import sys -from optparse import OptionParser - -# Parse command line - -parser = OptionParser() - -parser.add_option("-b", "--binding", action="store", type="string", - dest="binding") - -parser.add_option("-d", "--domain", action="store", type="string", - dest="domain") - -parser.add_option("-u", "--username", action="store", type="string", - dest="username") - -parser.add_option("-p", "--password", action="store", type="string", - dest="password") - -(options, args) = parser.parse_args() - -if not options.binding: - parser.error('You must supply a binding string') - -if not options.username or not options.password or not options.domain: - parser.error('You must supply a domain, username and password') - -binding = options.binding -domain = options.domain -username = options.username -password = options.password - -if len(args) == 0: - parser.error('You must supply the name of a module to test') - -# Import and test - -for test in args: - - try: - module = __import__('torture_%s' % test) - except ImportError: - print 'No such module "%s"' % test - sys.exit(1) - - if not hasattr(module, 'runtests'): - print 'Module "%s" does not have a runtests function' % test - - module.runtests(binding, (domain, username, password)) diff --git a/source4/scripting/python/samba/torture/spoolss.py b/source4/scripting/python/samba/torture/spoolss.py deleted file mode 100644 index a75385e079..0000000000 --- a/source4/scripting/python/samba/torture/spoolss.py +++ /dev/null @@ -1,437 +0,0 @@ -import sys, string -import dcerpc - - -def ResizeBufferCall(fn, pipe, r): - - r['buffer'] = None - r['buf_size'] = 0 - - result = fn(pipe, r) - - if result['result'] == dcerpc.WERR_INSUFFICIENT_BUFFER or \ - result['result'] == dcerpc.WERR_MORE_DATA: - r['buffer'] = result['buf_size'] * '\x00' - r['buf_size'] = result['buf_size'] - - result = fn(pipe, r) - - return result - - -def test_OpenPrinterEx(pipe, printer): - - print 'spoolss_OpenPrinterEx(%s)' % printer - - printername = '\\\\%s' % dcerpc.dcerpc_server_name(pipe) - - if printer is not None: - printername = printername + '\\%s' % printer - - r = {} - r['printername'] = printername - r['datatype'] = None - r['devmode_ctr'] = {} - r['devmode_ctr']['size'] = 0 - r['devmode_ctr']['devmode'] = None - r['access_mask'] = 0x02000000 - r['level'] = 1 - r['userlevel'] = {} - r['userlevel']['level1'] = {} - r['userlevel']['level1']['size'] = 0 - r['userlevel']['level1']['client'] = None - r['userlevel']['level1']['user'] = None - r['userlevel']['level1']['build'] = 1381 - r['userlevel']['level1']['major'] = 2 - r['userlevel']['level1']['minor'] = 0 - r['userlevel']['level1']['processor'] = 0 - - result = dcerpc.spoolss_OpenPrinterEx(pipe, r) - - return result['handle'] - - -def test_ClosePrinter(pipe, handle): - - r = {} - r['handle'] = handle - - dcerpc.spoolss_ClosePrinter(pipe, r) - - -def test_GetPrinter(pipe, handle): - - r = {} - r['handle'] = handle - - for level in [0, 1, 2, 3, 4, 5, 6, 7]: - - print 'spoolss_GetPrinter(level = %d)' % level - - r['level'] = level - r['buffer'] = None - r['buf_size'] = 0 - - result = ResizeBufferCall(dcerpc.spoolss_GetPrinter, pipe, r) - - -def test_EnumForms(pipe, handle): - - print 'spoolss_EnumForms()' - - r = {} - r['handle'] = handle - r['level'] = 1 - r['buffer'] = None - r['buf_size'] = 0 - - result = ResizeBufferCall(dcerpc.spoolss_EnumForms, pipe, r) - - forms = dcerpc.unmarshall_spoolss_FormInfo_array( - result['buffer'], r['level'], result['count']) - - for form in forms: - - r = {} - r['handle'] = handle - r['formname'] = form['info1']['formname'] - r['level'] = 1 - - result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r) - - -def test_EnumPorts(pipe, handle): - - print 'spoolss_EnumPorts()' - - for level in [1, 2]: - - r = {} - r['handle'] = handle - r['servername'] = None - r['level'] = level - - result = ResizeBufferCall(dcerpc.spoolss_EnumPorts, pipe, r) - - ports = dcerpc.unmarshall_spoolss_PortInfo_array( - result['buffer'], r['level'], result['count']) - - if level == 1: - port_names = map(lambda x: x['info1']['port_name'], ports) - - -def test_DeleteForm(pipe, handle, formname): - - r = {} - r['handle'] = handle - r['formname'] = formname - - dcerpc.spoolss_DeleteForm(pipe, r) - - -def test_GetForm(pipe, handle, formname): - - r = {} - r['handle'] = handle - r['formname'] = formname - r['level'] = 1 - - result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r) - - return result['info']['info1'] - - -def test_SetForm(pipe, handle, form): - - print 'spoolss_SetForm()' - - r = {} - r['handle'] = handle - r['level'] = 1 - r['formname'] = form['info1']['formname'] - r['info'] = form - - dcerpc.spoolss_SetForm(pipe, r) - - newform = test_GetForm(pipe, handle, r['formname']) - - if form['info1'] != newform: - print 'SetForm: mismatch: %s != %s' % \ - (r['info']['info1'], f) - sys.exit(1) - - -def test_AddForm(pipe, handle): - - print 'spoolss_AddForm()' - - formname = '__testform__' - - r = {} - r['handle'] = handle - r['level'] = 1 - r['info'] = {} - r['info']['info1'] = {} - r['info']['info1']['formname'] = formname - r['info']['info1']['flags'] = 0x0002 - r['info']['info1']['width'] = 100 - r['info']['info1']['length'] = 100 - r['info']['info1']['left'] = 0 - r['info']['info1']['top'] = 1000 - r['info']['info1']['right'] = 2000 - r['info']['info1']['bottom'] = 3000 - - try: - result = dcerpc.spoolss_AddForm(pipe, r) - except dcerpc.WERROR, arg: - if arg[0] == dcerpc.WERR_ALREADY_EXISTS: - test_DeleteForm(pipe, handle, formname) - result = dcerpc.spoolss_AddForm(pipe, r) - - f = test_GetForm(pipe, handle, formname) - - if r['info']['info1'] != f: - print 'AddForm: mismatch: %s != %s' % \ - (r['info']['info1'], f) - sys.exit(1) - - r['formname'] = formname - - test_SetForm(pipe, handle, r['info']) - - test_DeleteForm(pipe, handle, formname) - - -def test_EnumJobs(pipe, handle): - - print 'spoolss_EnumJobs()' - - r = {} - r['handle'] = handle - r['firstjob'] = 0 - r['numjobs'] = 0xffffffff - r['level'] = 1 - - result = ResizeBufferCall(dcerpc.spoolss_EnumJobs, pipe, r) - - if result['buffer'] is None: - return - - jobs = dcerpc.unmarshall_spoolss_JobInfo_array( - result['buffer'], r['level'], result['count']) - - for job in jobs: - - s = {} - s['handle'] = handle - s['job_id'] = job['info1']['job_id'] - s['level'] = 1 - - result = ResizeBufferCall(dcerpc.spoolss_GetJob, pipe, s) - - if result['info'] != job: - print 'EnumJobs: mismatch: %s != %s' % (result['info'], job) - sys.exit(1) - - - # TODO: AddJob, DeleteJob, ScheduleJob - - -def test_EnumPrinterData(pipe, handle): - - print 'test_EnumPrinterData()' - - enum_index = 0 - - while 1: - - r = {} - r['handle'] = handle - r['enum_index'] = enum_index - - r['value_offered'] = 0 - r['data_size'] = 0 - - result = dcerpc.spoolss_EnumPrinterData(pipe, r) - - r['value_offered'] = result['value_needed'] - r['data_size'] = result['data_size'] - - result = dcerpc.spoolss_EnumPrinterData(pipe, r) - - if result['result'] == dcerpc.WERR_NO_MORE_ITEMS: - break - - s = {} - s['handle'] = handle - s['value_name'] = result['value_name'] - - result2 = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, s) - - if result['buffer'][:result2['buf_size']] != result2['buffer']: - print 'EnumPrinterData/GetPrinterData mismatch' - sys.exit(1) - - enum_index += 1 - - -def test_SetPrinterDataEx(pipe, handle): - - valuename = '__printerdataextest__' - data = '12345' - - r = {} - r['handle'] = handle - r['key_name'] = 'DsSpooler' - r['value_name'] = valuename - r['type'] = 3 - r['buffer'] = data - r['buf_size'] = len(data) - - result = dcerpc.spoolss_SetPrinterDataEx(pipe, r) - - -def test_EnumPrinterDataEx(pipe, handle): - - r = {} - r['handle'] = handle - r['key_name'] = 'DsSpooler' - r['buf_size'] = 0 - - result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r) - - if result['result'] == dcerpc.WERR_MORE_DATA: - r['buf_size'] = result['buf_size'] - - result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r) - - # TODO: test spoolss_GetPrinterDataEx() - - -def test_SetPrinterData(pipe, handle): - - print 'testing spoolss_SetPrinterData()' - - valuename = '__printerdatatest__' - data = '12345' - - r = {} - r['handle'] = handle - r['value_name'] = valuename - r['type'] = 3 # REG_BINARY - r['buffer'] = data - r['real_len'] = 5 - - dcerpc.spoolss_SetPrinterData(pipe, r) - - s = {} - s['handle'] = handle - s['value_name'] = valuename - - result = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, r) - - if result['buffer'] != data: - print 'SetPrinterData: mismatch' - sys.exit(1) - - dcerpc.spoolss_DeletePrinterData(pipe, r) - - -def test_EnumPrinters(pipe): - - print 'testing spoolss_EnumPrinters()' - - printer_names = None - - r = {} - r['flags'] = 0x02 - r['server'] = None - - for level in [0, 1, 2, 4, 5]: - - print 'test_EnumPrinters(level = %d)' % level - - r['level'] = level - - result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r) - - printers = dcerpc.unmarshall_spoolss_PrinterInfo_array( - result['buffer'], r['level'], result['count']) - - if level == 2: - for p in printers: - - # A nice check is for the specversion in the - # devicemode. This has always been observed to be - # 1025. - - if p['info2']['devmode']['specversion'] != 1025: - print 'test_EnumPrinters: specversion != 1025' - sys.exit(1) - - r['level'] = 1 - result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r) - - for printer in dcerpc.unmarshall_spoolss_PrinterInfo_array( - result['buffer'], r['level'], result['count']): - - if string.find(printer['info1']['name'], '\\\\') == 0: - print 'Skipping remote printer %s' % printer['info1']['name'] - continue - - printername = string.split(printer['info1']['name'], ',')[0] - - handle = test_OpenPrinterEx(pipe, printername) - - test_GetPrinter(pipe, handle) - test_EnumPorts(pipe, handle) - test_EnumForms(pipe, handle) - test_AddForm(pipe, handle) - test_EnumJobs(pipe, handle) - test_EnumPrinterData(pipe, handle) - test_EnumPrinterDataEx(pipe, handle) - test_SetPrinterData(pipe, handle) -# test_SetPrinterDataEx(pipe, handle) - test_ClosePrinter(pipe, handle) - - -def test_EnumPrinterDrivers(pipe): - - print 'test spoolss_EnumPrinterDrivers()' - - for level in [1, 2, 3]: - - r = {} - r['server'] = None - r['environment'] = None - r['level'] = level - - result = ResizeBufferCall(dcerpc.spoolss_EnumPrinterDrivers, pipe, r) - - drivers = dcerpc.unmarshall_spoolss_DriverInfo_array( - result['buffer'], r['level'], result['count']) - - if level == 1: - driver_names = map(lambda x: x['info1']['driver_name'], drivers) - - -def test_PrintServer(pipe): - - handle = test_OpenPrinterEx(pipe, None) - - # EnumForms and AddForm tests return WERR_BADFID here (??) - - test_ClosePrinter(pipe, handle) - - -def runtests(binding, domain, username, password): - - print 'Testing SPOOLSS pipe' - - pipe = dcerpc.pipe_connect(binding, - dcerpc.DCERPC_SPOOLSS_UUID, dcerpc.DCERPC_SPOOLSS_VERSION, - domain, username, password) - - test_EnumPrinters(pipe) - test_EnumPrinterDrivers(pipe) - test_PrintServer(pipe) diff --git a/source4/scripting/python/samba/torture/torture_samr.py b/source4/scripting/python/samba/torture/torture_samr.py deleted file mode 100755 index e73bb4f21e..0000000000 --- a/source4/scripting/python/samba/torture/torture_samr.py +++ /dev/null @@ -1,220 +0,0 @@ -#!/usr/bin/python - -import dcerpc, samr - -def test_Connect(pipe): - - handle = samr.Connect(pipe) - handle = samr.Connect2(pipe) - handle = samr.Connect3(pipe) - handle = samr.Connect4(pipe) - - # WIN2K3 only? - - try: - handle = samr.Connect5(pipe) - except dcerpc.NTSTATUS, arg: - if arg[0] != 0xc00000d2L: # NT_STATUS_NET_WRITE_FAULT - raise - - return handle - -def test_UserHandle(user_handle): - - # QuerySecurity()/SetSecurity() - - user_handle.SetSecurity(user_handle.QuerySecurity()) - - # GetUserPwInfo() - - user_handle.GetUserPwInfo() - - # GetUserInfo() - - for level in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 20, - 21, 23, 24, 25, 26]: - - try: - user_handle.QueryUserInfo(level) - user_handle.QueryUserInfo2(level) - except dcerpc.NTSTATUS, arg: - if arg[0] != 0xc0000003L: # NT_STATUS_INVALID_INFO_CLASS - raise - - # GetGroupsForUser() - - user_handle.GetGroupsForUser() - - # TestPrivateFunctionsUser() - - try: - user_handle.TestPrivateFunctionsUser() - except dcerpc.NTSTATUS, arg: - if arg[0] != 0xC0000002L: - raise - -def test_GroupHandle(group_handle): - - # QuerySecurity()/SetSecurity() - - group_handle.SetSecurity(group_handle.QuerySecurity()) - - # QueryGroupInfo() - - for level in [1, 2, 3, 4, 5]: - info = group_handle.QueryGroupInfo(level) - - # TODO: SetGroupinfo() - - # QueryGroupMember() - - group_handle.QueryGroupMember() - -def test_AliasHandle(alias_handle): - - # QuerySecurity()/SetSecurity() - - alias_handle.SetSecurity(alias_handle.QuerySecurity()) - - print alias_handle.GetMembersInAlias() - -def test_DomainHandle(name, sid, domain_handle): - - print 'testing %s (%s)' % (name, sid) - - # QuerySecurity()/SetSecurity() - - domain_handle.SetSecurity(domain_handle.QuerySecurity()) - - # LookupNames(), none mapped - - try: - domain_handle.LookupNames(['xxNONAMExx']) - except dcerpc.NTSTATUS, arg: - if arg[0] != 0xc0000073L: - raise dcerpc.NTSTATUS(arg) - - # LookupNames(), some mapped - - if name != 'Builtin': - domain_handle.LookupNames(['Administrator', 'xxNONAMExx']) - - # QueryDomainInfo()/SetDomainInfo() - - levels = [1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13] - set_ok = [1, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0] - - for i in range(len(levels)): - - info = domain_handle.QueryDomainInfo(level = levels[i]) - - try: - domain_handle.SetDomainInfo(levels[i], info) - except dcerpc.NTSTATUS, arg: - if not (arg[0] == 0xc0000003L and not set_ok[i]): - raise - - # QueryDomainInfo2() - - levels = [1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13] - - for i in range(len(levels)): - domain_handle.QueryDomainInfo2(level = levels[i]) - - # EnumDomainUsers - - print 'testing users' - - users = domain_handle.EnumDomainUsers() - rids = domain_handle.LookupNames(users) - - for i in range(len(users)): - test_UserHandle(domain_handle.OpenUser(rids[0][i])) - - # QueryDisplayInfo - - for i in [1, 2, 3, 4, 5]: - domain_handle.QueryDisplayInfo(level = i) - domain_handle.QueryDisplayInfo2(level = i) - domain_handle.QueryDisplayInfo3(level = i) - - # EnumDomainGroups - - print 'testing groups' - - groups = domain_handle.EnumDomainGroups() - rids = domain_handle.LookupNames(groups) - - for i in range(len(groups)): - test_GroupHandle(domain_handle.OpenGroup(rids[0][i])) - - # EnumDomainAliases - - print 'testing aliases' - - aliases = domain_handle.EnumDomainAliases() - rids = domain_handle.LookupNames(aliases) - - for i in range(len(aliases)): - test_AliasHandle(domain_handle.OpenAlias(rids[0][i])) - - # CreateUser - # CreateUser2 - # CreateDomAlias - # RidToSid - # RemoveMemberFromForeignDomain - # CreateDomainGroup - # GetAliasMembership - - # GetBootKeyInformation() - - try: - domain_handle.GetBootKeyInformation() - except dcerpc.NTSTATUS, arg: - pass - - # TestPrivateFunctionsDomain() - - try: - domain_handle.TestPrivateFunctionsDomain() - except dcerpc.NTSTATUS, arg: - if arg[0] != 0xC0000002L: - raise - -def test_ConnectHandle(connect_handle): - - print 'testing connect handle' - - # QuerySecurity/SetSecurity - - connect_handle.SetSecurity(connect_handle.QuerySecurity()) - - # Lookup bogus domain - - try: - connect_handle.LookupDomain('xxNODOMAINxx') - except dcerpc.NTSTATUS, arg: - if arg[0] != 0xC00000DFL: # NT_STATUS_NO_SUCH_DOMAIN - raise - - # Test all domains - - for domain_name in connect_handle.EnumDomains(): - - connect_handle.GetDomPwInfo(domain_name) - sid = connect_handle.LookupDomain(domain_name) - domain_handle = connect_handle.OpenDomain(sid) - - test_DomainHandle(domain_name, sid, domain_handle) - - # TODO: Test Shutdown() function - -def runtests(binding, creds): - - print 'Testing SAMR pipe' - - pipe = dcerpc.pipe_connect(binding, - dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), creds) - - handle = test_Connect(pipe) - test_ConnectHandle(handle) diff --git a/source4/scripting/python/samba/torture/torture_tdb.py b/source4/scripting/python/samba/torture/torture_tdb.py deleted file mode 100755 index 7f97caf6cb..0000000000 --- a/source4/scripting/python/samba/torture/torture_tdb.py +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/python - -import sys, os -import Tdb - -def fail(msg): - print 'FAILED:', msg - sys.exit(1) - -tdb_file = '/tmp/torture_tdb.tdb' - -# Create temporary tdb file - -t = Tdb.Tdb(tdb_file, flags = Tdb.CLEAR_IF_FIRST) - -# Check non-existent key throws KeyError exception - -try: - t['__none__'] -except KeyError: - pass -else: - fail('non-existent key did not throw KeyError') - -# Check storing key - -t['bar'] = '1234' -if t['bar'] != '1234': - fail('store key failed') - -# Check key exists - -if not t.has_key('bar'): - fail('has_key() failed for existing key') - -if t.has_key('__none__'): - fail('has_key() succeeded for non-existent key') - -# Delete key - -try: - del(t['__none__']) -except KeyError: - pass -else: - fail('delete of non-existent key did not throw KeyError') - -del t['bar'] -if t.has_key('bar'): - fail('delete of existing key did not delete key') - -# Clear all keys - -t.clear() -if len(t) != 0: - fail('clear failed to remove all keys') - -# Other dict functions - -t['a'] = '1' -t['ab'] = '12' -t['abc'] = '123' - -if len(t) != 3: - fail('len method produced wrong value') - -keys = t.keys() -values = t.values() -items = t.items() - -if set(keys) != set(['a', 'ab', 'abc']): - fail('keys method produced wrong values') - -if set(values) != set(['1', '12', '123']): - fail('values method produced wrong values') - -if set(items) != set([('a', '1'), ('ab', '12'), ('abc', '123')]): - fail('values method produced wrong values') - -t.close() - -# Re-open read-only - -t = Tdb.Tdb(tdb_file, open_flags = os.O_RDONLY) -t.keys() -t.close() - -# Clean up - -os.unlink(tdb_file) diff --git a/source4/scripting/python/samba/torture/winreg.py b/source4/scripting/python/samba/torture/winreg.py deleted file mode 100755 index eb60b9847e..0000000000 --- a/source4/scripting/python/samba/torture/winreg.py +++ /dev/null @@ -1,165 +0,0 @@ -#!/usr/bin/python - -import sys, dcerpc - -def test_OpenHKLM(pipe): - - r = {} - r['unknown'] = {} - r['unknown']['unknown0'] = 0x9038 - r['unknown']['unknown1'] = 0x0000 - r['access_required'] = 0x02000000 - - result = dcerpc.winreg_OpenHKLM(pipe, r) - - return result['handle'] - -def test_QueryInfoKey(pipe, handle): - - r = {} - r['handle'] = handle - r['class'] = {} - r['class']['name'] = None - - return dcerpc.winreg_QueryInfoKey(pipe, r) - -def test_CloseKey(pipe, handle): - - r = {} - r['handle'] = handle - - dcerpc.winreg_CloseKey(pipe, r) - -def test_FlushKey(pipe, handle): - - r = {} - r['handle'] = handle - - dcerpc.winreg_FlushKey(pipe, r) - -def test_GetVersion(pipe, handle): - - r = {} - r['handle'] = handle - - dcerpc.winreg_GetVersion(pipe, r) - -def test_GetKeySecurity(pipe, handle): - - r = {} - r['handle'] = handle - r['unknown'] = 4 - r['size'] = None - r['data'] = {} - r['data']['max_len'] = 0 - r['data']['data'] = '' - - result = dcerpc.winreg_GetKeySecurity(pipe, r) - - print result - - if result['result'] == dcerpc.WERR_INSUFFICIENT_BUFFER: - r['size'] = {} - r['size']['max_len'] = result['data']['max_len'] - r['size']['offset'] = 0 - r['size']['len'] = result['data']['max_len'] - - result = dcerpc.winreg_GetKeySecurity(pipe, r) - - print result - - sys.exit(1) - -def test_Key(pipe, handle, name, depth = 0): - - # Don't descend too far. Registries can be very deep. - - if depth > 2: - return - - try: - keyinfo = test_QueryInfoKey(pipe, handle) - except dcerpc.WERROR, arg: - if arg[0] == dcerpc.WERR_ACCESS_DENIED: - return - - test_GetVersion(pipe, handle) - - test_FlushKey(pipe, handle) - - test_GetKeySecurity(pipe, handle) - - # Enumerate values in this key - - r = {} - r['handle'] = handle - r['name_in'] = {} - r['name_in']['len'] = 0 - r['name_in']['max_len'] = (keyinfo['max_valnamelen'] + 1) * 2 - r['name_in']['buffer'] = {} - r['name_in']['buffer']['max_len'] = keyinfo['max_valnamelen'] + 1 - r['name_in']['buffer']['offset'] = 0 - r['name_in']['buffer']['len'] = 0 - r['type'] = 0 - r['value_in'] = {} - r['value_in']['max_len'] = keyinfo['max_valbufsize'] - r['value_in']['offset'] = 0 - r['value_in']['len'] = 0 - r['value_len1'] = keyinfo['max_valbufsize'] - r['value_len2'] = 0 - - for i in range(0, keyinfo['num_values']): - - r['enum_index'] = i - - dcerpc.winreg_EnumValue(pipe, r) - - # Recursively test subkeys of this key - - r = {} - r['handle'] = handle - r['key_name_len'] = 0 - r['unknown'] = 0x0414 - r['in_name'] = {} - r['in_name']['unknown'] = 0x20a - r['in_name']['key_name'] = {} - r['in_name']['key_name']['name'] = None - r['class'] = {} - r['class']['name'] = None - r['last_changed_time'] = {} - r['last_changed_time']['low'] = 0 - r['last_changed_time']['high'] = 0 - - for i in range(0, keyinfo['num_subkeys']): - - r['enum_index'] = i - - subkey = dcerpc.winreg_EnumKey(pipe, r) - - s = {} - s['handle'] = handle - s['keyname'] = {} - s['keyname']['name'] = subkey['out_name']['name'] - s['unknown'] = 0 - s['access_mask'] = 0x02000000 - - result = dcerpc.winreg_OpenKey(pipe, s) - - test_Key(pipe, result['handle'], name + '/' + s['keyname']['name'], - depth + 1) - - test_CloseKey(pipe, result['handle']) - - # Enumerate values - -def runtests(binding, domain, username, password): - - print 'Testing WINREG pipe' - - pipe = dcerpc.pipe_connect(binding, - dcerpc.DCERPC_WINREG_UUID, dcerpc.DCERPC_WINREG_VERSION, - domain, username, password) - - handle = test_OpenHKLM(pipe) - - test_Key(pipe, handle, 'HKLM') -- cgit