diff options
Diffstat (limited to 'source4/scripting/python')
| -rwxr-xr-x | source4/scripting/python/samba/torture/pytorture | 51 | ||||
| -rw-r--r-- | source4/scripting/python/samba/torture/spoolss.py | 437 | ||||
| -rwxr-xr-x | source4/scripting/python/samba/torture/torture_samr.py | 220 | ||||
| -rwxr-xr-x | source4/scripting/python/samba/torture/torture_tdb.py | 90 | ||||
| -rwxr-xr-x | source4/scripting/python/samba/torture/winreg.py | 165 | 
5 files changed, 0 insertions, 963 deletions
diff --git a/source4/scripting/python/samba/torture/pytorture b/source4/scripting/python/samba/torture/pytorture deleted file mode 100755 index e0123447e8..0000000000 --- a/source4/scripting/python/samba/torture/pytorture +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/python - -import sys -from optparse import OptionParser - -# Parse command line - -parser = OptionParser() - -parser.add_option("-b", "--binding", action="store", type="string", -                  dest="binding") - -parser.add_option("-d", "--domain", action="store", type="string", -                  dest="domain") - -parser.add_option("-u", "--username", action="store", type="string", -                  dest="username") - -parser.add_option("-p", "--password", action="store", type="string", -                  dest="password") - -(options, args) = parser.parse_args() - -if not options.binding: -   parser.error('You must supply a binding string') - -if not options.username or not options.password or not options.domain: -   parser.error('You must supply a domain, username and password') - -binding = options.binding -domain = options.domain -username = options.username -password = options.password - -if len(args) == 0: -   parser.error('You must supply the name of a module to test') - -# Import and test - -for test in args: - -   try: -      module = __import__('torture_%s' % test) -   except ImportError: -      print 'No such module "%s"' % test -      sys.exit(1) - -   if not hasattr(module, 'runtests'): -      print 'Module "%s" does not have a runtests function' % test - -   module.runtests(binding, (domain, username, password)) diff --git a/source4/scripting/python/samba/torture/spoolss.py b/source4/scripting/python/samba/torture/spoolss.py deleted file mode 100644 index a75385e079..0000000000 --- a/source4/scripting/python/samba/torture/spoolss.py +++ /dev/null @@ -1,437 +0,0 @@ -import sys, string -import dcerpc - - -def ResizeBufferCall(fn, pipe, r): - -    r['buffer'] = None -    r['buf_size'] = 0 -     -    result = fn(pipe, r) - -    if result['result'] == dcerpc.WERR_INSUFFICIENT_BUFFER or \ -       result['result'] == dcerpc.WERR_MORE_DATA: -        r['buffer'] = result['buf_size'] * '\x00' -        r['buf_size'] = result['buf_size'] - -    result = fn(pipe, r) - -    return result - - -def test_OpenPrinterEx(pipe, printer): - -    print 'spoolss_OpenPrinterEx(%s)' % printer - -    printername = '\\\\%s' % dcerpc.dcerpc_server_name(pipe) -     -    if printer is not None: -        printername = printername + '\\%s' % printer - -    r = {} -    r['printername'] = printername -    r['datatype'] = None -    r['devmode_ctr'] = {} -    r['devmode_ctr']['size'] = 0 -    r['devmode_ctr']['devmode'] = None -    r['access_mask'] = 0x02000000 -    r['level'] = 1 -    r['userlevel'] = {} -    r['userlevel']['level1'] = {} -    r['userlevel']['level1']['size'] = 0 -    r['userlevel']['level1']['client'] = None -    r['userlevel']['level1']['user'] = None -    r['userlevel']['level1']['build'] = 1381 -    r['userlevel']['level1']['major'] = 2 -    r['userlevel']['level1']['minor'] = 0 -    r['userlevel']['level1']['processor'] = 0 - -    result = dcerpc.spoolss_OpenPrinterEx(pipe, r) - -    return result['handle'] - - -def test_ClosePrinter(pipe, handle): - -    r = {} -    r['handle'] = handle - -    dcerpc.spoolss_ClosePrinter(pipe, r) - - -def test_GetPrinter(pipe, handle): - -    r = {} -    r['handle'] = handle - -    for level in [0, 1, 2, 3, 4, 5, 6, 7]: - -        print 'spoolss_GetPrinter(level = %d)' % level - -        r['level'] = level -        r['buffer'] = None -        r['buf_size'] = 0 - -        result = ResizeBufferCall(dcerpc.spoolss_GetPrinter, pipe, r) - - -def test_EnumForms(pipe, handle): - -    print 'spoolss_EnumForms()' - -    r = {} -    r['handle'] = handle -    r['level'] = 1 -    r['buffer'] = None -    r['buf_size'] = 0 - -    result = ResizeBufferCall(dcerpc.spoolss_EnumForms, pipe, r) - -    forms = dcerpc.unmarshall_spoolss_FormInfo_array( -        result['buffer'], r['level'], result['count']) - -    for form in forms: - -        r = {} -        r['handle'] = handle -        r['formname'] = form['info1']['formname'] -        r['level'] = 1 - -        result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r) - - -def test_EnumPorts(pipe, handle): - -    print 'spoolss_EnumPorts()' - -    for level in [1, 2]: - -        r = {} -        r['handle'] = handle -        r['servername'] = None -        r['level'] = level - -        result = ResizeBufferCall(dcerpc.spoolss_EnumPorts, pipe, r) - -        ports = dcerpc.unmarshall_spoolss_PortInfo_array( -            result['buffer'], r['level'], result['count']) - -        if level == 1: -            port_names = map(lambda x: x['info1']['port_name'], ports) - - -def test_DeleteForm(pipe, handle, formname): - -    r = {} -    r['handle'] = handle -    r['formname'] = formname - -    dcerpc.spoolss_DeleteForm(pipe, r) - - -def test_GetForm(pipe, handle, formname): - -    r = {} -    r['handle'] = handle -    r['formname'] = formname -    r['level'] = 1 - -    result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r) - -    return result['info']['info1'] -     - -def test_SetForm(pipe, handle, form): - -    print 'spoolss_SetForm()' - -    r = {} -    r['handle'] = handle -    r['level'] = 1 -    r['formname'] = form['info1']['formname'] -    r['info'] = form - -    dcerpc.spoolss_SetForm(pipe, r) - -    newform = test_GetForm(pipe, handle, r['formname']) - -    if form['info1'] != newform: -        print 'SetForm: mismatch: %s != %s' % \ -              (r['info']['info1'], f) -        sys.exit(1) - - -def test_AddForm(pipe, handle): - -    print 'spoolss_AddForm()' - -    formname = '__testform__' - -    r = {} -    r['handle'] = handle -    r['level'] = 1 -    r['info'] = {} -    r['info']['info1'] = {} -    r['info']['info1']['formname'] = formname -    r['info']['info1']['flags'] = 0x0002 -    r['info']['info1']['width'] = 100 -    r['info']['info1']['length'] = 100 -    r['info']['info1']['left'] = 0 -    r['info']['info1']['top'] = 1000 -    r['info']['info1']['right'] = 2000 -    r['info']['info1']['bottom'] = 3000 - -    try: -        result = dcerpc.spoolss_AddForm(pipe, r) -    except dcerpc.WERROR, arg: -        if arg[0] == dcerpc.WERR_ALREADY_EXISTS: -            test_DeleteForm(pipe, handle, formname) -        result = dcerpc.spoolss_AddForm(pipe, r) - -    f = test_GetForm(pipe, handle, formname) - -    if r['info']['info1'] != f: -        print 'AddForm: mismatch: %s != %s' % \ -              (r['info']['info1'], f) -        sys.exit(1) - -    r['formname'] = formname - -    test_SetForm(pipe, handle, r['info']) - -    test_DeleteForm(pipe, handle, formname) - - -def test_EnumJobs(pipe, handle): - -    print 'spoolss_EnumJobs()' - -    r = {} -    r['handle'] = handle -    r['firstjob'] = 0 -    r['numjobs'] = 0xffffffff -    r['level'] = 1 - -    result = ResizeBufferCall(dcerpc.spoolss_EnumJobs, pipe, r) - -    if result['buffer'] is None: -        return -     -    jobs = dcerpc.unmarshall_spoolss_JobInfo_array( -        result['buffer'], r['level'], result['count']) - -    for job in jobs: - -        s = {} -        s['handle'] = handle -        s['job_id'] = job['info1']['job_id'] -        s['level'] = 1 - -        result = ResizeBufferCall(dcerpc.spoolss_GetJob, pipe, s) - -        if result['info'] != job: -            print 'EnumJobs: mismatch: %s != %s' % (result['info'], job) -            sys.exit(1) -     - -    # TODO: AddJob, DeleteJob, ScheduleJob - - -def test_EnumPrinterData(pipe, handle): - -    print 'test_EnumPrinterData()' - -    enum_index = 0 - -    while 1: - -        r = {} -        r['handle'] = handle -        r['enum_index'] = enum_index - -        r['value_offered'] = 0 -        r['data_size'] = 0 -         -        result = dcerpc.spoolss_EnumPrinterData(pipe, r) - -        r['value_offered'] = result['value_needed'] -        r['data_size'] = result['data_size'] - -        result = dcerpc.spoolss_EnumPrinterData(pipe, r) - -        if result['result'] == dcerpc.WERR_NO_MORE_ITEMS: -            break - -        s = {} -        s['handle'] = handle -        s['value_name'] = result['value_name'] - -        result2 = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, s) - -        if result['buffer'][:result2['buf_size']] != result2['buffer']: -            print 'EnumPrinterData/GetPrinterData mismatch' -            sys.exit(1) -  -        enum_index += 1 - - -def test_SetPrinterDataEx(pipe, handle): - -    valuename = '__printerdataextest__' -    data = '12345' - -    r = {} -    r['handle'] = handle -    r['key_name'] = 'DsSpooler' -    r['value_name'] = valuename -    r['type'] = 3 -    r['buffer'] = data -    r['buf_size'] = len(data) -     -    result = dcerpc.spoolss_SetPrinterDataEx(pipe, r) - - -def test_EnumPrinterDataEx(pipe, handle): - -    r = {} -    r['handle'] = handle -    r['key_name'] = 'DsSpooler' -    r['buf_size'] = 0 - -    result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r) - -    if result['result'] == dcerpc.WERR_MORE_DATA: -        r['buf_size'] = result['buf_size'] - -        result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r) - -    # TODO: test spoolss_GetPrinterDataEx() - - -def test_SetPrinterData(pipe, handle): - -    print 'testing spoolss_SetPrinterData()' - -    valuename = '__printerdatatest__' -    data = '12345' -     -    r = {} -    r['handle'] = handle -    r['value_name'] = valuename -    r['type'] = 3                       # REG_BINARY -    r['buffer'] = data -    r['real_len'] = 5 - -    dcerpc.spoolss_SetPrinterData(pipe, r) - -    s = {} -    s['handle'] = handle -    s['value_name'] = valuename - -    result = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, r) - -    if result['buffer'] != data: -        print 'SetPrinterData: mismatch' -        sys.exit(1) - -    dcerpc.spoolss_DeletePrinterData(pipe, r) - - -def test_EnumPrinters(pipe): - -    print 'testing spoolss_EnumPrinters()' - -    printer_names = None - -    r = {} -    r['flags'] = 0x02 -    r['server'] = None - -    for level in [0, 1, 2, 4, 5]: - -        print 'test_EnumPrinters(level = %d)' % level - -        r['level'] = level - -        result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r) - -        printers = dcerpc.unmarshall_spoolss_PrinterInfo_array( -            result['buffer'], r['level'], result['count']) - -        if level == 2: -            for p in printers: - -                # A nice check is for the specversion in the -                # devicemode.  This has always been observed to be -                # 1025. - -                if p['info2']['devmode']['specversion'] != 1025: -                    print 'test_EnumPrinters: specversion != 1025' -                    sys.exit(1) - -    r['level'] = 1 -    result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r) -     -    for printer in dcerpc.unmarshall_spoolss_PrinterInfo_array( -        result['buffer'], r['level'], result['count']): - -        if string.find(printer['info1']['name'], '\\\\') == 0: -            print 'Skipping remote printer %s' % printer['info1']['name'] -            continue - -        printername = string.split(printer['info1']['name'], ',')[0] - -        handle = test_OpenPrinterEx(pipe, printername) - -        test_GetPrinter(pipe, handle) -        test_EnumPorts(pipe, handle) -        test_EnumForms(pipe, handle) -        test_AddForm(pipe, handle) -        test_EnumJobs(pipe, handle) -        test_EnumPrinterData(pipe, handle) -        test_EnumPrinterDataEx(pipe, handle) -        test_SetPrinterData(pipe, handle) -#       test_SetPrinterDataEx(pipe, handle) -        test_ClosePrinter(pipe, handle) - - -def test_EnumPrinterDrivers(pipe): - -    print 'test spoolss_EnumPrinterDrivers()' - -    for level in [1, 2, 3]: - -        r = {} -        r['server'] = None -        r['environment'] = None -        r['level'] = level - -        result = ResizeBufferCall(dcerpc.spoolss_EnumPrinterDrivers, pipe, r) - -        drivers = dcerpc.unmarshall_spoolss_DriverInfo_array( -            result['buffer'], r['level'], result['count']) - -        if level == 1: -            driver_names = map(lambda x: x['info1']['driver_name'], drivers) -             - -def test_PrintServer(pipe): -     -    handle = test_OpenPrinterEx(pipe, None) - -    # EnumForms and AddForm tests return WERR_BADFID here (??) - -    test_ClosePrinter(pipe, handle) -     - -def runtests(binding, domain, username, password): -     -    print 'Testing SPOOLSS pipe' - -    pipe = dcerpc.pipe_connect(binding, -            dcerpc.DCERPC_SPOOLSS_UUID, dcerpc.DCERPC_SPOOLSS_VERSION, -            domain, username, password) - -    test_EnumPrinters(pipe) -    test_EnumPrinterDrivers(pipe) -    test_PrintServer(pipe) diff --git a/source4/scripting/python/samba/torture/torture_samr.py b/source4/scripting/python/samba/torture/torture_samr.py deleted file mode 100755 index e73bb4f21e..0000000000 --- a/source4/scripting/python/samba/torture/torture_samr.py +++ /dev/null @@ -1,220 +0,0 @@ -#!/usr/bin/python - -import dcerpc, samr - -def test_Connect(pipe): - -    handle = samr.Connect(pipe) -    handle = samr.Connect2(pipe) -    handle = samr.Connect3(pipe) -    handle = samr.Connect4(pipe) - -    # WIN2K3 only? -     -    try: -        handle = samr.Connect5(pipe) -    except dcerpc.NTSTATUS, arg: -        if arg[0] != 0xc00000d2L:       # NT_STATUS_NET_WRITE_FAULT -            raise - -    return handle -     -def test_UserHandle(user_handle): - -    # QuerySecurity()/SetSecurity() - -    user_handle.SetSecurity(user_handle.QuerySecurity()) - -    # GetUserPwInfo() - -    user_handle.GetUserPwInfo() - -    # GetUserInfo() - -    for level in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 20, -                  21, 23, 24, 25, 26]: - -        try: -            user_handle.QueryUserInfo(level) -            user_handle.QueryUserInfo2(level) -        except dcerpc.NTSTATUS, arg: -            if arg[0] != 0xc0000003L:   # NT_STATUS_INVALID_INFO_CLASS -                raise - -    # GetGroupsForUser() - -    user_handle.GetGroupsForUser() - -    # TestPrivateFunctionsUser() - -    try: -        user_handle.TestPrivateFunctionsUser() -    except dcerpc.NTSTATUS, arg: -        if arg[0] != 0xC0000002L: -            raise - -def test_GroupHandle(group_handle): - -    # QuerySecurity()/SetSecurity() - -    group_handle.SetSecurity(group_handle.QuerySecurity()) - -    # QueryGroupInfo() - -    for level in [1, 2, 3, 4, 5]: -        info = group_handle.QueryGroupInfo(level) - -    # TODO: SetGroupinfo() - -    # QueryGroupMember() - -    group_handle.QueryGroupMember() - -def test_AliasHandle(alias_handle): - -    # QuerySecurity()/SetSecurity() - -    alias_handle.SetSecurity(alias_handle.QuerySecurity()) - -    print alias_handle.GetMembersInAlias() - -def test_DomainHandle(name, sid, domain_handle): - -    print 'testing %s (%s)' % (name, sid) - -    # QuerySecurity()/SetSecurity() - -    domain_handle.SetSecurity(domain_handle.QuerySecurity()) - -    # LookupNames(), none mapped - -    try: -        domain_handle.LookupNames(['xxNONAMExx']) -    except dcerpc.NTSTATUS, arg: -        if arg[0] != 0xc0000073L: -            raise dcerpc.NTSTATUS(arg) - -    # LookupNames(), some mapped - -    if name != 'Builtin': -        domain_handle.LookupNames(['Administrator', 'xxNONAMExx']) - -    # QueryDomainInfo()/SetDomainInfo() - -    levels = [1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13] -    set_ok = [1, 0, 1, 1, 0, 1, 1, 0, 1,  0,  1,  0] -     -    for i in range(len(levels)): - -        info = domain_handle.QueryDomainInfo(level = levels[i]) - -        try: -            domain_handle.SetDomainInfo(levels[i], info) -        except dcerpc.NTSTATUS, arg: -            if not (arg[0] == 0xc0000003L and not set_ok[i]): -                raise - -    # QueryDomainInfo2() - -    levels = [1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13] - -    for i in range(len(levels)): -        domain_handle.QueryDomainInfo2(level = levels[i]) - -    # EnumDomainUsers - -    print 'testing users' - -    users = domain_handle.EnumDomainUsers() -    rids = domain_handle.LookupNames(users) -     -    for i in range(len(users)): -        test_UserHandle(domain_handle.OpenUser(rids[0][i])) -     -    # QueryDisplayInfo - -    for i in [1, 2, 3, 4, 5]: -        domain_handle.QueryDisplayInfo(level = i) -        domain_handle.QueryDisplayInfo2(level = i) -        domain_handle.QueryDisplayInfo3(level = i) -             -    # EnumDomainGroups - -    print 'testing groups' - -    groups = domain_handle.EnumDomainGroups() -    rids = domain_handle.LookupNames(groups) - -    for i in range(len(groups)): -        test_GroupHandle(domain_handle.OpenGroup(rids[0][i])) - -    # EnumDomainAliases - -    print 'testing aliases' - -    aliases = domain_handle.EnumDomainAliases() -    rids = domain_handle.LookupNames(aliases) - -    for i in range(len(aliases)): -        test_AliasHandle(domain_handle.OpenAlias(rids[0][i])) -     -    # CreateUser -    # CreateUser2 -    # CreateDomAlias -    # RidToSid -    # RemoveMemberFromForeignDomain -    # CreateDomainGroup -    # GetAliasMembership - -    # GetBootKeyInformation() - -    try: -        domain_handle.GetBootKeyInformation() -    except dcerpc.NTSTATUS, arg: -        pass - -    # TestPrivateFunctionsDomain() - -    try: -        domain_handle.TestPrivateFunctionsDomain() -    except dcerpc.NTSTATUS, arg: -        if arg[0] != 0xC0000002L: -            raise - -def test_ConnectHandle(connect_handle): - -    print 'testing connect handle' - -    # QuerySecurity/SetSecurity - -    connect_handle.SetSecurity(connect_handle.QuerySecurity()) - -    # Lookup bogus domain - -    try: -        connect_handle.LookupDomain('xxNODOMAINxx') -    except dcerpc.NTSTATUS, arg: -        if arg[0] != 0xC00000DFL:          # NT_STATUS_NO_SUCH_DOMAIN -            raise - -    # Test all domains - -    for domain_name in connect_handle.EnumDomains(): - -        connect_handle.GetDomPwInfo(domain_name) -        sid = connect_handle.LookupDomain(domain_name) -        domain_handle = connect_handle.OpenDomain(sid) - -        test_DomainHandle(domain_name, sid, domain_handle) - -    # TODO: Test Shutdown() function - -def runtests(binding, creds): - -    print 'Testing SAMR pipe' - -    pipe = dcerpc.pipe_connect(binding, -            dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), creds) - -    handle = test_Connect(pipe) -    test_ConnectHandle(handle) diff --git a/source4/scripting/python/samba/torture/torture_tdb.py b/source4/scripting/python/samba/torture/torture_tdb.py deleted file mode 100755 index 7f97caf6cb..0000000000 --- a/source4/scripting/python/samba/torture/torture_tdb.py +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/python - -import sys, os -import Tdb - -def fail(msg): -    print 'FAILED:', msg -    sys.exit(1) - -tdb_file = '/tmp/torture_tdb.tdb' - -# Create temporary tdb file - -t = Tdb.Tdb(tdb_file, flags = Tdb.CLEAR_IF_FIRST) - -# Check non-existent key throws KeyError exception - -try: -    t['__none__'] -except KeyError: -    pass -else: -    fail('non-existent key did not throw KeyError') - -# Check storing key - -t['bar'] = '1234' -if t['bar'] != '1234': -    fail('store key failed') - -# Check key exists - -if not t.has_key('bar'): -    fail('has_key() failed for existing key') - -if t.has_key('__none__'): -    fail('has_key() succeeded for non-existent key') - -# Delete key - -try: -    del(t['__none__']) -except KeyError: -    pass -else: -    fail('delete of non-existent key did not throw KeyError') - -del t['bar'] -if t.has_key('bar'): -    fail('delete of existing key did not delete key') - -# Clear all keys - -t.clear() -if len(t) != 0: -    fail('clear failed to remove all keys') - -# Other dict functions - -t['a'] = '1' -t['ab'] = '12' -t['abc'] = '123' - -if len(t) != 3: -    fail('len method produced wrong value') - -keys = t.keys() -values = t.values() -items = t.items() - -if set(keys) != set(['a', 'ab', 'abc']): -    fail('keys method produced wrong values') - -if set(values) != set(['1', '12', '123']): -    fail('values method produced wrong values') - -if set(items) != set([('a', '1'), ('ab', '12'), ('abc', '123')]): -    fail('values method produced wrong values') - -t.close() - -# Re-open read-only - -t = Tdb.Tdb(tdb_file, open_flags = os.O_RDONLY) -t.keys() -t.close() - -# Clean up - -os.unlink(tdb_file) diff --git a/source4/scripting/python/samba/torture/winreg.py b/source4/scripting/python/samba/torture/winreg.py deleted file mode 100755 index eb60b9847e..0000000000 --- a/source4/scripting/python/samba/torture/winreg.py +++ /dev/null @@ -1,165 +0,0 @@ -#!/usr/bin/python - -import sys, dcerpc - -def test_OpenHKLM(pipe): - -    r = {} -    r['unknown'] = {} -    r['unknown']['unknown0'] = 0x9038 -    r['unknown']['unknown1'] = 0x0000 -    r['access_required'] = 0x02000000 - -    result = dcerpc.winreg_OpenHKLM(pipe, r) - -    return result['handle'] - -def test_QueryInfoKey(pipe, handle): - -    r = {} -    r['handle'] = handle -    r['class'] = {} -    r['class']['name'] = None - -    return dcerpc.winreg_QueryInfoKey(pipe, r) - -def test_CloseKey(pipe, handle): - -    r = {} -    r['handle'] = handle - -    dcerpc.winreg_CloseKey(pipe, r) - -def test_FlushKey(pipe, handle): - -    r = {} -    r['handle'] = handle -     -    dcerpc.winreg_FlushKey(pipe, r) - -def test_GetVersion(pipe, handle): - -    r = {} -    r['handle'] = handle -     -    dcerpc.winreg_GetVersion(pipe, r) - -def test_GetKeySecurity(pipe, handle): - -    r = {} -    r['handle'] = handle -    r['unknown'] = 4 -    r['size'] = None -    r['data'] = {} -    r['data']['max_len'] = 0 -    r['data']['data'] = '' - -    result = dcerpc.winreg_GetKeySecurity(pipe, r) - -    print result - -    if result['result'] == dcerpc.WERR_INSUFFICIENT_BUFFER: -        r['size'] = {} -        r['size']['max_len'] = result['data']['max_len'] -        r['size']['offset'] = 0 -        r['size']['len'] = result['data']['max_len'] - -        result = dcerpc.winreg_GetKeySecurity(pipe, r) - -    print result - -    sys.exit(1) -     -def test_Key(pipe, handle, name, depth = 0): - -    # Don't descend too far.  Registries can be very deep. - -    if depth > 2: -        return - -    try: -        keyinfo = test_QueryInfoKey(pipe, handle) -    except dcerpc.WERROR, arg: -        if arg[0] == dcerpc.WERR_ACCESS_DENIED: -            return - -    test_GetVersion(pipe, handle) - -    test_FlushKey(pipe, handle) - -    test_GetKeySecurity(pipe, handle) - -    # Enumerate values in this key - -    r = {} -    r['handle'] = handle -    r['name_in'] = {} -    r['name_in']['len'] = 0 -    r['name_in']['max_len'] = (keyinfo['max_valnamelen'] + 1) * 2 -    r['name_in']['buffer'] = {} -    r['name_in']['buffer']['max_len'] = keyinfo['max_valnamelen']  + 1 -    r['name_in']['buffer']['offset'] = 0 -    r['name_in']['buffer']['len'] = 0 -    r['type'] = 0 -    r['value_in'] = {} -    r['value_in']['max_len'] = keyinfo['max_valbufsize'] -    r['value_in']['offset'] = 0 -    r['value_in']['len'] = 0 -    r['value_len1'] = keyinfo['max_valbufsize'] -    r['value_len2'] = 0 -     -    for i in range(0, keyinfo['num_values']): - -        r['enum_index'] = i - -        dcerpc.winreg_EnumValue(pipe, r) - -    # Recursively test subkeys of this key - -    r = {} -    r['handle'] = handle -    r['key_name_len'] = 0 -    r['unknown'] = 0x0414 -    r['in_name'] = {} -    r['in_name']['unknown'] = 0x20a -    r['in_name']['key_name'] = {} -    r['in_name']['key_name']['name'] = None -    r['class'] = {} -    r['class']['name'] = None -    r['last_changed_time'] = {} -    r['last_changed_time']['low'] = 0 -    r['last_changed_time']['high'] = 0 - -    for i in range(0, keyinfo['num_subkeys']): - -        r['enum_index'] = i - -        subkey = dcerpc.winreg_EnumKey(pipe, r) - -        s = {} -        s['handle'] = handle -        s['keyname'] = {} -        s['keyname']['name'] = subkey['out_name']['name'] -        s['unknown'] = 0 -        s['access_mask'] = 0x02000000 - -        result = dcerpc.winreg_OpenKey(pipe, s) - -        test_Key(pipe, result['handle'], name + '/' + s['keyname']['name'], -                 depth + 1) - -        test_CloseKey(pipe, result['handle']) - -    # Enumerate values - -def runtests(binding, domain, username, password): -     -    print 'Testing WINREG pipe' - -    pipe = dcerpc.pipe_connect(binding, -            dcerpc.DCERPC_WINREG_UUID, dcerpc.DCERPC_WINREG_VERSION, -            domain, username, password) - -    handle = test_OpenHKLM(pipe) - -    test_Key(pipe, handle, 'HKLM')  | 
