summaryrefslogtreecommitdiff
path: root/lib/testtools/testtools/testresult/real.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/testtools/testtools/testresult/real.py')
-rw-r--r--lib/testtools/testtools/testresult/real.py348
1 files changed, 282 insertions, 66 deletions
diff --git a/lib/testtools/testtools/testresult/real.py b/lib/testtools/testtools/testresult/real.py
index a627f0900e..cf3ecf4fc8 100644
--- a/lib/testtools/testtools/testresult/real.py
+++ b/lib/testtools/testtools/testresult/real.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2008 testtools developers. See LICENSE for details.
+# Copyright (c) 2008-2012 testtools developers. See LICENSE for details.
"""Test results and related things."""
@@ -6,16 +6,22 @@ __metaclass__ = type
__all__ = [
'ExtendedToOriginalDecorator',
'MultiTestResult',
+ 'Tagger',
'TestResult',
+ 'TestResultDecorator',
'ThreadsafeForwardingResult',
]
import datetime
import sys
-import traceback
import unittest
-from testtools.compat import all, _format_exc_info, str_is_unicode, _u
+from testtools.compat import all, str_is_unicode, _u
+from testtools.content import (
+ text_content,
+ TracebackContent,
+ )
+from testtools.tags import TagContext
# From http://docs.python.org/library/datetime.html
_ZERO = datetime.timedelta(0)
@@ -36,9 +42,6 @@ class UTC(datetime.tzinfo):
utc = UTC()
-STDOUT_LINE = '\nStdout:\n%s'
-STDERR_LINE = '\nStderr:\n%s'
-
class TestResult(unittest.TestResult):
"""Subclass of unittest.TestResult extending the protocol for flexability.
@@ -118,7 +121,7 @@ class TestResult(unittest.TestResult):
if reason is None:
reason = 'No reason given'
else:
- reason = ''.join(reason.iter_text())
+ reason = reason.as_text()
skip_list = self.skip_reasons.setdefault(reason, [])
skip_list.append(test)
@@ -141,50 +144,17 @@ class TestResult(unittest.TestResult):
"""
return not (self.errors or self.failures or self.unexpectedSuccesses)
- def _exc_info_to_unicode(self, err, test):
- """Converts a sys.exc_info()-style tuple of values into a string.
-
- Copied from Python 2.7's unittest.TestResult._exc_info_to_string.
- """
- exctype, value, tb = err
- # Skip test runner traceback levels
- while tb and self._is_relevant_tb_level(tb):
- tb = tb.tb_next
-
- # testtools customization. When str is unicode (e.g. IronPython,
- # Python 3), traceback.format_exception returns unicode. For Python 2,
- # it returns bytes. We need to guarantee unicode.
- if str_is_unicode:
- format_exception = traceback.format_exception
- else:
- format_exception = _format_exc_info
-
- if test.failureException and isinstance(value, test.failureException):
- # Skip assert*() traceback levels
- length = self._count_relevant_tb_levels(tb)
- msgLines = format_exception(exctype, value, tb, length)
- else:
- msgLines = format_exception(exctype, value, tb)
-
- if getattr(self, 'buffer', None):
- output = sys.stdout.getvalue()
- error = sys.stderr.getvalue()
- if output:
- if not output.endswith('\n'):
- output += '\n'
- msgLines.append(STDOUT_LINE % output)
- if error:
- if not error.endswith('\n'):
- error += '\n'
- msgLines.append(STDERR_LINE % error)
- return ''.join(msgLines)
-
def _err_details_to_string(self, test, err=None, details=None):
"""Convert an error in exc_info form or a contents dict to a string."""
if err is not None:
- return self._exc_info_to_unicode(err, test)
+ return TracebackContent(err, test).as_text()
return _details_to_str(details, special='traceback')
+ def _exc_info_to_unicode(self, err, test):
+ # Deprecated. Only present because subunit upcalls to it. See
+ # <https://bugs.launchpad.net/testtools/+bug/929063>.
+ return TracebackContent(err, test).as_text()
+
def _now(self):
"""Return the current 'test time'.
@@ -207,6 +177,7 @@ class TestResult(unittest.TestResult):
super(TestResult, self).__init__()
self.skip_reasons = {}
self.__now = None
+ self._tags = TagContext()
# -- Start: As per python 2.7 --
self.expectedFailures = []
self.unexpectedSuccesses = []
@@ -218,6 +189,27 @@ class TestResult(unittest.TestResult):
New in python 2.7
"""
+ def startTest(self, test):
+ super(TestResult, self).startTest(test)
+ self._tags = TagContext(self._tags)
+
+ def stopTest(self, test):
+ self._tags = self._tags.parent
+ super(TestResult, self).stopTest(test)
+
+ @property
+ def current_tags(self):
+ """The currently set tags."""
+ return self._tags.get_current_tags()
+
+ def tags(self, new_tags, gone_tags):
+ """Add and remove tags from the test.
+
+ :param new_tags: A set of tags to be added to the stream.
+ :param gone_tags: A set of tags to be removed from the stream.
+ """
+ self._tags.change_tags(new_tags, gone_tags)
+
def time(self, a_datetime):
"""Provide a timestamp to represent the current time.
@@ -244,7 +236,7 @@ class MultiTestResult(TestResult):
"""A test result that dispatches to many test results."""
def __init__(self, *results):
- TestResult.__init__(self)
+ super(MultiTestResult, self).__init__()
self._results = list(map(ExtendedToOriginalDecorator, results))
def __repr__(self):
@@ -257,9 +249,11 @@ class MultiTestResult(TestResult):
for result in self._results)
def startTest(self, test):
+ super(MultiTestResult, self).startTest(test)
return self._dispatch('startTest', test)
def stopTest(self, test):
+ super(MultiTestResult, self).stopTest(test)
return self._dispatch('stopTest', test)
def addError(self, test, error=None, details=None):
@@ -282,11 +276,16 @@ class MultiTestResult(TestResult):
return self._dispatch('addUnexpectedSuccess', test, details=details)
def startTestRun(self):
+ super(MultiTestResult, self).startTestRun()
return self._dispatch('startTestRun')
def stopTestRun(self):
return self._dispatch('stopTestRun')
+ def tags(self, new_tags, gone_tags):
+ super(MultiTestResult, self).tags(new_tags, gone_tags)
+ return self._dispatch('tags', new_tags, gone_tags)
+
def time(self, a_datetime):
return self._dispatch('time', a_datetime)
@@ -358,46 +357,67 @@ class TextTestResult(TestResult):
class ThreadsafeForwardingResult(TestResult):
"""A TestResult which ensures the target does not receive mixed up calls.
- This is used when receiving test results from multiple sources, and batches
- up all the activity for a single test into a thread-safe batch where all
- other ThreadsafeForwardingResult objects sharing the same semaphore will be
- locked out.
+ Multiple ``ThreadsafeForwardingResults`` can forward to the same target
+ result, and that target result will only ever receive the complete set of
+ events for one test at a time.
+
+ This is enforced using a semaphore, which further guarantees that tests
+ will be sent atomically even if the ``ThreadsafeForwardingResults`` are in
+ different threads.
- Typical use of ThreadsafeForwardingResult involves creating one
- ThreadsafeForwardingResult per thread in a ConcurrentTestSuite. These
- forward to the TestResult that the ConcurrentTestSuite run method was
- called with.
+ ``ThreadsafeForwardingResult`` is typically used by
+ ``ConcurrentTestSuite``, which creates one ``ThreadsafeForwardingResult``
+ per thread, each of which wraps of the TestResult that
+ ``ConcurrentTestSuite.run()`` is called with.
- target.done() is called once for each ThreadsafeForwardingResult that
- forwards to the same target. If the target's done() takes special action,
- care should be taken to accommodate this.
+ target.startTestRun() and target.stopTestRun() are called once for each
+ ThreadsafeForwardingResult that forwards to the same target. If the target
+ takes special action on these events, it should take care to accommodate
+ this.
+
+ time() and tags() calls are batched to be adjacent to the test result and
+ in the case of tags() are coerced into test-local scope, avoiding the
+ opportunity for bugs around global state in the target.
"""
def __init__(self, target, semaphore):
"""Create a ThreadsafeForwardingResult forwarding to target.
- :param target: A TestResult.
- :param semaphore: A threading.Semaphore with limit 1.
+ :param target: A ``TestResult``.
+ :param semaphore: A ``threading.Semaphore`` with limit 1.
"""
TestResult.__init__(self)
self.result = ExtendedToOriginalDecorator(target)
self.semaphore = semaphore
+ self._test_start = None
+ self._global_tags = set(), set()
+ self._test_tags = set(), set()
def __repr__(self):
return '<%s %r>' % (self.__class__.__name__, self.result)
+ def _any_tags(self, tags):
+ return bool(tags[0] or tags[1])
+
def _add_result_with_semaphore(self, method, test, *args, **kwargs):
+ now = self._now()
self.semaphore.acquire()
try:
self.result.time(self._test_start)
self.result.startTest(test)
- self.result.time(self._now())
+ self.result.time(now)
+ if self._any_tags(self._global_tags):
+ self.result.tags(*self._global_tags)
+ if self._any_tags(self._test_tags):
+ self.result.tags(*self._test_tags)
+ self._test_tags = set(), set()
try:
method(test, *args, **kwargs)
finally:
self.result.stopTest(test)
finally:
self.semaphore.release()
+ self._test_start = None
def addError(self, test, err=None, details=None):
self._add_result_with_semaphore(self.result.addError,
@@ -424,6 +444,7 @@ class ThreadsafeForwardingResult(TestResult):
test, details=details)
def startTestRun(self):
+ super(ThreadsafeForwardingResult, self).startTestRun()
self.semaphore.acquire()
try:
self.result.startTestRun()
@@ -451,6 +472,27 @@ class ThreadsafeForwardingResult(TestResult):
def wasSuccessful(self):
return self.result.wasSuccessful()
+ def tags(self, new_tags, gone_tags):
+ """See `TestResult`."""
+ super(ThreadsafeForwardingResult, self).tags(new_tags, gone_tags)
+ if self._test_start is not None:
+ self._test_tags = _merge_tags(
+ self._test_tags, (new_tags, gone_tags))
+ else:
+ self._global_tags = _merge_tags(
+ self._global_tags, (new_tags, gone_tags))
+
+
+def _merge_tags(existing, changed):
+ new_tags, gone_tags = changed
+ result_new = set(existing[0])
+ result_gone = set(existing[1])
+ result_new.update(new_tags)
+ result_new.difference_update(gone_tags)
+ result_gone.update(gone_tags)
+ result_gone.difference_update(new_tags)
+ return result_new, result_gone
+
class ExtendedToOriginalDecorator(object):
"""Permit new TestResult API code to degrade gracefully with old results.
@@ -464,6 +506,7 @@ class ExtendedToOriginalDecorator(object):
def __init__(self, decorated):
self.decorated = decorated
+ self._tags = TagContext()
def __repr__(self):
return '<%s %r>' % (self.__class__.__name__, self.decorated)
@@ -516,7 +559,7 @@ class ExtendedToOriginalDecorator(object):
except TypeError:
# extract the reason if it's available
try:
- reason = ''.join(details['reason'].iter_text())
+ reason = details['reason'].as_text()
except KeyError:
reason = _details_to_str(details)
return addSkip(test, reason)
@@ -560,6 +603,11 @@ class ExtendedToOriginalDecorator(object):
_StringException(_details_to_str(details, special='traceback')),
None)
+ @property
+ def current_tags(self):
+ return getattr(
+ self.decorated, 'current_tags', self._tags.get_current_tags())
+
def done(self):
try:
return self.decorated.done()
@@ -577,9 +625,11 @@ class ExtendedToOriginalDecorator(object):
return self.decorated.shouldStop
def startTest(self, test):
+ self._tags = TagContext(self._tags)
return self.decorated.startTest(test)
def startTestRun(self):
+ self._tags = TagContext()
try:
return self.decorated.startTestRun()
except AttributeError:
@@ -589,6 +639,7 @@ class ExtendedToOriginalDecorator(object):
return self.decorated.stop()
def stopTest(self, test):
+ self._tags = self._tags.parent
return self.decorated.stopTest(test)
def stopTestRun(self):
@@ -599,9 +650,10 @@ class ExtendedToOriginalDecorator(object):
def tags(self, new_tags, gone_tags):
method = getattr(self.decorated, 'tags', None)
- if method is None:
- return
- return method(new_tags, gone_tags)
+ if method is not None:
+ return method(new_tags, gone_tags)
+ else:
+ self._tags.change_tags(new_tags, gone_tags)
def time(self, a_datetime):
method = getattr(self.decorated, 'time', None)
@@ -613,6 +665,170 @@ class ExtendedToOriginalDecorator(object):
return self.decorated.wasSuccessful()
+class TestResultDecorator(object):
+ """General pass-through decorator.
+
+ This provides a base that other TestResults can inherit from to
+ gain basic forwarding functionality.
+ """
+
+ def __init__(self, decorated):
+ """Create a TestResultDecorator forwarding to decorated."""
+ self.decorated = decorated
+
+ def startTest(self, test):
+ return self.decorated.startTest(test)
+
+ def startTestRun(self):
+ return self.decorated.startTestRun()
+
+ def stopTest(self, test):
+ return self.decorated.stopTest(test)
+
+ def stopTestRun(self):
+ return self.decorated.stopTestRun()
+
+ def addError(self, test, err=None, details=None):
+ return self.decorated.addError(test, err, details=details)
+
+ def addFailure(self, test, err=None, details=None):
+ return self.decorated.addFailure(test, err, details=details)
+
+ def addSuccess(self, test, details=None):
+ return self.decorated.addSuccess(test, details=details)
+
+ def addSkip(self, test, reason=None, details=None):
+ return self.decorated.addSkip(test, reason, details=details)
+
+ def addExpectedFailure(self, test, err=None, details=None):
+ return self.decorated.addExpectedFailure(test, err, details=details)
+
+ def addUnexpectedSuccess(self, test, details=None):
+ return self.decorated.addUnexpectedSuccess(test, details=details)
+
+ def progress(self, offset, whence):
+ return self.decorated.progress(offset, whence)
+
+ def wasSuccessful(self):
+ return self.decorated.wasSuccessful()
+
+ @property
+ def current_tags(self):
+ return self.decorated.current_tags
+
+ @property
+ def shouldStop(self):
+ return self.decorated.shouldStop
+
+ def stop(self):
+ return self.decorated.stop()
+
+ @property
+ def testsRun(self):
+ return self.decorated.testsRun
+
+ def tags(self, new_tags, gone_tags):
+ return self.decorated.tags(new_tags, gone_tags)
+
+ def time(self, a_datetime):
+ return self.decorated.time(a_datetime)
+
+
+class Tagger(TestResultDecorator):
+ """Tag each test individually."""
+
+ def __init__(self, decorated, new_tags, gone_tags):
+ """Wrap 'decorated' such that each test is tagged.
+
+ :param new_tags: Tags to be added for each test.
+ :param gone_tags: Tags to be removed for each test.
+ """
+ super(Tagger, self).__init__(decorated)
+ self._new_tags = set(new_tags)
+ self._gone_tags = set(gone_tags)
+
+ def startTest(self, test):
+ super(Tagger, self).startTest(test)
+ self.tags(self._new_tags, self._gone_tags)
+
+
+class TestByTestResult(TestResult):
+ """Call something every time a test completes."""
+
+ def __init__(self, on_test):
+ """Construct a ``TestByTestResult``.
+
+ :param on_test: A callable that take a test case, a status (one of
+ "success", "failure", "error", "skip", or "xfail"), a start time
+ (a ``datetime`` with timezone), a stop time, an iterable of tags,
+ and a details dict. Is called at the end of each test (i.e. on
+ ``stopTest``) with the accumulated values for that test.
+ """
+ super(TestByTestResult, self).__init__()
+ self._on_test = on_test
+
+ def startTest(self, test):
+ super(TestByTestResult, self).startTest(test)
+ self._start_time = self._now()
+ # There's no supported (i.e. tested) behaviour that relies on these
+ # being set, but it makes me more comfortable all the same. -- jml
+ self._status = None
+ self._details = None
+ self._stop_time = None
+
+ def stopTest(self, test):
+ self._stop_time = self._now()
+ tags = set(self.current_tags)
+ super(TestByTestResult, self).stopTest(test)
+ self._on_test(
+ test=test,
+ status=self._status,
+ start_time=self._start_time,
+ stop_time=self._stop_time,
+ tags=tags,
+ details=self._details)
+
+ def _err_to_details(self, test, err, details):
+ if details:
+ return details
+ return {'traceback': TracebackContent(err, test)}
+
+ def addSuccess(self, test, details=None):
+ super(TestByTestResult, self).addSuccess(test)
+ self._status = 'success'
+ self._details = details
+
+ def addFailure(self, test, err=None, details=None):
+ super(TestByTestResult, self).addFailure(test, err, details)
+ self._status = 'failure'
+ self._details = self._err_to_details(test, err, details)
+
+ def addError(self, test, err=None, details=None):
+ super(TestByTestResult, self).addError(test, err, details)
+ self._status = 'error'
+ self._details = self._err_to_details(test, err, details)
+
+ def addSkip(self, test, reason=None, details=None):
+ super(TestByTestResult, self).addSkip(test, reason, details)
+ self._status = 'skip'
+ if details is None:
+ details = {'reason': text_content(reason)}
+ elif reason:
+ # XXX: What if details already has 'reason' key?
+ details['reason'] = text_content(reason)
+ self._details = details
+
+ def addExpectedFailure(self, test, err=None, details=None):
+ super(TestByTestResult, self).addExpectedFailure(test, err, details)
+ self._status = 'xfail'
+ self._details = self._err_to_details(test, err, details)
+
+ def addUnexpectedSuccess(self, test, details=None):
+ super(TestByTestResult, self).addUnexpectedSuccess(test, details)
+ self._status = 'success'
+ self._details = details
+
+
class _StringException(Exception):
"""An exception made from an arbitrary string."""
@@ -665,7 +881,7 @@ def _details_to_str(details, special=None):
if content.content_type.type != 'text':
binary_attachments.append((key, content.content_type))
continue
- text = _u('').join(content.iter_text()).strip()
+ text = content.as_text().strip()
if not text:
empty_attachments.append(key)
continue