diff options
author | Jelmer Vernooij <jelmer@samba.org> | 2010-01-08 02:09:20 +0100 |
---|---|---|
committer | Jelmer Vernooij <jelmer@samba.org> | 2010-01-16 19:53:49 +1300 |
commit | 28577aae928847e64a0274b5922e26e1f15d9916 (patch) | |
tree | 78f6b36e8166d3320120df0e1bc91fef7ce8f9b1 /lib/subunit/python | |
parent | b6b46b4978dcaffa0cd9803c43b8a5f1c19e227e (diff) | |
download | samba-28577aae928847e64a0274b5922e26e1f15d9916.tar.gz samba-28577aae928847e64a0274b5922e26e1f15d9916.tar.bz2 samba-28577aae928847e64a0274b5922e26e1f15d9916.zip |
Import testtools as well, required for subunit.
Diffstat (limited to 'lib/subunit/python')
21 files changed, 3845 insertions, 0 deletions
diff --git a/lib/subunit/python/testtools/__init__.py b/lib/subunit/python/testtools/__init__.py new file mode 100644 index 0000000000..0504d661d4 --- /dev/null +++ b/lib/subunit/python/testtools/__init__.py @@ -0,0 +1,58 @@ +# Copyright (c) 2008, 2009 Jonathan M. Lange. See LICENSE for details. + +"""Extensions to the standard Python unittest library.""" + +__all__ = [ + 'clone_test_with_new_id', + 'ConcurrentTestSuite', + 'ExtendedToOriginalDecorator', + 'iterate_tests', + 'MultiTestResult', + 'TestCase', + 'TestResult', + 'TextTestResult', + 'RunTest', + 'skip', + 'skipIf', + 'skipUnless', + 'ThreadsafeForwardingResult', + ] + +from testtools.matchers import ( + Matcher, + ) +from testtools.runtest import ( + RunTest, + ) +from testtools.testcase import ( + TestCase, + clone_test_with_new_id, + skip, + skipIf, + skipUnless, + ) +from testtools.testresult import ( + ExtendedToOriginalDecorator, + MultiTestResult, + TestResult, + TextTestResult, + ThreadsafeForwardingResult, + ) +from testtools.testsuite import ( + ConcurrentTestSuite, + ) +from testtools.utils import iterate_tests + +# same format as sys.version_info: "A tuple containing the five components of +# the version number: major, minor, micro, releaselevel, and serial. All +# values except releaselevel are integers; the release level is 'alpha', +# 'beta', 'candidate', or 'final'. The version_info value corresponding to the +# Python version 2.0 is (2, 0, 0, 'final', 0)." Additionally we use a +# releaselevel of 'dev' for unreleased under-development code. +# +# If the releaselevel is 'alpha' then the major/minor/micro components are not +# established at this point, and setup.py will use a version of next-$(revno). +# If the releaselevel is 'final', then the tarball will be major.minor.micro. +# Otherwise it is major.minor.micro~$(revno). + +__version__ = (0, 9, 2, 'final', 0) diff --git a/lib/subunit/python/testtools/content.py b/lib/subunit/python/testtools/content.py new file mode 100644 index 0000000000..00c782347b --- /dev/null +++ b/lib/subunit/python/testtools/content.py @@ -0,0 +1,91 @@ +# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details. + +"""Content - a MIME-like Content object.""" + +import codecs +from unittest import TestResult + +from testtools.content_type import ContentType +from testtools.utils import _b + + +class Content(object): + """A MIME-like Content object. + + Content objects can be serialised to bytes using the iter_bytes method. + If the Content-Type is recognised by other code, they are welcome to + look for richer contents that mere byte serialisation - for example in + memory object graphs etc. However, such code MUST be prepared to receive + a generic Content object that has been reconstructed from a byte stream. + + :ivar content_type: The content type of this Content. + """ + + def __init__(self, content_type, get_bytes): + """Create a ContentType.""" + if None in (content_type, get_bytes): + raise ValueError("None not permitted in %r, %r" % ( + content_type, get_bytes)) + self.content_type = content_type + self._get_bytes = get_bytes + + def __eq__(self, other): + return (self.content_type == other.content_type and + ''.join(self.iter_bytes()) == ''.join(other.iter_bytes())) + + def iter_bytes(self): + """Iterate over bytestrings of the serialised content.""" + return self._get_bytes() + + def iter_text(self): + """Iterate over the text of the serialised content. + + This is only valid for text MIME types, and will use ISO-8859-1 if + no charset parameter is present in the MIME type. (This is somewhat + arbitrary, but consistent with RFC2617 3.7.1). + + :raises: ValueError If the content type is not text/*. + """ + if self.content_type.type != "text": + raise ValueError("Not a text type %r" % self.content_type) + return self._iter_text() + + def _iter_text(self): + """Worker for iter_text - does the decoding.""" + encoding = self.content_type.parameters.get('charset', 'ISO-8859-1') + try: + # 2.5+ + decoder = codecs.getincrementaldecoder(encoding)() + for bytes in self.iter_bytes(): + yield decoder.decode(bytes) + final = decoder.decode(_b(''), True) + if final: + yield final + except AttributeError: + # < 2.5 + bytes = ''.join(self.iter_bytes()) + yield bytes.decode(encoding) + + def __repr__(self): + return "<Content type=%r, value=%r>" % ( + self.content_type, ''.join(self.iter_bytes())) + + +class TracebackContent(Content): + """Content object for tracebacks. + + This adapts an exc_info tuple to the Content interface. + text/x-traceback;language=python is used for the mime type, in order to + provide room for other languages to format their tracebacks differently. + """ + + def __init__(self, err, test): + """Create a TracebackContent for err.""" + if err is None: + raise ValueError("err may not be None") + content_type = ContentType('text', 'x-traceback', + {"language": "python", "charset": "utf8"}) + self._result = TestResult() + value = self._result._exc_info_to_string(err, test) + super(TracebackContent, self).__init__( + content_type, lambda: [value.encode("utf8")]) diff --git a/lib/subunit/python/testtools/content_type.py b/lib/subunit/python/testtools/content_type.py new file mode 100644 index 0000000000..e70fa76ec8 --- /dev/null +++ b/lib/subunit/python/testtools/content_type.py @@ -0,0 +1,30 @@ +# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details. + +"""ContentType - a MIME Content Type.""" + + +class ContentType(object): + """A content type from http://www.iana.org/assignments/media-types/ + + :ivar type: The primary type, e.g. "text" or "application" + :ivar subtype: The subtype, e.g. "plain" or "octet-stream" + :ivar parameters: A dict of additional parameters specific to the + content type. + """ + + def __init__(self, primary_type, sub_type, parameters=None): + """Create a ContentType.""" + if None in (primary_type, sub_type): + raise ValueError("None not permitted in %r, %r" % ( + primary_type, sub_type)) + self.type = primary_type + self.subtype = sub_type + self.parameters = parameters or {} + + def __eq__(self, other): + if type(other) != ContentType: + return False + return self.__dict__ == other.__dict__ + + def __repr__(self): + return "%s/%s params=%s" % (self.type, self.subtype, self.parameters) diff --git a/lib/subunit/python/testtools/matchers.py b/lib/subunit/python/testtools/matchers.py new file mode 100644 index 0000000000..947ef601b3 --- /dev/null +++ b/lib/subunit/python/testtools/matchers.py @@ -0,0 +1,169 @@ +# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details. + +"""Matchers, a way to express complex assertions outside the testcase. + +Inspired by 'hamcrest'. + +Matcher provides the abstract API that all matchers need to implement. + +Bundled matchers are listed in __all__: a list can be obtained by running +$ python -c 'import testtools.matchers; print testtools.matchers.__all__' +""" + +__metaclass__ = type +__all__ = [ + 'DocTestMatches', + 'Equals', + 'MatchesAny', + ] + +import doctest + + +class Matcher: + """A pattern matcher. + + A Matcher must implement match and __str__ to be used by + testtools.TestCase.assertThat. Matcher.match(thing) returns None when + thing is completely matched, and a Mismatch object otherwise. + + Matchers can be useful outside of test cases, as they are simply a + pattern matching language expressed as objects. + + testtools.matchers is inspired by hamcrest, but is pythonic rather than + a Java transcription. + """ + + def match(self, something): + """Return None if this matcher matches something, a Mismatch otherwise. + """ + raise NotImplementedError(self.match) + + def __str__(self): + """Get a sensible human representation of the matcher. + + This should include the parameters given to the matcher and any + state that would affect the matches operation. + """ + raise NotImplementedError(self.__str__) + + +class Mismatch: + """An object describing a mismatch detected by a Matcher.""" + + def describe(self): + """Describe the mismatch. + + This should be either a human-readable string or castable to a string. + """ + raise NotImplementedError(self.describe_difference) + + +class DocTestMatches: + """See if a string matches a doctest example.""" + + def __init__(self, example, flags=0): + """Create a DocTestMatches to match example. + + :param example: The example to match e.g. 'foo bar baz' + :param flags: doctest comparison flags to match on. e.g. + doctest.ELLIPSIS. + """ + if not example.endswith('\n'): + example += '\n' + self.want = example # required variable name by doctest. + self.flags = flags + self._checker = doctest.OutputChecker() + + def __str__(self): + if self.flags: + flagstr = ", flags=%d" % self.flags + else: + flagstr = "" + return 'DocTestMatches(%r%s)' % (self.want, flagstr) + + def _with_nl(self, actual): + result = str(actual) + if not result.endswith('\n'): + result += '\n' + return result + + def match(self, actual): + with_nl = self._with_nl(actual) + if self._checker.check_output(self.want, with_nl, self.flags): + return None + return DocTestMismatch(self, with_nl) + + def _describe_difference(self, with_nl): + return self._checker.output_difference(self, with_nl, self.flags) + + +class DocTestMismatch: + """Mismatch object for DocTestMatches.""" + + def __init__(self, matcher, with_nl): + self.matcher = matcher + self.with_nl = with_nl + + def describe(self): + return self.matcher._describe_difference(self.with_nl) + + +class Equals: + """Matches if the items are equal.""" + + def __init__(self, expected): + self.expected = expected + + def match(self, other): + if self.expected == other: + return None + return EqualsMismatch(self.expected, other) + + def __str__(self): + return "Equals(%r)" % self.expected + + +class EqualsMismatch: + """Two things differed.""" + + def __init__(self, expected, other): + self.expected = expected + self.other = other + + def describe(self): + return "%r != %r" % (self.expected, self.other) + + +class MatchesAny: + """Matches if any of the matchers it is created with match.""" + + def __init__(self, *matchers): + self.matchers = matchers + + def match(self, matchee): + results = [] + for matcher in self.matchers: + mismatch = matcher.match(matchee) + if mismatch is None: + return None + results.append(mismatch) + return MismatchesAll(results) + + def __str__(self): + return "MatchesAny(%s)" % ', '.join([ + str(matcher) for matcher in self.matchers]) + + +class MismatchesAll: + """A mismatch with many child mismatches.""" + + def __init__(self, mismatches): + self.mismatches = mismatches + + def describe(self): + descriptions = ["Differences: ["] + for mismatch in self.mismatches: + descriptions.append(mismatch.describe()) + descriptions.append("]\n") + return '\n'.join(descriptions) diff --git a/lib/subunit/python/testtools/run.py b/lib/subunit/python/testtools/run.py new file mode 100755 index 0000000000..c4f461ecfb --- /dev/null +++ b/lib/subunit/python/testtools/run.py @@ -0,0 +1,39 @@ +# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details. + +"""python -m testtools.run testspec [testspec...] + +Run some tests with the testtools extended API. + +For instance, to run the testtools test suite. + $ python -m testtools.run testtools.tests.test_suite +""" + +import sys + +from testtools.tests import test_suite +from testtools import TextTestResult + + +class TestToolsTestRunner(object): + """ A thunk object to support unittest.TestProgram.""" + + def run(self, test): + "Run the given test case or test suite." + result = TextTestResult(sys.stdout) + result.startTestRun() + try: + return test.run(result) + finally: + result.stopTestRun() + + +if __name__ == '__main__': + import optparse + from unittest import TestProgram + parser = optparse.OptionParser(__doc__) + args = parser.parse_args()[1] + if not args: + parser.error("No testspecs given.") + runner = TestToolsTestRunner() + program = TestProgram(module=None, argv=[sys.argv[0]] + args, + testRunner=runner) diff --git a/lib/subunit/python/testtools/runtest.py b/lib/subunit/python/testtools/runtest.py new file mode 100644 index 0000000000..053e2205a7 --- /dev/null +++ b/lib/subunit/python/testtools/runtest.py @@ -0,0 +1,142 @@ +# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details. + +"""Individual test case execution.""" + +__metaclass__ = type +__all__ = [ + 'RunTest', + ] + +import sys + +from testtools.testresult import ExtendedToOriginalDecorator + + +class RunTest: + """An object to run a test. + + RunTest objects are used to implement the internal logic involved in + running a test. TestCase.__init__ stores _RunTest as the class of RunTest + to execute. Passing the runTest= parameter to TestCase.__init__ allows a + different RunTest class to be used to execute the test. + + Subclassing or replacing RunTest can be useful to add functionality to the + way that tests are run in a given project. + + :ivar case: The test case that is to be run. + :ivar result: The result object a case is reporting to. + :ivar handlers: A list of (ExceptionClass->handler code) for exceptions + that should be caught if raised from the user code. Exceptions that + are caught are checked against this list in first to last order. + There is a catchall of Exception at the end of the list, so to add + a new exception to the list, insert it at the front (which ensures that + it will be checked before any existing base classes in the list. If you + add multiple exceptions some of which are subclasses of each other, add + the most specific exceptions last (so they come before their parent + classes in the list). + :ivar exception_caught: An object returned when _run_user catches an + exception. + """ + + def __init__(self, case, handlers=None): + """Create a RunTest to run a case. + + :param case: A testtools.TestCase test case object. + :param handlers: Exception handlers for this RunTest. These are stored + in self.handlers and can be modified later if needed. + """ + self.case = case + self.handlers = handlers or [] + self.exception_caught = object() + + def run(self, result=None): + """Run self.case reporting activity to result. + + :param result: Optional testtools.TestResult to report activity to. + :return: The result object the test was run against. + """ + if result is None: + actual_result = self.case.defaultTestResult() + actual_result.startTestRun() + else: + actual_result = result + try: + return self._run_one(actual_result) + finally: + if result is None: + actual_result.stopTestRun() + + def _run_one(self, result): + """Run one test reporting to result. + + :param result: A testtools.TestResult to report activity to. + This result object is decorated with an ExtendedToOriginalDecorator + to ensure that the latest TestResult API can be used with + confidence by client code. + :return: The result object the test was run against. + """ + return self._run_prepared_result(ExtendedToOriginalDecorator(result)) + + def _run_prepared_result(self, result): + """Run one test reporting to result. + + :param result: A testtools.TestResult to report activity to. + :return: The result object the test was run against. + """ + result.startTest(self.case) + self.result = result + try: + self._run_core() + finally: + result.stopTest(self.case) + return result + + def _run_core(self): + """Run the user supplied test code.""" + if self.exception_caught == self._run_user(self.case._run_setup, + self.result): + # Don't run the test method if we failed getting here. + self.case._runCleanups(self.result) + return + # Run everything from here on in. If any of the methods raise an + # exception we'll have failed. + failed = False + try: + if self.exception_caught == self._run_user( + self.case._run_test_method, self.result): + failed = True + finally: + try: + if self.exception_caught == self._run_user( + self.case._run_teardown, self.result): + failed = True + finally: + try: + if not self._run_user( + self.case._runCleanups, self.result): + failed = True + finally: + if not failed: + self.result.addSuccess(self.case, + details=self.case.getDetails()) + + def _run_user(self, fn, *args): + """Run a user supplied function. + + Exceptions are processed by self.handlers. + """ + try: + return fn(*args) + except KeyboardInterrupt: + raise + except Exception: + # Note that bare exceptions are not caught, so raised strings will + # escape: but they are deprecated anyway. + exc_info = sys.exc_info() + e = exc_info[1] + for exc_class, handler in self.handlers: + self.case.onException(exc_info) + if isinstance(e, exc_class): + handler(self.case, self.result, e) + return self.exception_caught + raise e diff --git a/lib/subunit/python/testtools/testcase.py b/lib/subunit/python/testtools/testcase.py new file mode 100644 index 0000000000..a1d822ed47 --- /dev/null +++ b/lib/subunit/python/testtools/testcase.py @@ -0,0 +1,444 @@ +# Copyright (c) 2008, 2009 Jonathan M. Lange. See LICENSE for details. + +"""Test case related stuff.""" + +__metaclass__ = type +__all__ = [ + 'clone_test_with_new_id', + 'TestCase', + 'skip', + 'skipIf', + 'skipUnless', + ] + +import copy +try: + from functools import wraps +except ImportError: + wraps = None +import itertools +import sys +import types +import unittest + +from testtools import content +from testtools.runtest import RunTest +from testtools.testresult import TestResult +from testtools.utils import advance_iterator + + +try: + # Try to use the python2.7 SkipTest exception for signalling skips. + from unittest.case import SkipTest as TestSkipped +except ImportError: + class TestSkipped(Exception): + """Raised within TestCase.run() when a test is skipped.""" + + +try: + # Try to use the same exceptions python 2.7 does. + from unittest.case import _ExpectedFailure, _UnexpectedSuccess +except ImportError: + # Oops, not available, make our own. + class _UnexpectedSuccess(Exception): + """An unexpected success was raised. + + Note that this exception is private plumbing in testtools' testcase + module. + """ + + class _ExpectedFailure(Exception): + """An expected failure occured. + + Note that this exception is private plumbing in testtools' testcase + module. + """ + + +class TestCase(unittest.TestCase): + """Extensions to the basic TestCase. + + :ivar exception_handlers: Exceptions to catch from setUp, runTest and + tearDown. This list is able to be modified at any time and consists of + (exception_class, handler(case, result, exception_value)) pairs. + """ + + skipException = TestSkipped + + def __init__(self, *args, **kwargs): + """Construct a TestCase. + + :param testMethod: The name of the method to run. + :param runTest: Optional class to use to execute the test. If not + supplied testtools.runtest.RunTest is used. The instance to be + used is created when run() is invoked, so will be fresh each time. + """ + unittest.TestCase.__init__(self, *args, **kwargs) + self._cleanups = [] + self._unique_id_gen = itertools.count(1) + self.__setup_called = False + self.__teardown_called = False + self.__details = {} + self.__RunTest = kwargs.get('runTest', RunTest) + self.__exception_handlers = [] + self.exception_handlers = [ + (self.skipException, self._report_skip), + (self.failureException, self._report_failure), + (_ExpectedFailure, self._report_expected_failure), + (_UnexpectedSuccess, self._report_unexpected_success), + (Exception, self._report_error), + ] + + def __eq__(self, other): + eq = getattr(unittest.TestCase, '__eq__', None) + if eq is not None and not unittest.TestCase.__eq__(self, other): + return False + return self.__dict__ == other.__dict__ + + def __repr__(self): + # We add id to the repr because it makes testing testtools easier. + return "<%s id=0x%0x>" % (self.id(), id(self)) + + def addDetail(self, name, content_object): + """Add a detail to be reported with this test's outcome. + + For more details see pydoc testtools.TestResult. + + :param name: The name to give this detail. + :param content_object: The content object for this detail. See + testtools.content for more detail. + """ + self.__details[name] = content_object + + def getDetails(self): + """Get the details dict that will be reported with this test's outcome. + + For more details see pydoc testtools.TestResult. + """ + return self.__details + + def shortDescription(self): + return self.id() + + def skip(self, reason): + """Cause this test to be skipped. + + This raises self.skipException(reason). skipException is raised + to permit a skip to be triggered at any point (during setUp or the + testMethod itself). The run() method catches skipException and + translates that into a call to the result objects addSkip method. + + :param reason: The reason why the test is being skipped. This must + support being cast into a unicode string for reporting. + """ + raise self.skipException(reason) + + def _formatTypes(self, classOrIterable): + """Format a class or a bunch of classes for display in an error.""" + className = getattr(classOrIterable, '__name__', None) + if className is None: + className = ', '.join(klass.__name__ for klass in classOrIterable) + return className + + def _runCleanups(self, result): + """Run the cleanups that have been added with addCleanup. + + See the docstring for addCleanup for more information. + + Returns True if all cleanups ran without error, False otherwise. + """ + ok = True + while self._cleanups: + function, arguments, keywordArguments = self._cleanups.pop() + try: + function(*arguments, **keywordArguments) + except KeyboardInterrupt: + raise + except: + self._report_error(self, result, None) + ok = False + return ok + + def addCleanup(self, function, *arguments, **keywordArguments): + """Add a cleanup function to be called after tearDown. + + Functions added with addCleanup will be called in reverse order of + adding after the test method and before tearDown. + + If a function added with addCleanup raises an exception, the error + will be recorded as a test error, and the next cleanup will then be + run. + + Cleanup functions are always called before a test finishes running, + even if setUp is aborted by an exception. + """ + self._cleanups.append((function, arguments, keywordArguments)) + + def addOnException(self, handler): + """Add a handler to be called when an exception occurs in test code. + + This handler cannot affect what result methods are called, and is + called before any outcome is called on the result object. An example + use for it is to add some diagnostic state to the test details dict + which is expensive to calculate and not interesting for reporting in + the success case. + + Handlers are called before the outcome (such as addFailure) that + the exception has caused. + + Handlers are called in first-added, first-called order, and if they + raise an exception, that will propogate out of the test running + machinery, halting test processing. As a result, do not call code that + may unreasonably fail. + """ + self.__exception_handlers.append(handler) + + def _add_reason(self, reason): + self.addDetail('reason', content.Content( + content.ContentType('text', 'plain'), + lambda: [reason.encode('utf8')])) + + def assertIn(self, needle, haystack): + """Assert that needle is in haystack.""" + self.assertTrue( + needle in haystack, '%r not in %r' % (needle, haystack)) + + def assertIs(self, expected, observed): + """Assert that `expected` is `observed`.""" + self.assertTrue( + expected is observed, '%r is not %r' % (expected, observed)) + + def assertIsNot(self, expected, observed): + """Assert that `expected` is not `observed`.""" + self.assertTrue( + expected is not observed, '%r is %r' % (expected, observed)) + + def assertNotIn(self, needle, haystack): + """Assert that needle is not in haystack.""" + self.assertTrue( + needle not in haystack, '%r in %r' % (needle, haystack)) + + def assertIsInstance(self, obj, klass): + self.assertTrue( + isinstance(obj, klass), + '%r is not an instance of %s' % (obj, self._formatTypes(klass))) + + def assertRaises(self, excClass, callableObj, *args, **kwargs): + """Fail unless an exception of class excClass is thrown + by callableObj when invoked with arguments args and keyword + arguments kwargs. If a different type of exception is + thrown, it will not be caught, and the test case will be + deemed to have suffered an error, exactly as for an + unexpected exception. + """ + try: + ret = callableObj(*args, **kwargs) + except excClass: + return sys.exc_info()[1] + else: + excName = self._formatTypes(excClass) + self.fail("%s not raised, %r returned instead." % (excName, ret)) + failUnlessRaises = assertRaises + + def assertThat(self, matchee, matcher): + """Assert that matchee is matched by matcher. + + :param matchee: An object to match with matcher. + :param matcher: An object meeting the testtools.Matcher protocol. + :raises self.failureException: When matcher does not match thing. + """ + mismatch = matcher.match(matchee) + if not mismatch: + return + self.fail('Match failed. Matchee: "%s"\nMatcher: %s\nDifference: %s\n' + % (matchee, matcher, mismatch.describe())) + + def defaultTestResult(self): + return TestResult() + + def expectFailure(self, reason, predicate, *args, **kwargs): + """Check that a test fails in a particular way. + + If the test fails in the expected way, a KnownFailure is caused. If it + succeeds an UnexpectedSuccess is caused. + + The expected use of expectFailure is as a barrier at the point in a + test where the test would fail. For example: + >>> def test_foo(self): + >>> self.expectFailure("1 should be 0", self.assertNotEqual, 1, 0) + >>> self.assertEqual(1, 0) + + If in the future 1 were to equal 0, the expectFailure call can simply + be removed. This separation preserves the original intent of the test + while it is in the expectFailure mode. + """ + self._add_reason(reason) + try: + predicate(*args, **kwargs) + except self.failureException: + exc_info = sys.exc_info() + self.addDetail('traceback', + content.TracebackContent(exc_info, self)) + raise _ExpectedFailure(exc_info) + else: + raise _UnexpectedSuccess(reason) + + def getUniqueInteger(self): + """Get an integer unique to this test. + + Returns an integer that is guaranteed to be unique to this instance. + Use this when you need an arbitrary integer in your test, or as a + helper for custom anonymous factory methods. + """ + return advance_iterator(self._unique_id_gen) + + def getUniqueString(self, prefix=None): + """Get a string unique to this test. + + Returns a string that is guaranteed to be unique to this instance. Use + this when you need an arbitrary string in your test, or as a helper + for custom anonymous factory methods. + + :param prefix: The prefix of the string. If not provided, defaults + to the id of the tests. + :return: A bytestring of '<prefix>-<unique_int>'. + """ + if prefix is None: + prefix = self.id() + return '%s-%d' % (prefix, self.getUniqueInteger()) + + def onException(self, exc_info): + """Called when an exception propogates from test code. + + :seealso addOnException: + """ + for handler in self.__exception_handlers: + handler(exc_info) + + @staticmethod + def _report_error(self, result, err): + self._report_traceback() + result.addError(self, details=self.getDetails()) + + @staticmethod + def _report_expected_failure(self, result, err): + result.addExpectedFailure(self, details=self.getDetails()) + + @staticmethod + def _report_failure(self, result, err): + self._report_traceback() + result.addFailure(self, details=self.getDetails()) + + @staticmethod + def _report_skip(self, result, err): + if err.args: + reason = err.args[0] + else: + reason = "no reason given." + self._add_reason(reason) + result.addSkip(self, details=self.getDetails()) + + def _report_traceback(self): + self.addDetail('traceback', + content.TracebackContent(sys.exc_info(), self)) + + @staticmethod + def _report_unexpected_success(self, result, err): + result.addUnexpectedSuccess(self, details=self.getDetails()) + + def run(self, result=None): + return self.__RunTest(self, self.exception_handlers).run(result) + + def _run_setup(self, result): + """Run the setUp function for this test. + + :param result: A testtools.TestResult to report activity to. + :raises ValueError: If the base class setUp is not called, a + ValueError is raised. + """ + self.setUp() + if not self.__setup_called: + raise ValueError("setUp was not called") + + def _run_teardown(self, result): + """Run the tearDown function for this test. + + :param result: A testtools.TestResult to report activity to. + :raises ValueError: If the base class tearDown is not called, a + ValueError is raised. + """ + self.tearDown() + if not self.__teardown_called: + raise ValueError("teardown was not called") + + def _run_test_method(self, result): + """Run the test method for this test. + + :param result: A testtools.TestResult to report activity to. + :return: None. + """ + absent_attr = object() + # Python 2.5+ + method_name = getattr(self, '_testMethodName', absent_attr) + if method_name is absent_attr: + # Python 2.4 + method_name = getattr(self, '_TestCase__testMethodName') + testMethod = getattr(self, method_name) + testMethod() + + def setUp(self): + unittest.TestCase.setUp(self) + self.__setup_called = True + + def tearDown(self): + unittest.TestCase.tearDown(self) + self.__teardown_called = True + + +# Python 2.4 did not know how to deep copy functions. +if types.FunctionType not in copy._deepcopy_dispatch: + copy._deepcopy_dispatch[types.FunctionType] = copy._deepcopy_atomic + + +def clone_test_with_new_id(test, new_id): + """Copy a TestCase, and give the copied test a new id.""" + newTest = copy.deepcopy(test) + newTest.id = lambda: new_id + return newTest + + +def skip(reason): + """A decorator to skip unit tests. + + This is just syntactic sugar so users don't have to change any of their + unit tests in order to migrate to python 2.7, which provides the + @unittest.skip decorator. + """ + def decorator(test_item): + if wraps is not None: + @wraps(test_item) + def skip_wrapper(*args, **kwargs): + raise TestCase.skipException(reason) + else: + def skip_wrapper(test_item): + test_item.skip(reason) + return skip_wrapper + return decorator + + +def skipIf(condition, reason): + """Skip a test if the condition is true.""" + if condition: + return skip(reason) + def _id(obj): + return obj + return _id + + +def skipUnless(condition, reason): + """Skip a test unless the condition is true.""" + if not condition: + return skip(reason) + def _id(obj): + return obj + return _id diff --git a/lib/subunit/python/testtools/testresult/__init__.py b/lib/subunit/python/testtools/testresult/__init__.py new file mode 100644 index 0000000000..2ee3d25293 --- /dev/null +++ b/lib/subunit/python/testtools/testresult/__init__.py @@ -0,0 +1,19 @@ +# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details. + +"""Test result objects.""" + +__all__ = [ + 'ExtendedToOriginalDecorator', + 'MultiTestResult', + 'TestResult', + 'TextTestResult', + 'ThreadsafeForwardingResult', + ] + +from real import ( + ExtendedToOriginalDecorator, + MultiTestResult, + TestResult, + TextTestResult, + ThreadsafeForwardingResult, + ) diff --git a/lib/subunit/python/testtools/testresult/doubles.py b/lib/subunit/python/testtools/testresult/doubles.py new file mode 100644 index 0000000000..d231c919c2 --- /dev/null +++ b/lib/subunit/python/testtools/testresult/doubles.py @@ -0,0 +1,95 @@ +# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details. + +"""Doubles of test result objects, useful for testing unittest code.""" + +__all__ = [ + 'Python26TestResult', + 'Python27TestResult', + 'ExtendedTestResult', + ] + + +class LoggingBase(object): + """Basic support for logging of results.""" + + def __init__(self): + self._events = [] + self.shouldStop = False + + +class Python26TestResult(LoggingBase): + """A precisely python 2.6 like test result, that logs.""" + + def addError(self, test, err): + self._events.append(('addError', test, err)) + + def addFailure(self, test, err): + self._events.append(('addFailure', test, err)) + + def addSuccess(self, test): + self._events.append(('addSuccess', test)) + + def startTest(self, test): + self._events.append(('startTest', test)) + + def stop(self): + self.shouldStop = True + + def stopTest(self, test): + self._events.append(('stopTest', test)) + + +class Python27TestResult(Python26TestResult): + """A precisely python 2.7 like test result, that logs.""" + + def addExpectedFailure(self, test, err): + self._events.append(('addExpectedFailure', test, err)) + + def addSkip(self, test, reason): + self._events.append(('addSkip', test, reason)) + + def addUnexpectedSuccess(self, test): + self._events.append(('addUnexpectedSuccess', test)) + + def startTestRun(self): + self._events.append(('startTestRun',)) + + def stopTestRun(self): + self._events.append(('stopTestRun',)) + + +class ExtendedTestResult(Python27TestResult): + """A test result like the proposed extended unittest result API.""" + + def addError(self, test, err=None, details=None): + self._events.append(('addError', test, err or details)) + + def addFailure(self, test, err=None, details=None): + self._events.append(('addFailure', test, err or details)) + + def addExpectedFailure(self, test, err=None, details=None): + self._events.append(('addExpectedFailure', test, err or details)) + + def addSkip(self, test, reason=None, details=None): + self._events.append(('addSkip', test, reason or details)) + + def addSuccess(self, test, details=None): + if details: + self._events.append(('addSuccess', test, details)) + else: + self._events.append(('addSuccess', test)) + + def addUnexpectedSuccess(self, test, details=None): + if details is not None: + self._events.append(('addUnexpectedSuccess', test, details)) + else: + self._events.append(('addUnexpectedSuccess', test)) + + def progress(self, offset, whence): + self._events.append(('progress', offset, whence)) + + def tags(self, new_tags, gone_tags): + self._events.append(('tags', new_tags, gone_tags)) + + def time(self, time): + self._events.append(('time', time)) diff --git a/lib/subunit/python/testtools/testresult/real.py b/lib/subunit/python/testtools/testresult/real.py new file mode 100644 index 0000000000..8c8a3edd6e --- /dev/null +++ b/lib/subunit/python/testtools/testresult/real.py @@ -0,0 +1,540 @@ +# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details. + +"""Test results and related things.""" + +__metaclass__ = type +__all__ = [ + 'ExtendedToOriginalDecorator', + 'MultiTestResult', + 'TestResult', + 'ThreadsafeForwardingResult', + ] + +import datetime +import unittest + + +class TestResult(unittest.TestResult): + """Subclass of unittest.TestResult extending the protocol for flexability. + + This test result supports an experimental protocol for providing additional + data to in test outcomes. All the outcome methods take an optional dict + 'details'. If supplied any other detail parameters like 'err' or 'reason' + should not be provided. The details dict is a mapping from names to + MIME content objects (see testtools.content). This permits attaching + tracebacks, log files, or even large objects like databases that were + part of the test fixture. Until this API is accepted into upstream + Python it is considered experimental: it may be replaced at any point + by a newer version more in line with upstream Python. Compatibility would + be aimed for in this case, but may not be possible. + + :ivar skip_reasons: A dict of skip-reasons -> list of tests. See addSkip. + """ + + def __init__(self): + super(TestResult, self).__init__() + self.skip_reasons = {} + self.__now = None + # -- Start: As per python 2.7 -- + self.expectedFailures = [] + self.unexpectedSuccesses = [] + # -- End: As per python 2.7 -- + + def addExpectedFailure(self, test, err=None, details=None): + """Called when a test has failed in an expected manner. + + Like with addSuccess and addError, testStopped should still be called. + + :param test: The test that has been skipped. + :param err: The exc_info of the error that was raised. + :return: None + """ + # This is the python 2.7 implementation + self.expectedFailures.append( + (test, self._err_details_to_string(test, err, details))) + + def addError(self, test, err=None, details=None): + """Called when an error has occurred. 'err' is a tuple of values as + returned by sys.exc_info(). + + :param details: Alternative way to supply details about the outcome. + see the class docstring for more information. + """ + self.errors.append((test, + self._err_details_to_string(test, err, details))) + + def addFailure(self, test, err=None, details=None): + """Called when an error has occurred. 'err' is a tuple of values as + returned by sys.exc_info(). + + :param details: Alternative way to supply details about the outcome. + see the class docstring for more information. + """ + self.failures.append((test, + self._err_details_to_string(test, err, details))) + + def addSkip(self, test, reason=None, details=None): + """Called when a test has been skipped rather than running. + + Like with addSuccess and addError, testStopped should still be called. + + This must be called by the TestCase. 'addError' and 'addFailure' will + not call addSkip, since they have no assumptions about the kind of + errors that a test can raise. + + :param test: The test that has been skipped. + :param reason: The reason for the test being skipped. For instance, + u"pyGL is not available". + :param details: Alternative way to supply details about the outcome. + see the class docstring for more information. + :return: None + """ + if reason is None: + reason = details.get('reason') + if reason is None: + reason = 'No reason given' + else: + reason = ''.join(reason.iter_text()) + skip_list = self.skip_reasons.setdefault(reason, []) + skip_list.append(test) + + def addSuccess(self, test, details=None): + """Called when a test succeeded.""" + + def addUnexpectedSuccess(self, test, details=None): + """Called when a test was expected to fail, but succeed.""" + self.unexpectedSuccesses.append(test) + + def _err_details_to_string(self, test, err=None, details=None): + """Convert an error in exc_info form or a contents dict to a string.""" + if err is not None: + return self._exc_info_to_string(err, test) + return _details_to_str(details) + + def _now(self): + """Return the current 'test time'. + + If the time() method has not been called, this is equivalent to + datetime.now(), otherwise its the last supplied datestamp given to the + time() method. + """ + if self.__now is None: + return datetime.datetime.now() + else: + return self.__now + + def startTestRun(self): + """Called before a test run starts. + + New in python 2.7 + """ + + def stopTestRun(self): + """Called after a test run completes + + New in python 2.7 + """ + + def time(self, a_datetime): + """Provide a timestamp to represent the current time. + + This is useful when test activity is time delayed, or happening + concurrently and getting the system time between API calls will not + accurately represent the duration of tests (or the whole run). + + Calling time() sets the datetime used by the TestResult object. + Time is permitted to go backwards when using this call. + + :param a_datetime: A datetime.datetime object with TZ information or + None to reset the TestResult to gathering time from the system. + """ + self.__now = a_datetime + + def done(self): + """Called when the test runner is done. + + deprecated in favour of stopTestRun. + """ + + +class MultiTestResult(TestResult): + """A test result that dispatches to many test results.""" + + def __init__(self, *results): + TestResult.__init__(self) + self._results = map(ExtendedToOriginalDecorator, results) + + def _dispatch(self, message, *args, **kwargs): + for result in self._results: + getattr(result, message)(*args, **kwargs) + + def startTest(self, test): + self._dispatch('startTest', test) + + def stopTest(self, test): + self._dispatch('stopTest', test) + + def addError(self, test, error=None, details=None): + self._dispatch('addError', test, error, details=details) + + def addExpectedFailure(self, test, err=None, details=None): + self._dispatch('addExpectedFailure', test, err, details=details) + + def addFailure(self, test, err=None, details=None): + self._dispatch('addFailure', test, err, details=details) + + def addSkip(self, test, reason=None, details=None): + self._dispatch('addSkip', test, reason, details=details) + + def addSuccess(self, test, details=None): + self._dispatch('addSuccess', test, details=details) + + def addUnexpectedSuccess(self, test, details=None): + self._dispatch('addUnexpectedSuccess', test, details=details) + + def startTestRun(self): + self._dispatch('startTestRun') + + def stopTestRun(self): + self._dispatch('stopTestRun') + + def done(self): + self._dispatch('done') + + +class TextTestResult(TestResult): + """A TestResult which outputs activity to a text stream.""" + + def __init__(self, stream): + """Construct a TextTestResult writing to stream.""" + super(TextTestResult, self).__init__() + self.stream = stream + self.sep1 = '=' * 70 + '\n' + self.sep2 = '-' * 70 + '\n' + + def _delta_to_float(self, a_timedelta): + return (a_timedelta.days * 86400.0 + a_timedelta.seconds + + a_timedelta.microseconds / 1000000.0) + + def _show_list(self, label, error_list): + for test, output in error_list: + self.stream.write(self.sep1) + self.stream.write("%s: %s\n" % (label, test.id())) + self.stream.write(self.sep2) + self.stream.write(output) + + def startTestRun(self): + super(TextTestResult, self).startTestRun() + self.__start = self._now() + self.stream.write("Tests running...\n") + + def stopTestRun(self): + if self.testsRun != 1: + plural = 's' + else: + plural = '' + stop = self._now() + self._show_list('ERROR', self.errors) + self._show_list('FAIL', self.failures) + self.stream.write("Ran %d test%s in %.3fs\n\n" % + (self.testsRun, plural, + self._delta_to_float(stop - self.__start))) + if self.wasSuccessful(): + self.stream.write("OK\n") + else: + self.stream.write("FAILED (") + details = [] + details.append("failures=%d" % ( + len(self.failures) + len(self.errors))) + self.stream.write(", ".join(details)) + self.stream.write(")\n") + super(TextTestResult, self).stopTestRun() + + +class ThreadsafeForwardingResult(TestResult): + """A TestResult which ensures the target does not receive mixed up calls. + + This is used when receiving test results from multiple sources, and batches + up all the activity for a single test into a thread-safe batch where all + other ThreadsafeForwardingResult objects sharing the same semaphore will be + locked out. + + Typical use of ThreadsafeForwardingResult involves creating one + ThreadsafeForwardingResult per thread in a ConcurrentTestSuite. These + forward to the TestResult that the ConcurrentTestSuite run method was + called with. + + target.done() is called once for each ThreadsafeForwardingResult that + forwards to the same target. If the target's done() takes special action, + care should be taken to accommodate this. + """ + + def __init__(self, target, semaphore): + """Create a ThreadsafeForwardingResult forwarding to target. + + :param target: A TestResult. + :param semaphore: A threading.Semaphore with limit 1. + """ + TestResult.__init__(self) + self.result = ExtendedToOriginalDecorator(target) + self.semaphore = semaphore + + def addError(self, test, err=None, details=None): + self.semaphore.acquire() + try: + self.result.startTest(test) + self.result.addError(test, err, details=details) + self.result.stopTest(test) + finally: + self.semaphore.release() + + def addExpectedFailure(self, test, err=None, details=None): + self.semaphore.acquire() + try: + self.result.startTest(test) + self.result.addExpectedFailure(test, err, details=details) + self.result.stopTest(test) + finally: + self.semaphore.release() + + def addFailure(self, test, err=None, details=None): + self.semaphore.acquire() + try: + self.result.startTest(test) + self.result.addFailure(test, err, details=details) + self.result.stopTest(test) + finally: + self.semaphore.release() + + def addSkip(self, test, reason=None, details=None): + self.semaphore.acquire() + try: + self.result.startTest(test) + self.result.addSkip(test, reason, details=details) + self.result.stopTest(test) + finally: + self.semaphore.release() + + def addSuccess(self, test, details=None): + self.semaphore.acquire() + try: + self.result.startTest(test) + self.result.addSuccess(test, details=details) + self.result.stopTest(test) + finally: + self.semaphore.release() + + def addUnexpectedSuccess(self, test, details=None): + self.semaphore.acquire() + try: + self.result.startTest(test) + self.result.addUnexpectedSuccess(test, details=details) + self.result.stopTest(test) + finally: + self.semaphore.release() + + def startTestRun(self): + self.semaphore.acquire() + try: + self.result.startTestRun() + finally: + self.semaphore.release() + + def stopTestRun(self): + self.semaphore.acquire() + try: + self.result.stopTestRun() + finally: + self.semaphore.release() + + def done(self): + self.semaphore.acquire() + try: + self.result.done() + finally: + self.semaphore.release() + + +class ExtendedToOriginalDecorator(object): + """Permit new TestResult API code to degrade gracefully with old results. + + This decorates an existing TestResult and converts missing outcomes + such as addSkip to older outcomes such as addSuccess. It also supports + the extended details protocol. In all cases the most recent protocol + is attempted first, and fallbacks only occur when the decorated result + does not support the newer style of calling. + """ + + def __init__(self, decorated): + self.decorated = decorated + + def __getattr__(self, name): + return getattr(self.decorated, name) + + def addError(self, test, err=None, details=None): + self._check_args(err, details) + if details is not None: + try: + return self.decorated.addError(test, details=details) + except TypeError: + # have to convert + err = self._details_to_exc_info(details) + return self.decorated.addError(test, err) + + def addExpectedFailure(self, test, err=None, details=None): + self._check_args(err, details) + addExpectedFailure = getattr( + self.decorated, 'addExpectedFailure', None) + if addExpectedFailure is None: + return self.addSuccess(test) + if details is not None: + try: + return addExpectedFailure(test, details=details) + except TypeError: + # have to convert + err = self._details_to_exc_info(details) + return addExpectedFailure(test, err) + + def addFailure(self, test, err=None, details=None): + self._check_args(err, details) + if details is not None: + try: + return self.decorated.addFailure(test, details=details) + except TypeError: + # have to convert + err = self._details_to_exc_info(details) + return self.decorated.addFailure(test, err) + + def addSkip(self, test, reason=None, details=None): + self._check_args(reason, details) + addSkip = getattr(self.decorated, 'addSkip', None) + if addSkip is None: + return self.decorated.addSuccess(test) + if details is not None: + try: + return addSkip(test, details=details) + except TypeError: + # have to convert + reason = _details_to_str(details) + return addSkip(test, reason) + + def addUnexpectedSuccess(self, test, details=None): + outcome = getattr(self.decorated, 'addUnexpectedSuccess', None) + if outcome is None: + return self.decorated.addSuccess(test) + if details is not None: + try: + return outcome(test, details=details) + except TypeError: + pass + return outcome(test) + + def addSuccess(self, test, details=None): + if details is not None: + try: + return self.decorated.addSuccess(test, details=details) + except TypeError: + pass + return self.decorated.addSuccess(test) + + def _check_args(self, err, details): + param_count = 0 + if err is not None: + param_count += 1 + if details is not None: + param_count += 1 + if param_count != 1: + raise ValueError("Must pass only one of err '%s' and details '%s" + % (err, details)) + + def _details_to_exc_info(self, details): + """Convert a details dict to an exc_info tuple.""" + return (_StringException, + _StringException(_details_to_str(details)), None) + + def done(self): + try: + return self.decorated.done() + except AttributeError: + return + + def progress(self, offset, whence): + method = getattr(self.decorated, 'progress', None) + if method is None: + return + return method(offset, whence) + + @property + def shouldStop(self): + return self.decorated.shouldStop + + def startTest(self, test): + return self.decorated.startTest(test) + + def startTestRun(self): + try: + return self.decorated.startTestRun() + except AttributeError: + return + + def stop(self): + return self.decorated.stop() + + def stopTest(self, test): + return self.decorated.stopTest(test) + + def stopTestRun(self): + try: + return self.decorated.stopTestRun() + except AttributeError: + return + + def tags(self, new_tags, gone_tags): + method = getattr(self.decorated, 'tags', None) + if method is None: + return + return method(new_tags, gone_tags) + + def time(self, a_datetime): + method = getattr(self.decorated, 'time', None) + if method is None: + return + return method(a_datetime) + + def wasSuccessful(self): + return self.decorated.wasSuccessful() + + +class _StringException(Exception): + """An exception made from an arbitrary string.""" + + def __hash__(self): + return id(self) + + def __str__(self): + """Stringify better than 2.x's default behaviour of ascii encoding.""" + return self.args[0] + + def __eq__(self, other): + try: + return self.args == other.args + except AttributeError: + return False + + +def _details_to_str(details): + """Convert a details dict to a string.""" + chars = [] + # sorted is for testing, may want to remove that and use a dict + # subclass with defined order for items instead. + for key, content in sorted(details.items()): + if content.content_type.type != 'text': + chars.append('Binary content: %s\n' % key) + continue + chars.append('Text attachment: %s\n' % key) + chars.append('------------\n') + chars.extend(content.iter_text()) + if not chars[-1].endswith('\n'): + chars.append('\n') + chars.append('------------\n') + return ''.join(chars) diff --git a/lib/subunit/python/testtools/tests/__init__.py b/lib/subunit/python/testtools/tests/__init__.py new file mode 100644 index 0000000000..e1d1148d5c --- /dev/null +++ b/lib/subunit/python/testtools/tests/__init__.py @@ -0,0 +1,28 @@ +# See README for copyright and licensing details. + +import unittest +from testtools.tests import ( + test_content, + test_content_type, + test_matchers, + test_runtest, + test_testtools, + test_testresult, + test_testsuite, + ) + + +def test_suite(): + suites = [] + modules = [ + test_content, + test_content_type, + test_matchers, + test_runtest, + test_testresult, + test_testsuite, + test_testtools, + ] + for module in modules: + suites.append(getattr(module, 'test_suite')()) + return unittest.TestSuite(suites) diff --git a/lib/subunit/python/testtools/tests/helpers.py b/lib/subunit/python/testtools/tests/helpers.py new file mode 100644 index 0000000000..c4cf10c736 --- /dev/null +++ b/lib/subunit/python/testtools/tests/helpers.py @@ -0,0 +1,67 @@ +# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details. + +"""Helpers for tests.""" + +import sys + +__metaclass__ = type +__all__ = [ + 'LoggingResult', + ] + +from testtools import TestResult + + +try: + raise Exception +except Exception: + an_exc_info = sys.exc_info() + +# Deprecated: This classes attributes are somewhat non deterministic which +# leads to hard to predict tests (because Python upstream are changing things. +class LoggingResult(TestResult): + """TestResult that logs its event to a list.""" + + def __init__(self, log): + self._events = log + super(LoggingResult, self).__init__() + + def startTest(self, test): + self._events.append(('startTest', test)) + super(LoggingResult, self).startTest(test) + + def stopTest(self, test): + self._events.append(('stopTest', test)) + super(LoggingResult, self).stopTest(test) + + def addFailure(self, test, error): + self._events.append(('addFailure', test, error)) + super(LoggingResult, self).addFailure(test, error) + + def addError(self, test, error): + self._events.append(('addError', test, error)) + super(LoggingResult, self).addError(test, error) + + def addSkip(self, test, reason): + self._events.append(('addSkip', test, reason)) + super(LoggingResult, self).addSkip(test, reason) + + def addSuccess(self, test): + self._events.append(('addSuccess', test)) + super(LoggingResult, self).addSuccess(test) + + def startTestRun(self): + self._events.append('startTestRun') + super(LoggingResult, self).startTestRun() + + def stopTestRun(self): + self._events.append('stopTestRun') + super(LoggingResult, self).stopTestRun() + + def done(self): + self._events.append('done') + super(LoggingResult, self).done() + +# Note, the following three classes are different to LoggingResult by +# being fully defined exact matches rather than supersets. +from testtools.testresult.doubles import * diff --git a/lib/subunit/python/testtools/tests/test_content.py b/lib/subunit/python/testtools/tests/test_content.py new file mode 100644 index 0000000000..1159362036 --- /dev/null +++ b/lib/subunit/python/testtools/tests/test_content.py @@ -0,0 +1,72 @@ +# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details. + +import unittest +from testtools.content import Content, TracebackContent +from testtools.content_type import ContentType +from testtools.utils import _u +from testtools.tests.helpers import an_exc_info + + +def test_suite(): + from unittest import TestLoader + return TestLoader().loadTestsFromName(__name__) + + +class TestContent(unittest.TestCase): + + def test___init___None_errors(self): + self.assertRaises(ValueError, Content, None, None) + self.assertRaises(ValueError, Content, None, lambda: ["traceback"]) + self.assertRaises(ValueError, Content, + ContentType("text", "traceback"), None) + + def test___init___sets_ivars(self): + content_type = ContentType("foo", "bar") + content = Content(content_type, lambda: ["bytes"]) + self.assertEqual(content_type, content.content_type) + self.assertEqual(["bytes"], list(content.iter_bytes())) + + def test___eq__(self): + content_type = ContentType("foo", "bar") + content1 = Content(content_type, lambda: ["bytes"]) + content2 = Content(content_type, lambda: ["bytes"]) + content3 = Content(content_type, lambda: ["by", "tes"]) + content4 = Content(content_type, lambda: ["by", "te"]) + content5 = Content(ContentType("f", "b"), lambda: ["by", "tes"]) + self.assertEqual(content1, content2) + self.assertEqual(content1, content3) + self.assertNotEqual(content1, content4) + self.assertNotEqual(content1, content5) + + def test_iter_text_not_text_errors(self): + content_type = ContentType("foo", "bar") + content = Content(content_type, lambda: ["bytes"]) + self.assertRaises(ValueError, content.iter_text) + + def test_iter_text_decodes(self): + content_type = ContentType("text", "strange", {"charset": "utf8"}) + content = Content( + content_type, lambda: [_u("bytes\xea").encode("utf8")]) + self.assertEqual([_u("bytes\xea")], list(content.iter_text())) + + def test_iter_text_default_charset_iso_8859_1(self): + content_type = ContentType("text", "strange") + text = _u("bytes\xea") + iso_version = text.encode("ISO-8859-1") + content = Content(content_type, lambda: [iso_version]) + self.assertEqual([text], list(content.iter_text())) + + +class TestTracebackContent(unittest.TestCase): + + def test___init___None_errors(self): + self.assertRaises(ValueError, TracebackContent, None, None) + + def test___init___sets_ivars(self): + content = TracebackContent(an_exc_info, self) + content_type = ContentType("text", "x-traceback", + {"language": "python", "charset": "utf8"}) + self.assertEqual(content_type, content.content_type) + result = unittest.TestResult() + expected = result._exc_info_to_string(an_exc_info, self) + self.assertEqual(expected, ''.join(list(content.iter_text()))) diff --git a/lib/subunit/python/testtools/tests/test_content_type.py b/lib/subunit/python/testtools/tests/test_content_type.py new file mode 100644 index 0000000000..dbefc21dec --- /dev/null +++ b/lib/subunit/python/testtools/tests/test_content_type.py @@ -0,0 +1,34 @@ +# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details. + +import unittest +from testtools.content_type import ContentType + + +def test_suite(): + from unittest import TestLoader + return TestLoader().loadTestsFromName(__name__) + + +class TestContentType(unittest.TestCase): + + def test___init___None_errors(self): + self.assertRaises(ValueError, ContentType, None, None) + self.assertRaises(ValueError, ContentType, None, "traceback") + self.assertRaises(ValueError, ContentType, "text", None) + + def test___init___sets_ivars(self): + content_type = ContentType("foo", "bar") + self.assertEqual("foo", content_type.type) + self.assertEqual("bar", content_type.subtype) + self.assertEqual({}, content_type.parameters) + + def test___init___with_parameters(self): + content_type = ContentType("foo", "bar", {"quux":"thing"}) + self.assertEqual({"quux":"thing"}, content_type.parameters) + + def test___eq__(self): + content_type1 = ContentType("foo", "bar", {"quux":"thing"}) + content_type2 = ContentType("foo", "bar", {"quux":"thing"}) + content_type3 = ContentType("foo", "bar", {"quux":"thing2"}) + self.assertTrue(content_type1.__eq__(content_type2)) + self.assertFalse(content_type1.__eq__(content_type3)) diff --git a/lib/subunit/python/testtools/tests/test_matchers.py b/lib/subunit/python/testtools/tests/test_matchers.py new file mode 100644 index 0000000000..a9f4b245eb --- /dev/null +++ b/lib/subunit/python/testtools/tests/test_matchers.py @@ -0,0 +1,113 @@ +# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details. + +"""Tests for matchers.""" + +import doctest + +from testtools import ( + Matcher, # check that Matcher is exposed at the top level for docs. + TestCase, + ) +from testtools.matchers import ( + Equals, + DocTestMatches, + MatchesAny, + ) + + +class TestMatchersInterface: + + def test_matches_match(self): + matcher = self.matches_matcher + matches = self.matches_matches + mismatches = self.matches_mismatches + for candidate in matches: + self.assertEqual(None, matcher.match(candidate)) + for candidate in mismatches: + mismatch = matcher.match(candidate) + self.assertNotEqual(None, mismatch) + self.assertNotEqual(None, getattr(mismatch, 'describe', None)) + + def test__str__(self): + # [(expected, object to __str__)]. + examples = self.str_examples + for expected, matcher in examples: + self.assertThat(matcher, DocTestMatches(expected)) + + def test_describe_difference(self): + # [(expected, matchee, matcher), ...] + examples = self.describe_examples + for difference, matchee, matcher in examples: + mismatch = matcher.match(matchee) + self.assertEqual(difference, mismatch.describe()) + + +class TestDocTestMatchesInterface(TestCase, TestMatchersInterface): + + matches_matcher = DocTestMatches("Ran 1 test in ...s", doctest.ELLIPSIS) + matches_matches = ["Ran 1 test in 0.000s", "Ran 1 test in 1.234s"] + matches_mismatches = ["Ran 1 tests in 0.000s", "Ran 2 test in 0.000s"] + + str_examples = [("DocTestMatches('Ran 1 test in ...s\\n')", + DocTestMatches("Ran 1 test in ...s")), + ("DocTestMatches('foo\\n', flags=8)", DocTestMatches("foo", flags=8)), + ] + + describe_examples = [('Expected:\n Ran 1 tests in ...s\nGot:\n' + ' Ran 1 test in 0.123s\n', "Ran 1 test in 0.123s", + DocTestMatches("Ran 1 tests in ...s", doctest.ELLIPSIS))] + + +class TestDocTestMatchesSpecific(TestCase): + + def test___init__simple(self): + matcher = DocTestMatches("foo") + self.assertEqual("foo\n", matcher.want) + + def test___init__flags(self): + matcher = DocTestMatches("bar\n", doctest.ELLIPSIS) + self.assertEqual("bar\n", matcher.want) + self.assertEqual(doctest.ELLIPSIS, matcher.flags) + + +class TestEqualsInterface(TestCase, TestMatchersInterface): + + matches_matcher = Equals(1) + matches_matches = [1] + matches_mismatches = [2] + + str_examples = [("Equals(1)", Equals(1)), ("Equals('1')", Equals('1'))] + + describe_examples = [("1 != 2", 2, Equals(1))] + + +class TestMatchersAnyInterface(TestCase, TestMatchersInterface): + + matches_matcher = MatchesAny(DocTestMatches("1"), DocTestMatches("2")) + matches_matches = ["1", "2"] + matches_mismatches = ["3"] + + str_examples = [( + "MatchesAny(DocTestMatches('1\\n'), DocTestMatches('2\\n'))", + MatchesAny(DocTestMatches("1"), DocTestMatches("2"))), + ] + + describe_examples = [("""Differences: [ +Expected: + 1 +Got: + 3 + +Expected: + 2 +Got: + 3 + +] +""", + "3", MatchesAny(DocTestMatches("1"), DocTestMatches("2")))] + + +def test_suite(): + from unittest import TestLoader + return TestLoader().loadTestsFromName(__name__) diff --git a/lib/subunit/python/testtools/tests/test_runtest.py b/lib/subunit/python/testtools/tests/test_runtest.py new file mode 100644 index 0000000000..5c46ad1784 --- /dev/null +++ b/lib/subunit/python/testtools/tests/test_runtest.py @@ -0,0 +1,185 @@ +# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details. + +"""Tests for the RunTest single test execution logic.""" + +from testtools import ( + ExtendedToOriginalDecorator, + RunTest, + TestCase, + TestResult, + ) +from testtools.tests.helpers import ExtendedTestResult + + +class TestRunTest(TestCase): + + def make_case(self): + class Case(TestCase): + def test(self): + pass + return Case('test') + + def test___init___short(self): + run = RunTest("bar") + self.assertEqual("bar", run.case) + self.assertEqual([], run.handlers) + + def test__init____handlers(self): + handlers = [("quux", "baz")] + run = RunTest("bar", handlers) + self.assertEqual(handlers, run.handlers) + + def test_run_with_result(self): + # test.run passes result down to _run_test_method. + log = [] + class Case(TestCase): + def _run_test_method(self, result): + log.append(result) + case = Case('_run_test_method') + run = RunTest(case, lambda x: log.append(x)) + result = TestResult() + run.run(result) + self.assertEqual(1, len(log)) + self.assertEqual(result, log[0].decorated) + + def test_run_no_result_manages_new_result(self): + log = [] + run = RunTest(self.make_case(), lambda x: log.append(x) or x) + result = run.run() + self.assertIsInstance(result.decorated, TestResult) + + def test__run_core_called(self): + case = self.make_case() + log = [] + run = RunTest(case, lambda x: x) + run._run_core = lambda: log.append('foo') + run.run() + self.assertEqual(['foo'], log) + + def test__run_user_does_not_catch_keyboard(self): + case = self.make_case() + def raises(): + raise KeyboardInterrupt("yo") + run = RunTest(case, None) + run.result = ExtendedTestResult() + self.assertRaises(KeyboardInterrupt, run._run_user, raises) + self.assertEqual([], run.result._events) + + def test__run_user_calls_onException(self): + case = self.make_case() + log = [] + def handler(exc_info): + log.append("got it") + self.assertEqual(3, len(exc_info)) + self.assertIsInstance(exc_info[1], KeyError) + self.assertIs(KeyError, exc_info[0]) + case.addOnException(handler) + e = KeyError('Yo') + def raises(): + raise e + def log_exc(self, result, err): + log.append((result, err)) + run = RunTest(case, [(KeyError, log_exc)]) + run.result = ExtendedTestResult() + status = run._run_user(raises) + self.assertEqual(run.exception_caught, status) + self.assertEqual([], run.result._events) + self.assertEqual(["got it", (run.result, e)], log) + + def test__run_user_can_catch_Exception(self): + case = self.make_case() + e = Exception('Yo') + def raises(): + raise e + log = [] + def log_exc(self, result, err): + log.append((result, err)) + run = RunTest(case, [(Exception, log_exc)]) + run.result = ExtendedTestResult() + status = run._run_user(raises) + self.assertEqual(run.exception_caught, status) + self.assertEqual([], run.result._events) + self.assertEqual([(run.result, e)], log) + + def test__run_user_uncaught_Exception_raised(self): + case = self.make_case() + e = KeyError('Yo') + def raises(): + raise e + log = [] + def log_exc(self, result, err): + log.append((result, err)) + run = RunTest(case, [(ValueError, log_exc)]) + run.result = ExtendedTestResult() + self.assertRaises(KeyError, run._run_user, raises) + self.assertEqual([], run.result._events) + self.assertEqual([], log) + + def test__run_user_uncaught_Exception_from_exception_handler_raised(self): + case = self.make_case() + def broken_handler(exc_info): + # ValueError because thats what we know how to catch - and must + # not. + raise ValueError('boo') + case.addOnException(broken_handler) + e = KeyError('Yo') + def raises(): + raise e + log = [] + def log_exc(self, result, err): + log.append((result, err)) + run = RunTest(case, [(ValueError, log_exc)]) + run.result = ExtendedTestResult() + self.assertRaises(ValueError, run._run_user, raises) + self.assertEqual([], run.result._events) + self.assertEqual([], log) + + def test__run_user_returns_result(self): + case = self.make_case() + def returns(): + return 1 + run = RunTest(case) + run.result = ExtendedTestResult() + self.assertEqual(1, run._run_user(returns)) + self.assertEqual([], run.result._events) + + def test__run_one_decorates_result(self): + log = [] + class Run(RunTest): + def _run_prepared_result(self, result): + log.append(result) + return result + run = Run(self.make_case(), lambda x: x) + result = run._run_one('foo') + self.assertEqual([result], log) + self.assertIsInstance(log[0], ExtendedToOriginalDecorator) + self.assertEqual('foo', result.decorated) + + def test__run_prepared_result_calls_start_and_stop_test(self): + result = ExtendedTestResult() + case = self.make_case() + run = RunTest(case, lambda x: x) + run.run(result) + self.assertEqual([ + ('startTest', case), + ('addSuccess', case), + ('stopTest', case), + ], result._events) + + def test__run_prepared_result_calls_stop_test_always(self): + result = ExtendedTestResult() + case = self.make_case() + def inner(): + raise Exception("foo") + run = RunTest(case, lambda x: x) + run._run_core = inner + self.assertRaises(Exception, run.run, result) + self.assertEqual([ + ('startTest', case), + ('stopTest', case), + ], result._events) + + +def test_suite(): + from unittest import TestLoader + return TestLoader().loadTestsFromName(__name__) diff --git a/lib/subunit/python/testtools/tests/test_testresult.py b/lib/subunit/python/testtools/tests/test_testresult.py new file mode 100644 index 0000000000..df15b91244 --- /dev/null +++ b/lib/subunit/python/testtools/tests/test_testresult.py @@ -0,0 +1,807 @@ +# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details. + +"""Test TestResults and related things.""" + +__metaclass__ = type + +import datetime +try: + from cStringIO import StringIO +except ImportError: + from io import StringIO +import doctest +import sys +import threading + +from testtools import ( + ExtendedToOriginalDecorator, + MultiTestResult, + TestCase, + TestResult, + TextTestResult, + ThreadsafeForwardingResult, + testresult, + ) +from testtools.content import Content, ContentType +from testtools.matchers import DocTestMatches +from testtools.utils import _u, _b +from testtools.tests.helpers import ( + LoggingResult, + Python26TestResult, + Python27TestResult, + ExtendedTestResult, + an_exc_info + ) + + +class TestTestResultContract(TestCase): + """Tests for the contract of TestResults.""" + + def test_addExpectedFailure(self): + # Calling addExpectedFailure(test, exc_info) completes ok. + result = self.makeResult() + result.addExpectedFailure(self, an_exc_info) + + def test_addExpectedFailure_details(self): + # Calling addExpectedFailure(test, details=xxx) completes ok. + result = self.makeResult() + result.addExpectedFailure(self, details={}) + + def test_addError_details(self): + # Calling addError(test, details=xxx) completes ok. + result = self.makeResult() + result.addError(self, details={}) + + def test_addFailure_details(self): + # Calling addFailure(test, details=xxx) completes ok. + result = self.makeResult() + result.addFailure(self, details={}) + + def test_addSkipped(self): + # Calling addSkip(test, reason) completes ok. + result = self.makeResult() + result.addSkip(self, _u("Skipped for some reason")) + + def test_addSkipped_details(self): + # Calling addSkip(test, reason) completes ok. + result = self.makeResult() + result.addSkip(self, details={}) + + def test_addUnexpectedSuccess(self): + # Calling addUnexpectedSuccess(test) completes ok. + result = self.makeResult() + result.addUnexpectedSuccess(self) + + def test_addUnexpectedSuccess_details(self): + # Calling addUnexpectedSuccess(test) completes ok. + result = self.makeResult() + result.addUnexpectedSuccess(self, details={}) + + def test_addSuccess_details(self): + # Calling addSuccess(test) completes ok. + result = self.makeResult() + result.addSuccess(self, details={}) + + def test_startStopTestRun(self): + # Calling startTestRun completes ok. + result = self.makeResult() + result.startTestRun() + result.stopTestRun() + + +class TestTestResultContract(TestTestResultContract): + + def makeResult(self): + return TestResult() + + +class TestMultiTestresultContract(TestTestResultContract): + + def makeResult(self): + return MultiTestResult(TestResult(), TestResult()) + + +class TestTextTestResultContract(TestTestResultContract): + + def makeResult(self): + return TextTestResult(StringIO()) + + +class TestThreadSafeForwardingResultContract(TestTestResultContract): + + def makeResult(self): + result_semaphore = threading.Semaphore(1) + target = TestResult() + return ThreadsafeForwardingResult(target, result_semaphore) + + +class TestTestResult(TestCase): + """Tests for `TestResult`.""" + + def makeResult(self): + """Make an arbitrary result for testing.""" + return TestResult() + + def test_addSkipped(self): + # Calling addSkip on a TestResult records the test that was skipped in + # its skip_reasons dict. + result = self.makeResult() + result.addSkip(self, _u("Skipped for some reason")) + self.assertEqual({_u("Skipped for some reason"):[self]}, + result.skip_reasons) + result.addSkip(self, _u("Skipped for some reason")) + self.assertEqual({_u("Skipped for some reason"):[self, self]}, + result.skip_reasons) + result.addSkip(self, _u("Skipped for another reason")) + self.assertEqual({_u("Skipped for some reason"):[self, self], + _u("Skipped for another reason"):[self]}, + result.skip_reasons) + + def test_now_datetime_now(self): + result = self.makeResult() + olddatetime = testresult.real.datetime + def restore(): + testresult.real.datetime = olddatetime + self.addCleanup(restore) + class Module: + pass + now = datetime.datetime.now() + stubdatetime = Module() + stubdatetime.datetime = Module() + stubdatetime.datetime.now = lambda: now + testresult.real.datetime = stubdatetime + # Calling _now() looks up the time. + self.assertEqual(now, result._now()) + then = now + datetime.timedelta(0, 1) + # Set an explicit datetime, which gets returned from then on. + result.time(then) + self.assertNotEqual(now, result._now()) + self.assertEqual(then, result._now()) + # go back to looking it up. + result.time(None) + self.assertEqual(now, result._now()) + + def test_now_datetime_time(self): + result = self.makeResult() + now = datetime.datetime.now() + result.time(now) + self.assertEqual(now, result._now()) + + +class TestWithFakeExceptions(TestCase): + + def makeExceptionInfo(self, exceptionFactory, *args, **kwargs): + try: + raise exceptionFactory(*args, **kwargs) + except: + return sys.exc_info() + + +class TestMultiTestResult(TestWithFakeExceptions): + """Tests for `MultiTestResult`.""" + + def setUp(self): + TestWithFakeExceptions.setUp(self) + self.result1 = LoggingResult([]) + self.result2 = LoggingResult([]) + self.multiResult = MultiTestResult(self.result1, self.result2) + + def assertResultLogsEqual(self, expectedEvents): + """Assert that our test results have received the expected events.""" + self.assertEqual(expectedEvents, self.result1._events) + self.assertEqual(expectedEvents, self.result2._events) + + def test_empty(self): + # Initializing a `MultiTestResult` doesn't do anything to its + # `TestResult`s. + self.assertResultLogsEqual([]) + + def test_startTest(self): + # Calling `startTest` on a `MultiTestResult` calls `startTest` on all + # its `TestResult`s. + self.multiResult.startTest(self) + self.assertResultLogsEqual([('startTest', self)]) + + def test_stopTest(self): + # Calling `stopTest` on a `MultiTestResult` calls `stopTest` on all + # its `TestResult`s. + self.multiResult.stopTest(self) + self.assertResultLogsEqual([('stopTest', self)]) + + def test_addSkipped(self): + # Calling `addSkip` on a `MultiTestResult` calls addSkip on its + # results. + reason = _u("Skipped for some reason") + self.multiResult.addSkip(self, reason) + self.assertResultLogsEqual([('addSkip', self, reason)]) + + def test_addSuccess(self): + # Calling `addSuccess` on a `MultiTestResult` calls `addSuccess` on + # all its `TestResult`s. + self.multiResult.addSuccess(self) + self.assertResultLogsEqual([('addSuccess', self)]) + + def test_done(self): + # Calling `done` on a `MultiTestResult` calls `done` on all its + # `TestResult`s. + self.multiResult.done() + self.assertResultLogsEqual([('done')]) + + def test_addFailure(self): + # Calling `addFailure` on a `MultiTestResult` calls `addFailure` on + # all its `TestResult`s. + exc_info = self.makeExceptionInfo(AssertionError, 'failure') + self.multiResult.addFailure(self, exc_info) + self.assertResultLogsEqual([('addFailure', self, exc_info)]) + + def test_addError(self): + # Calling `addError` on a `MultiTestResult` calls `addError` on all + # its `TestResult`s. + exc_info = self.makeExceptionInfo(RuntimeError, 'error') + self.multiResult.addError(self, exc_info) + self.assertResultLogsEqual([('addError', self, exc_info)]) + + def test_startTestRun(self): + # Calling `startTestRun` on a `MultiTestResult` forwards to all its + # `TestResult`s. + self.multiResult.startTestRun() + self.assertResultLogsEqual([('startTestRun')]) + + def test_stopTestRun(self): + # Calling `stopTestRun` on a `MultiTestResult` forwards to all its + # `TestResult`s. + self.multiResult.stopTestRun() + self.assertResultLogsEqual([('stopTestRun')]) + + +class TestTextTestResult(TestWithFakeExceptions): + """Tests for `TextTestResult`.""" + + def setUp(self): + super(TestTextTestResult, self).setUp() + self.result = TextTestResult(StringIO()) + + def make_erroring_test(self): + class Test(TestCase): + def error(self): + 1/0 + return Test("error") + + def make_failing_test(self): + class Test(TestCase): + def failed(self): + self.fail("yo!") + return Test("failed") + + def make_test(self): + class Test(TestCase): + def test(self): + pass + return Test("test") + + def getvalue(self): + return self.result.stream.getvalue() + + def test__init_sets_stream(self): + result = TextTestResult("fp") + self.assertEqual("fp", result.stream) + + def reset_output(self): + self.result.stream = StringIO() + + def test_startTestRun(self): + self.result.startTestRun() + self.assertEqual("Tests running...\n", self.getvalue()) + + def test_stopTestRun_count_many(self): + test = self.make_test() + self.result.startTestRun() + self.result.startTest(test) + self.result.stopTest(test) + self.result.startTest(test) + self.result.stopTest(test) + self.result.stream = StringIO() + self.result.stopTestRun() + self.assertThat(self.getvalue(), + DocTestMatches("Ran 2 tests in ...s\n...", doctest.ELLIPSIS)) + + def test_stopTestRun_count_single(self): + test = self.make_test() + self.result.startTestRun() + self.result.startTest(test) + self.result.stopTest(test) + self.reset_output() + self.result.stopTestRun() + self.assertThat(self.getvalue(), + DocTestMatches("Ran 1 test in ...s\n\nOK\n", doctest.ELLIPSIS)) + + def test_stopTestRun_count_zero(self): + self.result.startTestRun() + self.reset_output() + self.result.stopTestRun() + self.assertThat(self.getvalue(), + DocTestMatches("Ran 0 tests in ...s\n\nOK\n", doctest.ELLIPSIS)) + + def test_stopTestRun_current_time(self): + test = self.make_test() + now = datetime.datetime.now() + self.result.time(now) + self.result.startTestRun() + self.result.startTest(test) + now = now + datetime.timedelta(0, 0, 0, 1) + self.result.time(now) + self.result.stopTest(test) + self.reset_output() + self.result.stopTestRun() + self.assertThat(self.getvalue(), + DocTestMatches("... in 0.001s\n...", doctest.ELLIPSIS)) + + def test_stopTestRun_successful(self): + self.result.startTestRun() + self.result.stopTestRun() + self.assertThat(self.getvalue(), + DocTestMatches("...\n\nOK\n", doctest.ELLIPSIS)) + + def test_stopTestRun_not_successful_failure(self): + test = self.make_failing_test() + self.result.startTestRun() + test.run(self.result) + self.result.stopTestRun() + self.assertThat(self.getvalue(), + DocTestMatches("...\n\nFAILED (failures=1)\n", doctest.ELLIPSIS)) + + def test_stopTestRun_not_successful_error(self): + test = self.make_erroring_test() + self.result.startTestRun() + test.run(self.result) + self.result.stopTestRun() + self.assertThat(self.getvalue(), + DocTestMatches("...\n\nFAILED (failures=1)\n", doctest.ELLIPSIS)) + + def test_stopTestRun_shows_details(self): + self.result.startTestRun() + self.make_erroring_test().run(self.result) + self.make_failing_test().run(self.result) + self.reset_output() + self.result.stopTestRun() + self.assertThat(self.getvalue(), + DocTestMatches("""...====================================================================== +ERROR: testtools.tests.test_testresult.Test.error +---------------------------------------------------------------------- +Text attachment: traceback +------------ +Traceback (most recent call last): + File "...testtools...runtest.py", line ..., in _run_user... + return fn(*args) + File "...testtools...testcase.py", line ..., in _run_test_method + testMethod() + File "...testtools...tests...test_testresult.py", line ..., in error + 1/0 +ZeroDivisionError: int... division or modulo by zero +------------ +====================================================================== +FAIL: testtools.tests.test_testresult.Test.failed +---------------------------------------------------------------------- +Text attachment: traceback +------------ +Traceback (most recent call last): + File "...testtools...runtest.py", line ..., in _run_user... + return fn(*args) + File "...testtools...testcase.py", line ..., in _run_test_method + testMethod() + File "...testtools...tests...test_testresult.py", line ..., in failed + self.fail("yo!") +AssertionError: yo! +------------ +...""", doctest.ELLIPSIS)) + + +class TestThreadSafeForwardingResult(TestWithFakeExceptions): + """Tests for `MultiTestResult`.""" + + def setUp(self): + TestWithFakeExceptions.setUp(self) + self.result_semaphore = threading.Semaphore(1) + self.target = LoggingResult([]) + self.result1 = ThreadsafeForwardingResult(self.target, + self.result_semaphore) + + def test_nonforwarding_methods(self): + # startTest and stopTest are not forwarded because they need to be + # batched. + self.result1.startTest(self) + self.result1.stopTest(self) + self.assertEqual([], self.target._events) + + def test_startTestRun(self): + self.result1.startTestRun() + self.result2 = ThreadsafeForwardingResult(self.target, + self.result_semaphore) + self.result2.startTestRun() + self.assertEqual(["startTestRun", "startTestRun"], self.target._events) + + def test_stopTestRun(self): + self.result1.stopTestRun() + self.result2 = ThreadsafeForwardingResult(self.target, + self.result_semaphore) + self.result2.stopTestRun() + self.assertEqual(["stopTestRun", "stopTestRun"], self.target._events) + + def test_forwarding_methods(self): + # error, failure, skip and success are forwarded in batches. + exc_info1 = self.makeExceptionInfo(RuntimeError, 'error') + self.result1.addError(self, exc_info1) + exc_info2 = self.makeExceptionInfo(AssertionError, 'failure') + self.result1.addFailure(self, exc_info2) + reason = _u("Skipped for some reason") + self.result1.addSkip(self, reason) + self.result1.addSuccess(self) + self.assertEqual([('startTest', self), + ('addError', self, exc_info1), + ('stopTest', self), + ('startTest', self), + ('addFailure', self, exc_info2), + ('stopTest', self), + ('startTest', self), + ('addSkip', self, reason), + ('stopTest', self), + ('startTest', self), + ('addSuccess', self), + ('stopTest', self), + ], self.target._events) + + +class TestExtendedToOriginalResultDecoratorBase(TestCase): + + def make_26_result(self): + self.result = Python26TestResult() + self.make_converter() + + def make_27_result(self): + self.result = Python27TestResult() + self.make_converter() + + def make_converter(self): + self.converter = ExtendedToOriginalDecorator(self.result) + + def make_extended_result(self): + self.result = ExtendedTestResult() + self.make_converter() + + def check_outcome_details(self, outcome): + """Call an outcome with a details dict to be passed through.""" + # This dict is /not/ convertible - thats deliberate, as it should + # not hit the conversion code path. + details = {'foo': 'bar'} + getattr(self.converter, outcome)(self, details=details) + self.assertEqual([(outcome, self, details)], self.result._events) + + def get_details_and_string(self): + """Get a details dict and expected string.""" + text1 = lambda: [_b("1\n2\n")] + text2 = lambda: [_b("3\n4\n")] + bin1 = lambda: [_b("5\n")] + details = {'text 1': Content(ContentType('text', 'plain'), text1), + 'text 2': Content(ContentType('text', 'strange'), text2), + 'bin 1': Content(ContentType('application', 'binary'), bin1)} + return (details, "Binary content: bin 1\n" + "Text attachment: text 1\n------------\n1\n2\n" + "------------\nText attachment: text 2\n------------\n" + "3\n4\n------------\n") + + def check_outcome_details_to_exec_info(self, outcome, expected=None): + """Call an outcome with a details dict to be made into exc_info.""" + # The conversion is a done using RemoteError and the string contents + # of the text types in the details dict. + if not expected: + expected = outcome + details, err_str = self.get_details_and_string() + getattr(self.converter, outcome)(self, details=details) + err = self.converter._details_to_exc_info(details) + self.assertEqual([(expected, self, err)], self.result._events) + + def check_outcome_details_to_nothing(self, outcome, expected=None): + """Call an outcome with a details dict to be swallowed.""" + if not expected: + expected = outcome + details = {'foo': 'bar'} + getattr(self.converter, outcome)(self, details=details) + self.assertEqual([(expected, self)], self.result._events) + + def check_outcome_details_to_string(self, outcome): + """Call an outcome with a details dict to be stringified.""" + details, err_str = self.get_details_and_string() + getattr(self.converter, outcome)(self, details=details) + self.assertEqual([(outcome, self, err_str)], self.result._events) + + def check_outcome_exc_info(self, outcome, expected=None): + """Check that calling a legacy outcome still works.""" + # calling some outcome with the legacy exc_info style api (no keyword + # parameters) gets passed through. + if not expected: + expected = outcome + err = sys.exc_info() + getattr(self.converter, outcome)(self, err) + self.assertEqual([(expected, self, err)], self.result._events) + + def check_outcome_exc_info_to_nothing(self, outcome, expected=None): + """Check that calling a legacy outcome on a fallback works.""" + # calling some outcome with the legacy exc_info style api (no keyword + # parameters) gets passed through. + if not expected: + expected = outcome + err = sys.exc_info() + getattr(self.converter, outcome)(self, err) + self.assertEqual([(expected, self)], self.result._events) + + def check_outcome_nothing(self, outcome, expected=None): + """Check that calling a legacy outcome still works.""" + if not expected: + expected = outcome + getattr(self.converter, outcome)(self) + self.assertEqual([(expected, self)], self.result._events) + + def check_outcome_string_nothing(self, outcome, expected): + """Check that calling outcome with a string calls expected.""" + getattr(self.converter, outcome)(self, "foo") + self.assertEqual([(expected, self)], self.result._events) + + def check_outcome_string(self, outcome): + """Check that calling outcome with a string works.""" + getattr(self.converter, outcome)(self, "foo") + self.assertEqual([(outcome, self, "foo")], self.result._events) + + +class TestExtendedToOriginalResultDecorator( + TestExtendedToOriginalResultDecoratorBase): + + def test_progress_py26(self): + self.make_26_result() + self.converter.progress(1, 2) + + def test_progress_py27(self): + self.make_27_result() + self.converter.progress(1, 2) + + def test_progress_pyextended(self): + self.make_extended_result() + self.converter.progress(1, 2) + self.assertEqual([('progress', 1, 2)], self.result._events) + + def test_shouldStop(self): + self.make_26_result() + self.assertEqual(False, self.converter.shouldStop) + self.converter.decorated.stop() + self.assertEqual(True, self.converter.shouldStop) + + def test_startTest_py26(self): + self.make_26_result() + self.converter.startTest(self) + self.assertEqual([('startTest', self)], self.result._events) + + def test_startTest_py27(self): + self.make_27_result() + self.converter.startTest(self) + self.assertEqual([('startTest', self)], self.result._events) + + def test_startTest_pyextended(self): + self.make_extended_result() + self.converter.startTest(self) + self.assertEqual([('startTest', self)], self.result._events) + + def test_startTestRun_py26(self): + self.make_26_result() + self.converter.startTestRun() + self.assertEqual([], self.result._events) + + def test_startTestRun_py27(self): + self.make_27_result() + self.converter.startTestRun() + self.assertEqual([('startTestRun',)], self.result._events) + + def test_startTestRun_pyextended(self): + self.make_extended_result() + self.converter.startTestRun() + self.assertEqual([('startTestRun',)], self.result._events) + + def test_stopTest_py26(self): + self.make_26_result() + self.converter.stopTest(self) + self.assertEqual([('stopTest', self)], self.result._events) + + def test_stopTest_py27(self): + self.make_27_result() + self.converter.stopTest(self) + self.assertEqual([('stopTest', self)], self.result._events) + + def test_stopTest_pyextended(self): + self.make_extended_result() + self.converter.stopTest(self) + self.assertEqual([('stopTest', self)], self.result._events) + + def test_stopTestRun_py26(self): + self.make_26_result() + self.converter.stopTestRun() + self.assertEqual([], self.result._events) + + def test_stopTestRun_py27(self): + self.make_27_result() + self.converter.stopTestRun() + self.assertEqual([('stopTestRun',)], self.result._events) + + def test_stopTestRun_pyextended(self): + self.make_extended_result() + self.converter.stopTestRun() + self.assertEqual([('stopTestRun',)], self.result._events) + + def test_tags_py26(self): + self.make_26_result() + self.converter.tags(1, 2) + + def test_tags_py27(self): + self.make_27_result() + self.converter.tags(1, 2) + + def test_tags_pyextended(self): + self.make_extended_result() + self.converter.tags(1, 2) + self.assertEqual([('tags', 1, 2)], self.result._events) + + def test_time_py26(self): + self.make_26_result() + self.converter.time(1) + + def test_time_py27(self): + self.make_27_result() + self.converter.time(1) + + def test_time_pyextended(self): + self.make_extended_result() + self.converter.time(1) + self.assertEqual([('time', 1)], self.result._events) + + +class TestExtendedToOriginalAddError(TestExtendedToOriginalResultDecoratorBase): + + outcome = 'addError' + + def test_outcome_Original_py26(self): + self.make_26_result() + self.check_outcome_exc_info(self.outcome) + + def test_outcome_Original_py27(self): + self.make_27_result() + self.check_outcome_exc_info(self.outcome) + + def test_outcome_Original_pyextended(self): + self.make_extended_result() + self.check_outcome_exc_info(self.outcome) + + def test_outcome_Extended_py26(self): + self.make_26_result() + self.check_outcome_details_to_exec_info(self.outcome) + + def test_outcome_Extended_py27(self): + self.make_27_result() + self.check_outcome_details_to_exec_info(self.outcome) + + def test_outcome_Extended_pyextended(self): + self.make_extended_result() + self.check_outcome_details(self.outcome) + + def test_outcome__no_details(self): + self.make_extended_result() + self.assertRaises(ValueError, + getattr(self.converter, self.outcome), self) + + +class TestExtendedToOriginalAddFailure( + TestExtendedToOriginalAddError): + + outcome = 'addFailure' + + +class TestExtendedToOriginalAddExpectedFailure( + TestExtendedToOriginalAddError): + + outcome = 'addExpectedFailure' + + def test_outcome_Original_py26(self): + self.make_26_result() + self.check_outcome_exc_info_to_nothing(self.outcome, 'addSuccess') + + def test_outcome_Extended_py26(self): + self.make_26_result() + self.check_outcome_details_to_nothing(self.outcome, 'addSuccess') + + + +class TestExtendedToOriginalAddSkip( + TestExtendedToOriginalResultDecoratorBase): + + outcome = 'addSkip' + + def test_outcome_Original_py26(self): + self.make_26_result() + self.check_outcome_string_nothing(self.outcome, 'addSuccess') + + def test_outcome_Original_py27(self): + self.make_27_result() + self.check_outcome_string(self.outcome) + + def test_outcome_Original_pyextended(self): + self.make_extended_result() + self.check_outcome_string(self.outcome) + + def test_outcome_Extended_py26(self): + self.make_26_result() + self.check_outcome_string_nothing(self.outcome, 'addSuccess') + + def test_outcome_Extended_py27(self): + self.make_27_result() + self.check_outcome_details_to_string(self.outcome) + + def test_outcome_Extended_pyextended(self): + self.make_extended_result() + self.check_outcome_details(self.outcome) + + def test_outcome__no_details(self): + self.make_extended_result() + self.assertRaises(ValueError, + getattr(self.converter, self.outcome), self) + + +class TestExtendedToOriginalAddSuccess( + TestExtendedToOriginalResultDecoratorBase): + + outcome = 'addSuccess' + expected = 'addSuccess' + + def test_outcome_Original_py26(self): + self.make_26_result() + self.check_outcome_nothing(self.outcome, self.expected) + + def test_outcome_Original_py27(self): + self.make_27_result() + self.check_outcome_nothing(self.outcome) + + def test_outcome_Original_pyextended(self): + self.make_extended_result() + self.check_outcome_nothing(self.outcome) + + def test_outcome_Extended_py26(self): + self.make_26_result() + self.check_outcome_details_to_nothing(self.outcome, self.expected) + + def test_outcome_Extended_py27(self): + self.make_27_result() + self.check_outcome_details_to_nothing(self.outcome) + + def test_outcome_Extended_pyextended(self): + self.make_extended_result() + self.check_outcome_details(self.outcome) + + +class TestExtendedToOriginalAddUnexpectedSuccess( + TestExtendedToOriginalAddSuccess): + + outcome = 'addUnexpectedSuccess' + + +class TestExtendedToOriginalResultOtherAttributes( + TestExtendedToOriginalResultDecoratorBase): + + def test_other_attribute(self): + class OtherExtendedResult: + def foo(self): + return 2 + bar = 1 + self.result = OtherExtendedResult() + self.make_converter() + self.assertEqual(1, self.converter.bar) + self.assertEqual(2, self.converter.foo()) + + +def test_suite(): + from unittest import TestLoader + return TestLoader().loadTestsFromName(__name__) diff --git a/lib/subunit/python/testtools/tests/test_testsuite.py b/lib/subunit/python/testtools/tests/test_testsuite.py new file mode 100644 index 0000000000..3f2f02758f --- /dev/null +++ b/lib/subunit/python/testtools/tests/test_testsuite.py @@ -0,0 +1,56 @@ +# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details. + +"""Test ConcurrentTestSuite and related things.""" + +__metaclass__ = type + +import unittest + +from testtools import ( + ConcurrentTestSuite, + iterate_tests, + TestCase, + ) +from testtools.matchers import ( + Equals, + ) +from testtools.tests.helpers import LoggingResult + + +class TestConcurrentTestSuiteRun(TestCase): + + def test_trivial(self): + log = [] + result = LoggingResult(log) + class Sample(TestCase): + def __hash__(self): + return id(self) + + def test_method1(self): + pass + def test_method2(self): + pass + test1 = Sample('test_method1') + test2 = Sample('test_method2') + original_suite = unittest.TestSuite([test1, test2]) + suite = ConcurrentTestSuite(original_suite, self.split_suite) + suite.run(result) + test1 = log[0][1] + test2 = log[-1][1] + self.assertIsInstance(test1, Sample) + self.assertIsInstance(test2, Sample) + self.assertNotEqual(test1.id(), test2.id()) + # We expect the start/outcome/stop to be grouped + expected = [('startTest', test1), ('addSuccess', test1), + ('stopTest', test1), ('startTest', test2), ('addSuccess', test2), + ('stopTest', test2)] + self.assertThat(log, Equals(expected)) + + def split_suite(self, suite): + tests = list(iterate_tests(suite)) + return tests[0], tests[1] + + +def test_suite(): + from unittest import TestLoader + return TestLoader().loadTestsFromName(__name__) diff --git a/lib/subunit/python/testtools/tests/test_testtools.py b/lib/subunit/python/testtools/tests/test_testtools.py new file mode 100644 index 0000000000..8cd90de6fe --- /dev/null +++ b/lib/subunit/python/testtools/tests/test_testtools.py @@ -0,0 +1,743 @@ +# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details. + +"""Tests for extensions to the base test library.""" + +import sys +import unittest + +from testtools import ( + TestCase, + clone_test_with_new_id, + content, + skip, + skipIf, + skipUnless, + testcase, + ) +from testtools.matchers import ( + Equals, + ) +from testtools.tests.helpers import ( + an_exc_info, + LoggingResult, + Python26TestResult, + Python27TestResult, + ExtendedTestResult, + ) + + +class TestEquality(TestCase): + """Test `TestCase`'s equality implementation.""" + + def test_identicalIsEqual(self): + # TestCase's are equal if they are identical. + self.assertEqual(self, self) + + def test_nonIdenticalInUnequal(self): + # TestCase's are not equal if they are not identical. + self.assertNotEqual(TestCase(methodName='run'), + TestCase(methodName='skip')) + + +class TestAssertions(TestCase): + """Test assertions in TestCase.""" + + def raiseError(self, exceptionFactory, *args, **kwargs): + raise exceptionFactory(*args, **kwargs) + + def test_formatTypes_single(self): + # Given a single class, _formatTypes returns the name. + class Foo: + pass + self.assertEqual('Foo', self._formatTypes(Foo)) + + def test_formatTypes_multiple(self): + # Given multiple types, _formatTypes returns the names joined by + # commas. + class Foo: + pass + class Bar: + pass + self.assertEqual('Foo, Bar', self._formatTypes([Foo, Bar])) + + def test_assertRaises(self): + # assertRaises asserts that a callable raises a particular exception. + self.assertRaises(RuntimeError, self.raiseError, RuntimeError) + + def test_assertRaises_fails_when_no_error_raised(self): + # assertRaises raises self.failureException when it's passed a + # callable that raises no error. + ret = ('orange', 42) + try: + self.assertRaises(RuntimeError, lambda: ret) + except self.failureException: + # We expected assertRaises to raise this exception. + e = sys.exc_info()[1] + self.assertEqual( + '%s not raised, %r returned instead.' + % (self._formatTypes(RuntimeError), ret), str(e)) + else: + self.fail('Expected assertRaises to fail, but it did not.') + + def test_assertRaises_fails_when_different_error_raised(self): + # assertRaises re-raises an exception that it didn't expect. + self.assertRaises( + ZeroDivisionError, + self.assertRaises, + RuntimeError, self.raiseError, ZeroDivisionError) + + def test_assertRaises_returns_the_raised_exception(self): + # assertRaises returns the exception object that was raised. This is + # useful for testing that exceptions have the right message. + + # This contraption stores the raised exception, so we can compare it + # to the return value of assertRaises. + raisedExceptions = [] + def raiseError(): + try: + raise RuntimeError('Deliberate error') + except RuntimeError: + raisedExceptions.append(sys.exc_info()[1]) + raise + + exception = self.assertRaises(RuntimeError, raiseError) + self.assertEqual(1, len(raisedExceptions)) + self.assertTrue( + exception is raisedExceptions[0], + "%r is not %r" % (exception, raisedExceptions[0])) + + def test_assertRaises_with_multiple_exceptions(self): + # assertRaises((ExceptionOne, ExceptionTwo), function) asserts that + # function raises one of ExceptionTwo or ExceptionOne. + expectedExceptions = (RuntimeError, ZeroDivisionError) + self.assertRaises( + expectedExceptions, self.raiseError, expectedExceptions[0]) + self.assertRaises( + expectedExceptions, self.raiseError, expectedExceptions[1]) + + def test_assertRaises_with_multiple_exceptions_failure_mode(self): + # If assertRaises is called expecting one of a group of exceptions and + # a callable that doesn't raise an exception, then fail with an + # appropriate error message. + expectedExceptions = (RuntimeError, ZeroDivisionError) + failure = self.assertRaises( + self.failureException, + self.assertRaises, expectedExceptions, lambda: None) + self.assertEqual( + '%s not raised, None returned instead.' + % self._formatTypes(expectedExceptions), str(failure)) + + def assertFails(self, message, function, *args, **kwargs): + """Assert that function raises a failure with the given message.""" + failure = self.assertRaises( + self.failureException, function, *args, **kwargs) + self.assertEqual(message, str(failure)) + + def test_assertIn_success(self): + # assertIn(needle, haystack) asserts that 'needle' is in 'haystack'. + self.assertIn(3, range(10)) + self.assertIn('foo', 'foo bar baz') + self.assertIn('foo', 'foo bar baz'.split()) + + def test_assertIn_failure(self): + # assertIn(needle, haystack) fails the test when 'needle' is not in + # 'haystack'. + self.assertFails('3 not in [0, 1, 2]', self.assertIn, 3, [0, 1, 2]) + self.assertFails( + '%r not in %r' % ('qux', 'foo bar baz'), + self.assertIn, 'qux', 'foo bar baz') + + def test_assertNotIn_success(self): + # assertNotIn(needle, haystack) asserts that 'needle' is not in + # 'haystack'. + self.assertNotIn(3, [0, 1, 2]) + self.assertNotIn('qux', 'foo bar baz') + + def test_assertNotIn_failure(self): + # assertNotIn(needle, haystack) fails the test when 'needle' is in + # 'haystack'. + self.assertFails('3 in [1, 2, 3]', self.assertNotIn, 3, [1, 2, 3]) + self.assertFails( + '%r in %r' % ('foo', 'foo bar baz'), + self.assertNotIn, 'foo', 'foo bar baz') + + def test_assertIsInstance(self): + # assertIsInstance asserts that an object is an instance of a class. + + class Foo: + """Simple class for testing assertIsInstance.""" + + foo = Foo() + self.assertIsInstance(foo, Foo) + + def test_assertIsInstance_multiple_classes(self): + # assertIsInstance asserts that an object is an instance of one of a + # group of classes. + + class Foo: + """Simple class for testing assertIsInstance.""" + + class Bar: + """Another simple class for testing assertIsInstance.""" + + foo = Foo() + self.assertIsInstance(foo, (Foo, Bar)) + self.assertIsInstance(Bar(), (Foo, Bar)) + + def test_assertIsInstance_failure(self): + # assertIsInstance(obj, klass) fails the test when obj is not an + # instance of klass. + + class Foo: + """Simple class for testing assertIsInstance.""" + + self.assertFails( + '42 is not an instance of %s' % self._formatTypes(Foo), + self.assertIsInstance, 42, Foo) + + def test_assertIsInstance_failure_multiple_classes(self): + # assertIsInstance(obj, (klass1, klass2)) fails the test when obj is + # not an instance of klass1 or klass2. + + class Foo: + """Simple class for testing assertIsInstance.""" + + class Bar: + """Another simple class for testing assertIsInstance.""" + + self.assertFails( + '42 is not an instance of %s' % self._formatTypes([Foo, Bar]), + self.assertIsInstance, 42, (Foo, Bar)) + + def test_assertIs(self): + # assertIs asserts that an object is identical to another object. + self.assertIs(None, None) + some_list = [42] + self.assertIs(some_list, some_list) + some_object = object() + self.assertIs(some_object, some_object) + + def test_assertIs_fails(self): + # assertIs raises assertion errors if one object is not identical to + # another. + self.assertFails('None is not 42', self.assertIs, None, 42) + self.assertFails('[42] is not [42]', self.assertIs, [42], [42]) + + def test_assertIsNot(self): + # assertIsNot asserts that an object is not identical to another + # object. + self.assertIsNot(None, 42) + self.assertIsNot([42], [42]) + self.assertIsNot(object(), object()) + + def test_assertIsNot_fails(self): + # assertIsNot raises assertion errors if one object is identical to + # another. + self.assertFails('None is None', self.assertIsNot, None, None) + some_list = [42] + self.assertFails( + '[42] is [42]', self.assertIsNot, some_list, some_list) + + def test_assertThat_matches_clean(self): + class Matcher: + def match(self, foo): + return None + self.assertThat("foo", Matcher()) + + def test_assertThat_mismatch_raises_description(self): + calls = [] + class Mismatch: + def __init__(self, thing): + self.thing = thing + def describe(self): + calls.append(('describe_diff', self.thing)) + return "object is not a thing" + class Matcher: + def match(self, thing): + calls.append(('match', thing)) + return Mismatch(thing) + def __str__(self): + calls.append(('__str__',)) + return "a description" + class Test(TestCase): + def test(self): + self.assertThat("foo", Matcher()) + result = Test("test").run() + self.assertEqual([ + ('match', "foo"), + ('describe_diff', "foo"), + ('__str__',), + ], calls) + self.assertFalse(result.wasSuccessful()) + + +class TestAddCleanup(TestCase): + """Tests for TestCase.addCleanup.""" + + class LoggingTest(TestCase): + """A test that logs calls to setUp, runTest and tearDown.""" + + def setUp(self): + TestCase.setUp(self) + self._calls = ['setUp'] + + def brokenSetUp(self): + # A tearDown that deliberately fails. + self._calls = ['brokenSetUp'] + raise RuntimeError('Deliberate Failure') + + def runTest(self): + self._calls.append('runTest') + + def tearDown(self): + self._calls.append('tearDown') + TestCase.tearDown(self) + + def setUp(self): + TestCase.setUp(self) + self._result_calls = [] + self.test = TestAddCleanup.LoggingTest('runTest') + self.logging_result = LoggingResult(self._result_calls) + + def assertErrorLogEqual(self, messages): + self.assertEqual(messages, [call[0] for call in self._result_calls]) + + def assertTestLogEqual(self, messages): + """Assert that the call log equals `messages`.""" + case = self._result_calls[0][1] + self.assertEqual(messages, case._calls) + + def logAppender(self, message): + """A cleanup that appends `message` to the tests log. + + Cleanups are callables that are added to a test by addCleanup. To + verify that our cleanups run in the right order, we add strings to a + list that acts as a log. This method returns a cleanup that will add + the given message to that log when run. + """ + self.test._calls.append(message) + + def test_fixture(self): + # A normal run of self.test logs 'setUp', 'runTest' and 'tearDown'. + # This test doesn't test addCleanup itself, it just sanity checks the + # fixture. + self.test.run(self.logging_result) + self.assertTestLogEqual(['setUp', 'runTest', 'tearDown']) + + def test_cleanup_run_before_tearDown(self): + # Cleanup functions added with 'addCleanup' are called before tearDown + # runs. + self.test.addCleanup(self.logAppender, 'cleanup') + self.test.run(self.logging_result) + self.assertTestLogEqual(['setUp', 'runTest', 'tearDown', 'cleanup']) + + def test_add_cleanup_called_if_setUp_fails(self): + # Cleanup functions added with 'addCleanup' are called even if setUp + # fails. Note that tearDown has a different behavior: it is only + # called when setUp succeeds. + self.test.setUp = self.test.brokenSetUp + self.test.addCleanup(self.logAppender, 'cleanup') + self.test.run(self.logging_result) + self.assertTestLogEqual(['brokenSetUp', 'cleanup']) + + def test_addCleanup_called_in_reverse_order(self): + # Cleanup functions added with 'addCleanup' are called in reverse + # order. + # + # One of the main uses of addCleanup is to dynamically create + # resources that need some sort of explicit tearDown. Often one + # resource will be created in terms of another, e.g., + # self.first = self.makeFirst() + # self.second = self.makeSecond(self.first) + # + # When this happens, we generally want to clean up the second resource + # before the first one, since the second depends on the first. + self.test.addCleanup(self.logAppender, 'first') + self.test.addCleanup(self.logAppender, 'second') + self.test.run(self.logging_result) + self.assertTestLogEqual( + ['setUp', 'runTest', 'tearDown', 'second', 'first']) + + def test_tearDown_runs_after_cleanup_failure(self): + # tearDown runs even if a cleanup function fails. + self.test.addCleanup(lambda: 1/0) + self.test.run(self.logging_result) + self.assertTestLogEqual(['setUp', 'runTest', 'tearDown']) + + def test_cleanups_continue_running_after_error(self): + # All cleanups are always run, even if one or two of them fail. + self.test.addCleanup(self.logAppender, 'first') + self.test.addCleanup(lambda: 1/0) + self.test.addCleanup(self.logAppender, 'second') + self.test.run(self.logging_result) + self.assertTestLogEqual( + ['setUp', 'runTest', 'tearDown', 'second', 'first']) + + def test_error_in_cleanups_are_captured(self): + # If a cleanup raises an error, we want to record it and fail the the + # test, even though we go on to run other cleanups. + self.test.addCleanup(lambda: 1/0) + self.test.run(self.logging_result) + self.assertErrorLogEqual(['startTest', 'addError', 'stopTest']) + + def test_keyboard_interrupt_not_caught(self): + # If a cleanup raises KeyboardInterrupt, it gets reraised. + def raiseKeyboardInterrupt(): + raise KeyboardInterrupt() + self.test.addCleanup(raiseKeyboardInterrupt) + self.assertRaises( + KeyboardInterrupt, self.test.run, self.logging_result) + + def test_multipleErrorsReported(self): + # Errors from all failing cleanups are reported. + self.test.addCleanup(lambda: 1/0) + self.test.addCleanup(lambda: 1/0) + self.test.run(self.logging_result) + self.assertErrorLogEqual( + ['startTest', 'addError', 'addError', 'stopTest']) + + +class TestWithDetails(TestCase): + + def assertDetailsProvided(self, case, expected_outcome, expected_keys): + """Assert that when case is run, details are provided to the result. + + :param case: A TestCase to run. + :param expected_outcome: The call that should be made. + :param expected_keys: The keys to look for. + """ + result = ExtendedTestResult() + case.run(result) + case = result._events[0][1] + expected = [ + ('startTest', case), + (expected_outcome, case), + ('stopTest', case), + ] + self.assertEqual(3, len(result._events)) + self.assertEqual(expected[0], result._events[0]) + self.assertEqual(expected[1], result._events[1][0:2]) + # Checking the TB is right is rather tricky. doctest line matching + # would help, but 'meh'. + self.assertEqual(sorted(expected_keys), + sorted(result._events[1][2].keys())) + self.assertEqual(expected[-1], result._events[-1]) + + def get_content(self): + return content.Content( + content.ContentType("text", "foo"), lambda: ['foo']) + + +class TestExpectedFailure(TestWithDetails): + """Tests for expected failures and unexpected successess.""" + + def make_unexpected_case(self): + class Case(TestCase): + def test(self): + raise testcase._UnexpectedSuccess + case = Case('test') + return case + + def test_raising__UnexpectedSuccess_py27(self): + case = self.make_unexpected_case() + result = Python27TestResult() + case.run(result) + case = result._events[0][1] + self.assertEqual([ + ('startTest', case), + ('addUnexpectedSuccess', case), + ('stopTest', case), + ], result._events) + + def test_raising__UnexpectedSuccess_extended(self): + case = self.make_unexpected_case() + result = ExtendedTestResult() + case.run(result) + case = result._events[0][1] + self.assertEqual([ + ('startTest', case), + ('addUnexpectedSuccess', case, {}), + ('stopTest', case), + ], result._events) + + def make_xfail_case_xfails(self): + content = self.get_content() + class Case(TestCase): + def test(self): + self.addDetail("foo", content) + self.expectFailure("we are sad", self.assertEqual, + 1, 0) + case = Case('test') + return case + + def make_xfail_case_succeeds(self): + content = self.get_content() + class Case(TestCase): + def test(self): + self.addDetail("foo", content) + self.expectFailure("we are sad", self.assertEqual, + 1, 1) + case = Case('test') + return case + + def test_expectFailure_KnownFailure_extended(self): + case = self.make_xfail_case_xfails() + self.assertDetailsProvided(case, "addExpectedFailure", + ["foo", "traceback", "reason"]) + + def test_expectFailure_KnownFailure_unexpected_success(self): + case = self.make_xfail_case_succeeds() + self.assertDetailsProvided(case, "addUnexpectedSuccess", + ["foo", "reason"]) + + +class TestUniqueFactories(TestCase): + """Tests for getUniqueString and getUniqueInteger.""" + + def test_getUniqueInteger(self): + # getUniqueInteger returns an integer that increments each time you + # call it. + one = self.getUniqueInteger() + self.assertEqual(1, one) + two = self.getUniqueInteger() + self.assertEqual(2, two) + + def test_getUniqueString(self): + # getUniqueString returns the current test id followed by a unique + # integer. + name_one = self.getUniqueString() + self.assertEqual('%s-%d' % (self.id(), 1), name_one) + name_two = self.getUniqueString() + self.assertEqual('%s-%d' % (self.id(), 2), name_two) + + def test_getUniqueString_prefix(self): + # If getUniqueString is given an argument, it uses that argument as + # the prefix of the unique string, rather than the test id. + name_one = self.getUniqueString('foo') + self.assertThat(name_one, Equals('foo-1')) + name_two = self.getUniqueString('bar') + self.assertThat(name_two, Equals('bar-2')) + + +class TestCloneTestWithNewId(TestCase): + """Tests for clone_test_with_new_id.""" + + def test_clone_test_with_new_id(self): + class FooTestCase(TestCase): + def test_foo(self): + pass + test = FooTestCase('test_foo') + oldName = test.id() + newName = self.getUniqueString() + newTest = clone_test_with_new_id(test, newName) + self.assertEqual(newName, newTest.id()) + self.assertEqual(oldName, test.id(), + "the original test instance should be unchanged.") + + +class TestDetailsProvided(TestWithDetails): + + def test_addDetail(self): + mycontent = self.get_content() + self.addDetail("foo", mycontent) + details = self.getDetails() + self.assertEqual({"foo": mycontent}, details) + + def test_addError(self): + class Case(TestCase): + def test(this): + this.addDetail("foo", self.get_content()) + 1/0 + self.assertDetailsProvided(Case("test"), "addError", + ["foo", "traceback"]) + + def test_addFailure(self): + class Case(TestCase): + def test(this): + this.addDetail("foo", self.get_content()) + self.fail('yo') + self.assertDetailsProvided(Case("test"), "addFailure", + ["foo", "traceback"]) + + def test_addSkip(self): + class Case(TestCase): + def test(this): + this.addDetail("foo", self.get_content()) + self.skip('yo') + self.assertDetailsProvided(Case("test"), "addSkip", + ["foo", "reason"]) + + def test_addSucccess(self): + class Case(TestCase): + def test(this): + this.addDetail("foo", self.get_content()) + self.assertDetailsProvided(Case("test"), "addSuccess", + ["foo"]) + + def test_addUnexpectedSuccess(self): + class Case(TestCase): + def test(this): + this.addDetail("foo", self.get_content()) + raise testcase._UnexpectedSuccess() + self.assertDetailsProvided(Case("test"), "addUnexpectedSuccess", + ["foo"]) + + +class TestSetupTearDown(TestCase): + + def test_setUpNotCalled(self): + class DoesnotcallsetUp(TestCase): + def setUp(self): + pass + def test_method(self): + pass + result = unittest.TestResult() + DoesnotcallsetUp('test_method').run(result) + self.assertEqual(1, len(result.errors)) + + def test_tearDownNotCalled(self): + class DoesnotcalltearDown(TestCase): + def test_method(self): + pass + def tearDown(self): + pass + result = unittest.TestResult() + DoesnotcalltearDown('test_method').run(result) + self.assertEqual(1, len(result.errors)) + + +class TestSkipping(TestCase): + """Tests for skipping of tests functionality.""" + + def test_skip_causes_skipException(self): + self.assertRaises(self.skipException, self.skip, "Skip this test") + + def test_skip_without_reason_works(self): + class Test(TestCase): + def test(self): + raise self.skipException() + case = Test("test") + result = ExtendedTestResult() + case.run(result) + self.assertEqual('addSkip', result._events[1][0]) + self.assertEqual('no reason given.', + ''.join(result._events[1][2]['reason'].iter_text())) + + def test_skipException_in_setup_calls_result_addSkip(self): + class TestThatRaisesInSetUp(TestCase): + def setUp(self): + TestCase.setUp(self) + self.skip("skipping this test") + def test_that_passes(self): + pass + calls = [] + result = LoggingResult(calls) + test = TestThatRaisesInSetUp("test_that_passes") + test.run(result) + case = result._events[0][1] + self.assertEqual([('startTest', case), + ('addSkip', case, "Text attachment: reason\n------------\n" + "skipping this test\n------------\n"), ('stopTest', case)], + calls) + + def test_skipException_in_test_method_calls_result_addSkip(self): + class SkippingTest(TestCase): + def test_that_raises_skipException(self): + self.skip("skipping this test") + result = Python27TestResult() + test = SkippingTest("test_that_raises_skipException") + test.run(result) + case = result._events[0][1] + self.assertEqual([('startTest', case), + ('addSkip', case, "Text attachment: reason\n------------\n" + "skipping this test\n------------\n"), ('stopTest', case)], + result._events) + + def test_skip__in_setup_with_old_result_object_calls_addSuccess(self): + class SkippingTest(TestCase): + def setUp(self): + TestCase.setUp(self) + raise self.skipException("skipping this test") + def test_that_raises_skipException(self): + pass + result = Python26TestResult() + test = SkippingTest("test_that_raises_skipException") + test.run(result) + self.assertEqual('addSuccess', result._events[1][0]) + + def test_skip_with_old_result_object_calls_addError(self): + class SkippingTest(TestCase): + def test_that_raises_skipException(self): + raise self.skipException("skipping this test") + result = Python26TestResult() + test = SkippingTest("test_that_raises_skipException") + test.run(result) + self.assertEqual('addSuccess', result._events[1][0]) + + def test_skip_decorator(self): + class SkippingTest(TestCase): + @skip("skipping this test") + def test_that_is_decorated_with_skip(self): + self.fail() + result = Python26TestResult() + test = SkippingTest("test_that_is_decorated_with_skip") + test.run(result) + self.assertEqual('addSuccess', result._events[1][0]) + + def test_skipIf_decorator(self): + class SkippingTest(TestCase): + @skipIf(True, "skipping this test") + def test_that_is_decorated_with_skipIf(self): + self.fail() + result = Python26TestResult() + test = SkippingTest("test_that_is_decorated_with_skipIf") + test.run(result) + self.assertEqual('addSuccess', result._events[1][0]) + + def test_skipUnless_decorator(self): + class SkippingTest(TestCase): + @skipUnless(False, "skipping this test") + def test_that_is_decorated_with_skipUnless(self): + self.fail() + result = Python26TestResult() + test = SkippingTest("test_that_is_decorated_with_skipUnless") + test.run(result) + self.assertEqual('addSuccess', result._events[1][0]) + + +class TestOnException(TestCase): + + def test_default_works(self): + events = [] + class Case(TestCase): + def method(self): + self.onException(an_exc_info) + events.append(True) + case = Case("method") + case.run() + self.assertThat(events, Equals([True])) + + def test_added_handler_works(self): + events = [] + class Case(TestCase): + def method(self): + self.addOnException(events.append) + self.onException(an_exc_info) + case = Case("method") + case.run() + self.assertThat(events, Equals([an_exc_info])) + + def test_handler_that_raises_is_not_caught(self): + events = [] + class Case(TestCase): + def method(self): + self.addOnException(events.index) + self.assertRaises(ValueError, self.onException, an_exc_info) + case = Case("method") + case.run() + self.assertThat(events, Equals([])) + + +def test_suite(): + from unittest import TestLoader + return TestLoader().loadTestsFromName(__name__) diff --git a/lib/subunit/python/testtools/testsuite.py b/lib/subunit/python/testtools/testsuite.py new file mode 100644 index 0000000000..26b193799b --- /dev/null +++ b/lib/subunit/python/testtools/testsuite.py @@ -0,0 +1,74 @@ +# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details. + +"""Test suites and related things.""" + +__metaclass__ = type +__all__ = [ + 'ConcurrentTestSuite', + ] + +try: + import Queue +except ImportError: + import queue as Queue +import threading +import unittest + +import testtools + + +class ConcurrentTestSuite(unittest.TestSuite): + """A TestSuite whose run() calls out to a concurrency strategy.""" + + def __init__(self, suite, make_tests): + """Create a ConcurrentTestSuite to execute suite. + + :param suite: A suite to run concurrently. + :param make_tests: A helper function to split the tests in the + ConcurrentTestSuite into some number of concurrently executing + sub-suites. make_tests must take a suite, and return an iterable + of TestCase-like object, each of which must have a run(result) + method. + """ + super(ConcurrentTestSuite, self).__init__([suite]) + self.make_tests = make_tests + + def run(self, result): + """Run the tests concurrently. + + This calls out to the provided make_tests helper, and then serialises + the results so that result only sees activity from one TestCase at + a time. + + ConcurrentTestSuite provides no special mechanism to stop the tests + returned by make_tests, it is up to the make_tests to honour the + shouldStop attribute on the result object they are run with, which will + be set if an exception is raised in the thread which + ConcurrentTestSuite.run is called in. + """ + tests = self.make_tests(self) + try: + threads = {} + queue = Queue.Queue() + result_semaphore = threading.Semaphore(1) + for test in tests: + process_result = testtools.ThreadsafeForwardingResult(result, + result_semaphore) + reader_thread = threading.Thread( + target=self._run_test, args=(test, process_result, queue)) + threads[test] = reader_thread, process_result + reader_thread.start() + while threads: + finished_test = queue.get() + threads[finished_test][0].join() + del threads[finished_test] + except: + for thread, process_result in threads.values(): + process_result.stop() + raise + + def _run_test(self, test, process_result, queue): + try: + test.run(process_result) + finally: + queue.put(test) diff --git a/lib/subunit/python/testtools/utils.py b/lib/subunit/python/testtools/utils.py new file mode 100644 index 0000000000..325572297b --- /dev/null +++ b/lib/subunit/python/testtools/utils.py @@ -0,0 +1,39 @@ +# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details. + +"""Utilities for dealing with stuff in unittest.""" + + +import sys + +__metaclass__ = type +__all__ = [ + 'iterate_tests', + ] + + +if sys.version_info > (3, 0): + def _u(s): + """Replacement for u'some string' in Python 3.""" + return s + def _b(s): + """A byte literal.""" + return s.encode("latin-1") + advance_iterator = next +else: + def _u(s): + return unicode(s, "latin-1") + def _b(s): + return s + advance_iterator = lambda it: it.next() + + +def iterate_tests(test_suite_or_case): + """Iterate through all of the test cases in `test_suite_or_case`.""" + try: + suite = iter(test_suite_or_case) + except TypeError: + yield test_suite_or_case + else: + for test in suite: + for subtest in iterate_tests(test): + yield subtest |