summaryrefslogtreecommitdiff
path: root/source4/scripting/devel/howto/wintest.py
diff options
context:
space:
mode:
Diffstat (limited to 'source4/scripting/devel/howto/wintest.py')
-rw-r--r--source4/scripting/devel/howto/wintest.py220
1 files changed, 0 insertions, 220 deletions
diff --git a/source4/scripting/devel/howto/wintest.py b/source4/scripting/devel/howto/wintest.py
deleted file mode 100644
index 13424ef2f3..0000000000
--- a/source4/scripting/devel/howto/wintest.py
+++ /dev/null
@@ -1,220 +0,0 @@
-#!/usr/bin/env python
-
-'''automated testing library for testing Samba against windows'''
-
-import pexpect, subprocess
-import sys, os, time
-
-class wintest():
- '''testing of Samba against windows VMs'''
-
- def __init__(self):
- self.vars = {}
- os.putenv('PYTHONUNBUFFERED', '1')
-
- def setvar(self, varname, value):
- '''set a substitution variable'''
- self.vars[varname] = value
-
- def load_config(self, fname):
- '''load the config file'''
- f = open(fname)
- for line in f:
- line = line.strip()
- if len(line) == 0 or line[0] == '#':
- continue
- colon = line.find(':')
- if colon == -1:
- raise RuntimeError("Invalid config line '%s'" % line)
- varname = line[0:colon].strip()
- value = line[colon+1:].strip()
- self.setvar(varname, value)
-
- def substitute(self, text):
- """Substitute strings of the form ${NAME} in text, replacing
- with substitutions from vars.
- """
- if isinstance(text, list):
- ret = text[:]
- for i in range(len(ret)):
- ret[i] = self.substitute(ret[i])
- return ret
-
- while True:
- var_start = text.find("${")
- if var_start == -1:
- return text
- var_end = text.find("}", var_start)
- if var_end == -1:
- return text
- var_name = text[var_start+2:var_end]
- if not var_name in self.vars:
- raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
- text = text.replace("${%s}" % var_name, self.vars[var_name])
- return text
-
- def putenv(self, key, value):
- '''putenv with substitution'''
- os.putenv(key, self.substitute(value))
-
- def chdir(self, dir):
- '''chdir with substitution'''
- os.chdir(self.substitute(dir))
-
-
- def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True):
- cmd = self.substitute(cmd)
- if isinstance(cmd, list):
- print('$ ' + " ".join(cmd))
- else:
- print('$ ' + cmd)
- if output:
- return subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dir).communicate()[0]
- if isinstance(cmd, list):
- shell=False
- else:
- shell=True
- if checkfail:
- return subprocess.check_call(cmd, shell=shell, cwd=dir)
- else:
- return subprocess.call(cmd, shell=shell, cwd=dir)
-
-
- def cmd_output(self, cmd):
- '''return output from and command'''
- cmd = self.substitute(cmd)
- return self.run_cmd(cmd, output=True)
-
- def cmd_contains(self, cmd, contains, nomatch=False, ordered=False):
- '''check that command output contains the listed strings'''
- out = self.cmd_output(cmd)
- print out
- for c in self.substitute(contains):
- ofs = out.find(c)
- if nomatch:
- if ofs != -1:
- raise RuntimeError("Expected to not see %s in %s" % (c, cmd))
- else:
- if ofs == -1:
- raise RuntimeError("Expected to see %s in %s" % (c, cmd))
- if ordered and ofs != -1:
- ofs += len(c)
- out = out[ofs:]
-
- def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False):
- '''retry a command a number of times'''
- while retries > 0:
- try:
- self.cmd_contains(cmd, contains, nomatch=wait_for_fail)
- return
- except:
- time.sleep(delay)
- retries = retries - 1
- raise RuntimeError("Failed to find %s" % contains)
-
- def pexpect_spawn(self, cmd, timeout=60):
- '''wrapper around pexpect spawn'''
- cmd = self.substitute(cmd)
- print("$ " + cmd)
- ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout)
-
- def sendline_sub(line):
- line = self.substitute(line).replace('\n', '\r\n')
- return ret.old_sendline(line + '\r')
-
- def expect_sub(line, timeout=ret.timeout):
- line = self.substitute(line)
- return ret.old_expect(line, timeout=timeout)
-
- ret.old_sendline = ret.sendline
- ret.sendline = sendline_sub
- ret.old_expect = ret.expect
- ret.expect = expect_sub
-
- return ret
-
- def vm_poweroff(self, vmname, checkfail=True):
- '''power off a VM'''
- self.setvar('VMNAME', vmname)
- self.run_cmd("${VM_POWEROFF}", checkfail=checkfail)
-
- def vm_restore(self, vmname, snapshot):
- '''restore a VM'''
- self.setvar('VMNAME', vmname)
- self.setvar('SNAPSHOT', snapshot)
- self.run_cmd("${VM_RESTORE}")
-
- def ping_wait(self, hostname):
- '''wait for a hostname to come up on the network'''
- hostname = self.substitute(hostname)
- loops=10
- while loops > 0:
- try:
- self.run_cmd("ping -c 1 -w 10 %s" % hostname)
- break
- except:
- loops = loops - 1
- if loops == 0:
- raise RuntimeError("Failed to ping %s" % hostname)
- print("Host %s is up" % hostname)
-
- def port_wait(self, hostname, port, retries=100, delay=2, wait_for_fail=False):
- '''wait for a host to come up on the network'''
- self.retry_cmd("nc -v -z -w 1 %s %u" % (hostname, port), ['succeeded'],
- retries=retries, delay=delay, wait_for_fail=wait_for_fail)
-
- def run_net_time(self, child):
- '''run net time on windows'''
- child.sendline("net time \\\\${HOSTNAME} /set")
- child.expect("Do you want to set the local computer")
- child.sendline("Y")
- child.expect("The command completed successfully")
-
- def run_date_time(self, child, time_tuple=None):
- '''run date and time on windows'''
- if time_tuple is None:
- time_tuple = time.localtime()
- child.sendline("date")
- child.expect("Enter the new date:")
- child.sendline(time.strftime("%m-%d-%y", time_tuple))
- child.expect("C:")
- child.sendline("time")
- child.expect("Enter the new time:")
- child.sendline(time.strftime("%H:%M:%S", time_tuple))
- child.expect("C:")
-
-
- def open_telnet(self, hostname, username, password, retries=30, delay=3, set_time=False):
- '''open a telnet connection to a windows server, return the pexpect child'''
- while retries > 0:
- child = self.pexpect_spawn("telnet " + hostname + " -l '" + username + "'")
- i = child.expect(["Welcome to Microsoft Telnet Service",
- "No more connections are allowed to telnet server",
- "Unable to connect to remote host",
- "No route to host"])
- if i != 0:
- child.close()
- time.sleep(delay)
- retries -= 1
- continue
- child.expect("password:")
- child.sendline(password)
- child.expect("C:")
- if set_time:
- self.run_date_time(child, None)
- return child
- raise RuntimeError("Failed to connect with telnet")
-
- def kinit(self, username, password):
- '''use kinit to setup a credentials cache'''
- self.run_cmd("kdestroy")
- self.putenv('KRB5CCNAME', "${PREFIX}/ccache.test")
- username = self.substitute(username)
- s = username.split('@')
- if len(s) > 0:
- s[1] = s[1].upper()
- username = '@'.join(s)
- child = self.pexpect_spawn('kinit -V ' + username)
- child.expect("Password for")
- child.sendline(password)
- child.expect("Authenticated to Kerberos")