diff options
-rw-r--r-- | source3/stf/comfychair.py | 248 |
1 files changed, 180 insertions, 68 deletions
diff --git a/source3/stf/comfychair.py b/source3/stf/comfychair.py index 8ff7726955..522f9bedeb 100644 --- a/source3/stf/comfychair.py +++ b/source3/stf/comfychair.py @@ -31,14 +31,9 @@ For more information, see the file README.comfychair. To run a test suite based on ComfyChair, just run it as a program. """ -# TODO: Put everything into a temporary directory? - -# TODO: Have a means for tests to customize the display of their -# failure messages. In particular, if a shell command failed, then -# give its stderr. - import sys, re + class TestCase: """A base class for tests. This class defines required functions which can optionally be overridden by subclasses. It also provides some @@ -47,6 +42,43 @@ class TestCase: def __init__(self): self.test_log = "" self.background_pids = [] + self._cleanups = [] + self._enter_rundir() + self._save_environment() + self.add_cleanup(self.teardown) + + + # -------------------------------------------------- + # Save and restore directory + def _enter_rundir(self): + import os + self.basedir = os.getcwd() + self.add_cleanup(self._restore_directory) + self.rundir = os.path.join(self.basedir, + 'testtmp', + self.__class__.__name__) + self.tmpdir = os.path.join(self.rundir, 'tmp') + os.system("rm -fr %s" % self.rundir) + os.makedirs(self.tmpdir) + os.system("mkdir -p %s" % self.rundir) + os.chdir(self.rundir) + + def _restore_directory(self): + import os + os.chdir(self.basedir) + + # -------------------------------------------------- + # Save and restore environment + def _save_environment(self): + import os + self._saved_environ = os.environ.copy() + self.add_cleanup(self._restore_environment) + + def _restore_environment(self): + import os + os.environ.clear() + os.environ.update(self._saved_environ) + def setup(self): """Set up test fixture.""" @@ -60,6 +92,12 @@ class TestCase: """Run the test.""" pass + + def add_cleanup(self, c): + """Queue a cleanup to be run when the test is complete.""" + self._cleanups.append(c) + + def fail(self, reason = ""): """Say the test failed.""" raise AssertionError(reason) @@ -138,9 +176,14 @@ why.""" def runcmd_background(self, cmd): import os - name = cmd[0] self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n" - pid = os.spawnvp(os.P_NOWAIT, name, cmd) + pid = os.fork() + if pid == 0: + # child + try: + os.execvp("/bin/sh", ["/bin/sh", "-c", cmd]) + finally: + os._exit(127) self.test_log = self.test_log + "pid: %d\n" % pid return pid @@ -148,44 +191,78 @@ why.""" def runcmd(self, cmd, expectedResult = 0): """Run a command, fail if the command returns an unexpected exit code. Return the output produced.""" - rc, output = self.runcmd_unchecked(cmd) + rc, output, stderr = self.runcmd_unchecked(cmd) if rc != expectedResult: - raise AssertionError("command returned %d; expected %s: \"%s\"" % - (rc, expectedResult, cmd)) + raise AssertionError("""command returned %d; expected %s: \"%s\" +stdout: +%s +stderr: +%s""" % (rc, expectedResult, cmd, output, stderr)) + + return output, stderr + + + def run_captured(self, cmd): + """Run a command, capturing stdout and stderr. + + Based in part on popen2.py + + Returns (waitstatus, stdout, stderr).""" + import os, types + pid = os.fork() + if pid == 0: + # child + try: + pid = os.getpid() + openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC + + outfd = os.open('%d.out' % pid, openmode, 0666) + os.dup2(outfd, 1) + os.close(outfd) + + errfd = os.open('%d.err' % pid, openmode, 0666) + os.dup2(errfd, 2) + os.close(errfd) + + if isinstance(cmd, types.StringType): + cmd = ['/bin/sh', '-c', cmd] + + os.execvp(cmd[0], cmd) + finally: + os._exit(127) + else: + # parent + exited_pid, waitstatus = os.waitpid(pid, 0) + stdout = open('%d.out' % pid).read() + stderr = open('%d.err' % pid).read() + return waitstatus, stdout, stderr - return output def runcmd_unchecked(self, cmd, skip_on_noexec = 0): - """Invoke a command; return (exitcode, stdout)""" - import os, popen2 - pobj = popen2.Popen4(cmd) - output = pobj.fromchild.read() - waitstatus = pobj.wait() + """Invoke a command; return (exitcode, stdout, stderr)""" + import os + waitstatus, stdout, stderr = self.run_captured(cmd) assert not os.WIFSIGNALED(waitstatus), \ - ("%s terminated with signal %d", cmd, os.WTERMSIG(waitstatus)) + ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus))) rc = os.WEXITSTATUS(waitstatus) self.test_log = self.test_log + ("""Run command: %s Wait status: %#x (exit code %d, signal %d) -Output: +stdout: +%s +stderr: %s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus), - output)) + stdout, stderr)) if skip_on_noexec and rc == 127: # Either we could not execute the command or the command # returned exit code 127. According to system(3) we can't # tell the difference. raise NotRunError, "could not execute %s" % `cmd` - return rc, output + return rc, stdout, stderr + def explain_failure(self, exc_info = None): - import traceback - # Move along, nothing to see here - if not exc_info and self.test_log == "": - return - print "-----------------------------------------------------------------" - if exc_info: - traceback.print_exc(file=sys.stdout) + print "test_log:" print self.test_log - print "-----------------------------------------------------------------" def log(self, msg): @@ -201,14 +278,34 @@ class NotRunError(Exception): self.value = value -def runtests(test_list, verbose = 0): - """Run a series of tests. +def _report_error(case, debugger): + """Ask the test case to explain failure, and optionally run a debugger - Eventually, this routine will also examine sys.argv[] to handle - extra options. + Input: + case TestCase instance + debugger if true, a debugger function to be applied to the traceback +""" + import sys + ex = sys.exc_info() + print "-----------------------------------------------------------------" + if ex: + import traceback + traceback.print_exc(file=sys.stdout) + case.explain_failure() + print "-----------------------------------------------------------------" + + if debugger: + tb = ex[2] + debugger(tb) + + +def runtests(test_list, verbose = 0, debugger = None): + """Run a series of tests. Inputs: - test_list sequence of callable test objects + test_list sequence of TestCase classes + verbose print more information as testing proceeds + debugger debugger object to be applied to errors Returns: unix return code: 0 for success, 1 for failures, 2 for test failure @@ -220,37 +317,37 @@ def runtests(test_list, verbose = 0): # flush now so that long running tests are easier to follow sys.stdout.flush() + obj = None try: try: # run test and show result obj = test_class() - if hasattr(obj, "setup"): - obj.setup() + obj.setup() obj.runtest() print "OK" except KeyboardInterrupt: print "INTERRUPT" - obj.explain_failure(sys.exc_info()) + _report_error(obj, debugger) ret = 2 break except NotRunError, msg: print "NOTRUN, %s" % msg.value except: print "FAIL" - obj.explain_failure(sys.exc_info()) + _report_error(obj, debugger) ret = 1 finally: - try: - if hasattr(obj, "teardown"): - obj.teardown() - except KeyboardInterrupt: - print "interrupted during teardown" - obj.explain_failure(sys.exc_info()) - ret = 2 - break - except: - print "error during teardown" - obj.explain_failure(sys.exc_info()) - ret = 1 + while obj and obj._cleanups: + try: + apply(obj._cleanups.pop()) + except KeyboardInterrupt: + print "interrupted during teardown" + _report_error(obj, debugger) + ret = 2 + break + except: + print "error during teardown" + _report_error(obj, debugger) + ret = 1 # Display log file if we're verbose if ret == 0 and verbose: obj.explain_failure() @@ -277,9 +374,10 @@ usage: list them on the command line. options: - --help show usage message - --list list available tests - --verbose show more information while running tests + --help show usage message + --list list available tests + --verbose, -v show more information while running tests + --post-mortem, -p enter Python debugger on error """ % sys.argv[0] @@ -289,9 +387,14 @@ def print_list(test_list): print " %s" % _test_name(test_class) -def main(tests): +def main(tests, extra_tests=[]): """Main entry point for test suites based on ComfyChair. + inputs: + tests Sequence of TestCase subclasses to be run by default. + extra_tests Sequence of TestCase subclasses that are available but + not run by default. + Test suites should contain this boilerplate: if __name__ == '__main__': @@ -305,28 +408,37 @@ Calls sys.exit() on completion. from sys import argv import getopt, sys - verbose = 0 - - opts, args = getopt.getopt(argv[1:], '', ['help', 'list', 'verbose']) - if ('--help', '') in opts: - print_help() - return - elif ('--list', '') in opts: - print_list(tests) - return + opt_verbose = 0 + debugger = None - if ('--verbose', '') in opts: - verbose = 1 + opts, args = getopt.getopt(argv[1:], 'pv', + ['help', 'list', 'verbose', 'post-mortem']) + for opt, opt_arg in opts: + if opt == '--help': + print_help() + return + elif opt == '--list': + print_list(tests + extra_tests) + return + elif opt == '--verbose' or opt == '-v': + opt_verbose = 1 + elif opt == '--post-mortem' or opt == '-p': + import pdb + debugger = pdb.post_mortem if args: + all_tests = tests + extra_tests by_name = {} - for t in tests: + for t in all_tests: by_name[_test_name(t)] = t - which_tests = [by_name[name] for name in args] + which_tests = [] + for name in args: + which_tests.append(by_name[name]) else: which_tests = tests - sys.exit(runtests(which_tests, verbose)) + sys.exit(runtests(which_tests, verbose=opt_verbose, + debugger=debugger)) if __name__ == '__main__': |