#!/usr/bin/env python

'''automated testing library for testing Samba against windows'''

import pexpect, subprocess
import sys, os, time, re


class wintest():
    '''testing of Samba against windows VMs

    Holds a dictionary of substitution variables (self.vars) that are
    expanded into commands, file contents and expect patterns wherever
    the ${NAME} syntax is used.
    '''

    def __init__(self):
        self.vars = {}
        self.list_mode = False
        # start with an empty skip list so skip() is usable even when
        # set_skip() has never been called
        self.skiplist = []
        os.putenv('PYTHONUNBUFFERED', '1')

    def setvar(self, varname, value):
        '''set a substitution variable'''
        self.vars[varname] = value

    def setwinvars(self, vm, prefix='WIN'):
        '''setup WIN_XX vars based on a vm name

        For each known suffix, copy the (substituted) value of
        <vm>_<suffix> into <prefix>_<suffix>.  When the per-VM variable
        is not set, any existing <prefix>_<suffix> is removed.
        '''
        for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'BASEDN', 'REALM', 'DOMAIN']:
            vname = '%s_%s' % (vm, v)
            target = "%s_%s" % (prefix, v)
            if vname in self.vars:
                self.setvar(target, self.substitute("${%s}" % vname))
            else:
                self.vars.pop(target, None)

    def info(self, msg):
        '''print some information (after substitution), unless in list mode'''
        if not self.list_mode:
            print(self.substitute(msg))

    def load_config(self, fname):
        '''load the config file

        Each non-blank line that does not start with '#' must have the
        form "name: value"; the text before the first colon becomes the
        variable name and the rest the value, both stripped of whitespace.

        Raises RuntimeError on a line without a colon.
        '''
        # the with-block guarantees the file is closed, including when an
        # invalid line raises
        with open(fname) as f:
            for line in f:
                line = line.strip()
                if not line or line[0] == '#':
                    continue
                colon = line.find(':')
                if colon == -1:
                    raise RuntimeError("Invalid config line '%s'" % line)
                varname = line[0:colon].strip()
                value = line[colon+1:].strip()
                self.setvar(varname, value)

    def list_steps_mode(self):
        '''put wintest in step listing mode'''
        self.list_mode = True

    def set_skip(self, skiplist):
        '''set a list of tests to skip, given as a comma separated string'''
        self.skiplist = skiplist.split(',')

    def skip(self, step):
        '''return True if we should skip a step

        In list mode the step name is printed and every step is skipped.
        '''
        if self.list_mode:
            print("\t%s" % step)
            return True
        return step in self.skiplist

    def substitute(self, text):
        '''Substitute strings of the form ${NAME} in text, replacing
        with substitutions from vars.

        A list is handled element by element and a new list is returned.
        Substitution repeats until no "${" remains, so values may
        themselves contain ${NAME} references.  Text with an
        unterminated "${" is returned as it stands.

        Raises RuntimeError when NAME is not a known variable.
        '''
        if isinstance(text, list):
            return [self.substitute(item) for item in text]

        while True:
            var_start = text.find("${")
            if var_start == -1:
                return text
            var_end = text.find("}", var_start)
            if var_end == -1:
                return text
            var_name = text[var_start+2:var_end]
            if var_name not in self.vars:
                raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
            text = text.replace("${%s}" % var_name, self.vars[var_name])

    def have_var(self, varname):
        '''see if a variable has been set'''
        return varname in self.vars

    def putenv(self, key, value):
        '''putenv with substitution'''
        os.putenv(key, self.substitute(value))

    def chdir(self, dir):
        '''chdir with substitution'''
        os.chdir(self.substitute(dir))

    def del_files(self, dirs):
        '''delete all regular files found under each of the given directories'''
        for d in dirs:
            self.run_cmd("find %s -type f | xargs rm -f" % d)

    def write_file(self, filename, text, mode='w'):
        '''write to a file, substituting both the filename and the text'''
        # the with-block closes the file even if substitution of the text
        # or the write itself fails
        with open(self.substitute(filename), mode=mode) as f:
            f.write(self.substitute(text))

    def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True):
        '''run a command

        cmd       -- a shell command string, or a list of arguments which
                     is executed without a shell
        dir       -- working directory for the command
        show      -- accepted for compatibility; not used
        output    -- when True, return the combined stdout/stderr of the
                     command instead of its exit status
        checkfail -- when True (and output is False) a non-zero exit
                     status raises subprocess.CalledProcessError

        Returns the captured output when output is True, otherwise the
        result of subprocess.check_call or subprocess.call.
        '''
        cmd = self.substitute(cmd)
        if isinstance(cmd, list):
            self.info('$ ' + " ".join(cmd))
            shell = False
        else:
            self.info('$ ' + cmd)
            shell = True
        if output:
            # pass the command through unchanged: wrapping an argument
            # list in another list is not something Popen can execute
            return subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT, cwd=dir).communicate()[0]
        if checkfail:
            return subprocess.check_call(cmd, shell=shell, cwd=dir)
        return subprocess.call(cmd, shell=shell, cwd=dir)

    def cmd_output(self, cmd):
        '''return output from a command, as text'''
        out = self.run_cmd(cmd, output=True)
        # the pipe yields bytes on Python 3; callers search the output
        # with str patterns, so decode when it is not already a str
        if not isinstance(out, str):
            out = out.decode('utf-8', 'replace')
        return out

    def cmd_contains(self, cmd, contains, nomatch=False, ordered=False, regex=False, casefold=False):
        '''check that command output contains the listed strings

        contains -- a string or list of strings (substituted before use)
        nomatch  -- invert the check: fail if any string IS present
        ordered  -- after each match, continue searching only in the
                    output that follows the match
        regex    -- treat each string as a regular expression
        casefold -- compare case-insensitively (ignored when regex is set)

        Raises RuntimeError when an expectation is not met.
        '''
        if isinstance(contains, str):
            contains = [contains]
        out = self.cmd_output(cmd)
        self.info(out)
        for c in self.substitute(contains):
            if regex:
                m = re.search(c, out)
                if m is None:
                    start = -1
                    end = -1
                else:
                    start = m.start()
                    end = m.end()
            elif casefold:
                start = out.upper().find(c.upper())
                end = start + len(c)
            else:
                start = out.find(c)
                end = start + len(c)
            if nomatch:
                if start != -1:
                    raise RuntimeError("Expected to not see %s in %s" % (c, cmd))
            else:
                if start == -1:
                    raise RuntimeError("Expected to see %s in %s" % (c, cmd))
            if ordered and start != -1:
                out = out[end:]

    def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False,
                  ordered=False, regex=False, casefold=False):
        '''retry a command a number of times

        Runs cmd_contains() up to retries times, sleeping delay seconds
        after each failed attempt, and returns as soon as one attempt
        succeeds.  wait_for_fail is passed on as nomatch.

        Raises RuntimeError once all attempts have failed.
        '''
        while retries > 0:
            try:
                self.cmd_contains(cmd, contains, nomatch=wait_for_fail,
                                  ordered=ordered, regex=regex, casefold=casefold)
                return
            except Exception:
                # only retry on ordinary failures; KeyboardInterrupt and
                # SystemExit must still be able to stop the loop
                time.sleep(delay)
                retries = retries - 1
        raise RuntimeError("Failed to find %s" % contains)

    def pexpect_spawn(self, cmd, timeout=60):
        '''wrapper around pexpect spawn

        The returned child has its sendline() and expect() replaced by
        versions that apply variable substitution first.  sendline()
        also converts newlines to CRLF and appends a carriage return.
        The original methods remain available as old_sendline and
        old_expect.
        '''
        cmd = self.substitute(cmd)
        self.info("$ " + cmd)
        ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout)

        def sendline_sub(line):
            line = self.substitute(line).replace('\n', '\r\n')
            return ret.old_sendline(line + '\r')

        def expect_sub(line, timeout=ret.timeout):
            line = self.substitute(line)
            return ret.old_expect(line, timeout=timeout)

        ret.old_sendline = ret.sendline
        ret.sendline = sendline_sub
        ret.old_expect = ret.expect
        ret.expect = expect_sub

        return ret

    def vm_poweroff(self, vmname, checkfail=True):
        '''power off a VM by running the ${VM_POWEROFF} command with VMNAME set'''
        self.setvar('VMNAME', vmname)
        self.run_cmd("${VM_POWEROFF}", checkfail=checkfail)

    def vm_restore(self, vmname, snapshot):
        '''restore a VM by running the ${VM_RESTORE} command with VMNAME and SNAPSHOT set'''
        self.setvar('VMNAME', vmname)
        self.setvar('SNAPSHOT', snapshot)
        self.run_cmd("${VM_RESTORE}")

    def ping_wait(self, hostname):
        '''wait for a hostname to come up on the network

        Tries a single ping up to 10 times.

        Raises RuntimeError if none of the pings succeeds.
        '''
        hostname = self.substitute(hostname)
        loops = 10
        while loops > 0:
            try:
                self.run_cmd("ping -c 1 -w 10 %s" % hostname)
                break
            except Exception:
                # a failed ping is expected while the host boots; do not
                # swallow KeyboardInterrupt/SystemExit along with it
                loops = loops - 1
        if loops == 0:
            raise RuntimeError("Failed to ping %s" % hostname)
        self.info("Host %s is up" % hostname)

    def port_wait(self, hostname, port, retries=200, delay=3, wait_for_fail=False):
        '''wait for a port on a host to accept connections

        Polls with nc until its output contains "succeeded" (or, with
        wait_for_fail, until it no longer does).
        '''
        self.retry_cmd("nc -v -z -w 1 %s %u" % (hostname, port), ['succeeded'],
                       retries=retries, delay=delay, wait_for_fail=wait_for_fail)

    def run_net_time(self, child):
        '''run net time on windows

        child is expected to come from pexpect_spawn(), so that
        ${HOSTNAME} is substituted when the line is sent.
        '''
        child.sendline("net time \\\\${HOSTNAME} /set")
        child.expect("Do you want to set the local computer")
        child.sendline("Y")
        child.expect("The command completed successfully")

    def run_date_time(self, child, time_tuple=None):
        '''run date and time on windows

        Sets the date and the time through the interactive "date" and
        "time" commands.  time_tuple defaults to the current local time.
        '''
        if time_tuple is None:
            time_tuple = time.localtime()
        child.sendline("date")
        child.expect("Enter the new date:")
        child.sendline(time.strftime("%m-%d-%y", time_tuple))
        child.expect("C:")
        child.sendline("time")
        child.expect("Enter the new time:")
        child.sendline(time.strftime("%H:%M:%S", time_tuple))
        child.expect("C:")

    def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False):
        '''open a telnet connection to a windows server, return the pexpect child

        The connection is retried (after sleeping delay seconds) when the
        server refuses it or cannot be reached.  With set_time, the
        clock on the server is set to the local time after login.

        Raises RuntimeError when all retries are used up.
        '''
        while retries > 0:
            child = self.pexpect_spawn("telnet " + hostname + " -l '" + username + "'")
            i = child.expect(["Welcome to Microsoft Telnet Service",
                              "No more connections are allowed to telnet server",
                              "Unable to connect to remote host",
                              "No route to host"])
            if i != 0:
                # anything but the welcome banner: drop this attempt and retry
                child.close()
                time.sleep(delay)
                retries -= 1
                continue
            child.expect("password:")
            child.sendline(password)
            child.expect("C:")
            if set_time:
                self.run_date_time(child, None)
            return child
        raise RuntimeError("Failed to connect with telnet")

    def kinit(self, username, password):
        '''use kinit to setup a credentials cache

        The realm part of a user@realm principal is upper-cased; a
        username without a realm is passed to kinit unchanged.
        '''
        self.run_cmd("kdestroy")
        self.putenv('KRB5CCNAME', "${PREFIX}/ccache.test")
        username = self.substitute(username)
        s = username.split('@')
        # split() always yields at least one element, so the realm is only
        # present when there are two or more
        if len(s) > 1:
            s[1] = s[1].upper()
        username = '@'.join(s)
        child = self.pexpect_spawn('kinit -V ' + username)
        child.expect("Password for")
        child.sendline(password)
        child.expect("Authenticated to Kerberos")