diff options
Diffstat (limited to 'lib/testtools/testtools/testresult/real.py')
-rw-r--r-- | lib/testtools/testtools/testresult/real.py | 348 |
1 files changed, 282 insertions, 66 deletions
diff --git a/lib/testtools/testtools/testresult/real.py b/lib/testtools/testtools/testresult/real.py index a627f0900e..cf3ecf4fc8 100644 --- a/lib/testtools/testtools/testresult/real.py +++ b/lib/testtools/testtools/testresult/real.py @@ -1,4 +1,4 @@ -# Copyright (c) 2008 testtools developers. See LICENSE for details. +# Copyright (c) 2008-2012 testtools developers. See LICENSE for details. """Test results and related things.""" @@ -6,16 +6,22 @@ __metaclass__ = type __all__ = [ 'ExtendedToOriginalDecorator', 'MultiTestResult', + 'Tagger', 'TestResult', + 'TestResultDecorator', 'ThreadsafeForwardingResult', ] import datetime import sys -import traceback import unittest -from testtools.compat import all, _format_exc_info, str_is_unicode, _u +from testtools.compat import all, str_is_unicode, _u +from testtools.content import ( + text_content, + TracebackContent, + ) +from testtools.tags import TagContext # From http://docs.python.org/library/datetime.html _ZERO = datetime.timedelta(0) @@ -36,9 +42,6 @@ class UTC(datetime.tzinfo): utc = UTC() -STDOUT_LINE = '\nStdout:\n%s' -STDERR_LINE = '\nStderr:\n%s' - class TestResult(unittest.TestResult): """Subclass of unittest.TestResult extending the protocol for flexability. @@ -118,7 +121,7 @@ class TestResult(unittest.TestResult): if reason is None: reason = 'No reason given' else: - reason = ''.join(reason.iter_text()) + reason = reason.as_text() skip_list = self.skip_reasons.setdefault(reason, []) skip_list.append(test) @@ -141,50 +144,17 @@ class TestResult(unittest.TestResult): """ return not (self.errors or self.failures or self.unexpectedSuccesses) - def _exc_info_to_unicode(self, err, test): - """Converts a sys.exc_info()-style tuple of values into a string. - - Copied from Python 2.7's unittest.TestResult._exc_info_to_string. - """ - exctype, value, tb = err - # Skip test runner traceback levels - while tb and self._is_relevant_tb_level(tb): - tb = tb.tb_next - - # testtools customization. When str is unicode (e.g. IronPython, - # Python 3), traceback.format_exception returns unicode. For Python 2, - # it returns bytes. We need to guarantee unicode. - if str_is_unicode: - format_exception = traceback.format_exception - else: - format_exception = _format_exc_info - - if test.failureException and isinstance(value, test.failureException): - # Skip assert*() traceback levels - length = self._count_relevant_tb_levels(tb) - msgLines = format_exception(exctype, value, tb, length) - else: - msgLines = format_exception(exctype, value, tb) - - if getattr(self, 'buffer', None): - output = sys.stdout.getvalue() - error = sys.stderr.getvalue() - if output: - if not output.endswith('\n'): - output += '\n' - msgLines.append(STDOUT_LINE % output) - if error: - if not error.endswith('\n'): - error += '\n' - msgLines.append(STDERR_LINE % error) - return ''.join(msgLines) - def _err_details_to_string(self, test, err=None, details=None): """Convert an error in exc_info form or a contents dict to a string.""" if err is not None: - return self._exc_info_to_unicode(err, test) + return TracebackContent(err, test).as_text() return _details_to_str(details, special='traceback') + def _exc_info_to_unicode(self, err, test): + # Deprecated. Only present because subunit upcalls to it. See + # <https://bugs.launchpad.net/testtools/+bug/929063>. + return TracebackContent(err, test).as_text() + def _now(self): """Return the current 'test time'. @@ -207,6 +177,7 @@ class TestResult(unittest.TestResult): super(TestResult, self).__init__() self.skip_reasons = {} self.__now = None + self._tags = TagContext() # -- Start: As per python 2.7 -- self.expectedFailures = [] self.unexpectedSuccesses = [] @@ -218,6 +189,27 @@ class TestResult(unittest.TestResult): New in python 2.7 """ + def startTest(self, test): + super(TestResult, self).startTest(test) + self._tags = TagContext(self._tags) + + def stopTest(self, test): + self._tags = self._tags.parent + super(TestResult, self).stopTest(test) + + @property + def current_tags(self): + """The currently set tags.""" + return self._tags.get_current_tags() + + def tags(self, new_tags, gone_tags): + """Add and remove tags from the test. + + :param new_tags: A set of tags to be added to the stream. + :param gone_tags: A set of tags to be removed from the stream. + """ + self._tags.change_tags(new_tags, gone_tags) + def time(self, a_datetime): """Provide a timestamp to represent the current time. @@ -244,7 +236,7 @@ class MultiTestResult(TestResult): """A test result that dispatches to many test results.""" def __init__(self, *results): - TestResult.__init__(self) + super(MultiTestResult, self).__init__() self._results = list(map(ExtendedToOriginalDecorator, results)) def __repr__(self): @@ -257,9 +249,11 @@ class MultiTestResult(TestResult): for result in self._results) def startTest(self, test): + super(MultiTestResult, self).startTest(test) return self._dispatch('startTest', test) def stopTest(self, test): + super(MultiTestResult, self).stopTest(test) return self._dispatch('stopTest', test) def addError(self, test, error=None, details=None): @@ -282,11 +276,16 @@ class MultiTestResult(TestResult): return self._dispatch('addUnexpectedSuccess', test, details=details) def startTestRun(self): + super(MultiTestResult, self).startTestRun() return self._dispatch('startTestRun') def stopTestRun(self): return self._dispatch('stopTestRun') + def tags(self, new_tags, gone_tags): + super(MultiTestResult, self).tags(new_tags, gone_tags) + return self._dispatch('tags', new_tags, gone_tags) + def time(self, a_datetime): return self._dispatch('time', a_datetime) @@ -358,46 +357,67 @@ class TextTestResult(TestResult): class ThreadsafeForwardingResult(TestResult): """A TestResult which ensures the target does not receive mixed up calls. - This is used when receiving test results from multiple sources, and batches - up all the activity for a single test into a thread-safe batch where all - other ThreadsafeForwardingResult objects sharing the same semaphore will be - locked out. + Multiple ``ThreadsafeForwardingResults`` can forward to the same target + result, and that target result will only ever receive the complete set of + events for one test at a time. + + This is enforced using a semaphore, which further guarantees that tests + will be sent atomically even if the ``ThreadsafeForwardingResults`` are in + different threads. - Typical use of ThreadsafeForwardingResult involves creating one - ThreadsafeForwardingResult per thread in a ConcurrentTestSuite. These - forward to the TestResult that the ConcurrentTestSuite run method was - called with. + ``ThreadsafeForwardingResult`` is typically used by + ``ConcurrentTestSuite``, which creates one ``ThreadsafeForwardingResult`` + per thread, each of which wraps of the TestResult that + ``ConcurrentTestSuite.run()`` is called with. - target.done() is called once for each ThreadsafeForwardingResult that - forwards to the same target. If the target's done() takes special action, - care should be taken to accommodate this. + target.startTestRun() and target.stopTestRun() are called once for each + ThreadsafeForwardingResult that forwards to the same target. If the target + takes special action on these events, it should take care to accommodate + this. + + time() and tags() calls are batched to be adjacent to the test result and + in the case of tags() are coerced into test-local scope, avoiding the + opportunity for bugs around global state in the target. """ def __init__(self, target, semaphore): """Create a ThreadsafeForwardingResult forwarding to target. - :param target: A TestResult. - :param semaphore: A threading.Semaphore with limit 1. + :param target: A ``TestResult``. + :param semaphore: A ``threading.Semaphore`` with limit 1. """ TestResult.__init__(self) self.result = ExtendedToOriginalDecorator(target) self.semaphore = semaphore + self._test_start = None + self._global_tags = set(), set() + self._test_tags = set(), set() def __repr__(self): return '<%s %r>' % (self.__class__.__name__, self.result) + def _any_tags(self, tags): + return bool(tags[0] or tags[1]) + def _add_result_with_semaphore(self, method, test, *args, **kwargs): + now = self._now() self.semaphore.acquire() try: self.result.time(self._test_start) self.result.startTest(test) - self.result.time(self._now()) + self.result.time(now) + if self._any_tags(self._global_tags): + self.result.tags(*self._global_tags) + if self._any_tags(self._test_tags): + self.result.tags(*self._test_tags) + self._test_tags = set(), set() try: method(test, *args, **kwargs) finally: self.result.stopTest(test) finally: self.semaphore.release() + self._test_start = None def addError(self, test, err=None, details=None): self._add_result_with_semaphore(self.result.addError, @@ -424,6 +444,7 @@ class ThreadsafeForwardingResult(TestResult): test, details=details) def startTestRun(self): + super(ThreadsafeForwardingResult, self).startTestRun() self.semaphore.acquire() try: self.result.startTestRun() @@ -451,6 +472,27 @@ class ThreadsafeForwardingResult(TestResult): def wasSuccessful(self): return self.result.wasSuccessful() + def tags(self, new_tags, gone_tags): + """See `TestResult`.""" + super(ThreadsafeForwardingResult, self).tags(new_tags, gone_tags) + if self._test_start is not None: + self._test_tags = _merge_tags( + self._test_tags, (new_tags, gone_tags)) + else: + self._global_tags = _merge_tags( + self._global_tags, (new_tags, gone_tags)) + + +def _merge_tags(existing, changed): + new_tags, gone_tags = changed + result_new = set(existing[0]) + result_gone = set(existing[1]) + result_new.update(new_tags) + result_new.difference_update(gone_tags) + result_gone.update(gone_tags) + result_gone.difference_update(new_tags) + return result_new, result_gone + class ExtendedToOriginalDecorator(object): """Permit new TestResult API code to degrade gracefully with old results. @@ -464,6 +506,7 @@ class ExtendedToOriginalDecorator(object): def __init__(self, decorated): self.decorated = decorated + self._tags = TagContext() def __repr__(self): return '<%s %r>' % (self.__class__.__name__, self.decorated) @@ -516,7 +559,7 @@ class ExtendedToOriginalDecorator(object): except TypeError: # extract the reason if it's available try: - reason = ''.join(details['reason'].iter_text()) + reason = details['reason'].as_text() except KeyError: reason = _details_to_str(details) return addSkip(test, reason) @@ -560,6 +603,11 @@ class ExtendedToOriginalDecorator(object): _StringException(_details_to_str(details, special='traceback')), None) + @property + def current_tags(self): + return getattr( + self.decorated, 'current_tags', self._tags.get_current_tags()) + def done(self): try: return self.decorated.done() @@ -577,9 +625,11 @@ class ExtendedToOriginalDecorator(object): return self.decorated.shouldStop def startTest(self, test): + self._tags = TagContext(self._tags) return self.decorated.startTest(test) def startTestRun(self): + self._tags = TagContext() try: return self.decorated.startTestRun() except AttributeError: @@ -589,6 +639,7 @@ class ExtendedToOriginalDecorator(object): return self.decorated.stop() def stopTest(self, test): + self._tags = self._tags.parent return self.decorated.stopTest(test) def stopTestRun(self): @@ -599,9 +650,10 @@ class ExtendedToOriginalDecorator(object): def tags(self, new_tags, gone_tags): method = getattr(self.decorated, 'tags', None) - if method is None: - return - return method(new_tags, gone_tags) + if method is not None: + return method(new_tags, gone_tags) + else: + self._tags.change_tags(new_tags, gone_tags) def time(self, a_datetime): method = getattr(self.decorated, 'time', None) @@ -613,6 +665,170 @@ class ExtendedToOriginalDecorator(object): return self.decorated.wasSuccessful() +class TestResultDecorator(object): + """General pass-through decorator. + + This provides a base that other TestResults can inherit from to + gain basic forwarding functionality. + """ + + def __init__(self, decorated): + """Create a TestResultDecorator forwarding to decorated.""" + self.decorated = decorated + + def startTest(self, test): + return self.decorated.startTest(test) + + def startTestRun(self): + return self.decorated.startTestRun() + + def stopTest(self, test): + return self.decorated.stopTest(test) + + def stopTestRun(self): + return self.decorated.stopTestRun() + + def addError(self, test, err=None, details=None): + return self.decorated.addError(test, err, details=details) + + def addFailure(self, test, err=None, details=None): + return self.decorated.addFailure(test, err, details=details) + + def addSuccess(self, test, details=None): + return self.decorated.addSuccess(test, details=details) + + def addSkip(self, test, reason=None, details=None): + return self.decorated.addSkip(test, reason, details=details) + + def addExpectedFailure(self, test, err=None, details=None): + return self.decorated.addExpectedFailure(test, err, details=details) + + def addUnexpectedSuccess(self, test, details=None): + return self.decorated.addUnexpectedSuccess(test, details=details) + + def progress(self, offset, whence): + return self.decorated.progress(offset, whence) + + def wasSuccessful(self): + return self.decorated.wasSuccessful() + + @property + def current_tags(self): + return self.decorated.current_tags + + @property + def shouldStop(self): + return self.decorated.shouldStop + + def stop(self): + return self.decorated.stop() + + @property + def testsRun(self): + return self.decorated.testsRun + + def tags(self, new_tags, gone_tags): + return self.decorated.tags(new_tags, gone_tags) + + def time(self, a_datetime): + return self.decorated.time(a_datetime) + + +class Tagger(TestResultDecorator): + """Tag each test individually.""" + + def __init__(self, decorated, new_tags, gone_tags): + """Wrap 'decorated' such that each test is tagged. + + :param new_tags: Tags to be added for each test. + :param gone_tags: Tags to be removed for each test. + """ + super(Tagger, self).__init__(decorated) + self._new_tags = set(new_tags) + self._gone_tags = set(gone_tags) + + def startTest(self, test): + super(Tagger, self).startTest(test) + self.tags(self._new_tags, self._gone_tags) + + +class TestByTestResult(TestResult): + """Call something every time a test completes.""" + + def __init__(self, on_test): + """Construct a ``TestByTestResult``. + + :param on_test: A callable that take a test case, a status (one of + "success", "failure", "error", "skip", or "xfail"), a start time + (a ``datetime`` with timezone), a stop time, an iterable of tags, + and a details dict. Is called at the end of each test (i.e. on + ``stopTest``) with the accumulated values for that test. + """ + super(TestByTestResult, self).__init__() + self._on_test = on_test + + def startTest(self, test): + super(TestByTestResult, self).startTest(test) + self._start_time = self._now() + # There's no supported (i.e. tested) behaviour that relies on these + # being set, but it makes me more comfortable all the same. -- jml + self._status = None + self._details = None + self._stop_time = None + + def stopTest(self, test): + self._stop_time = self._now() + tags = set(self.current_tags) + super(TestByTestResult, self).stopTest(test) + self._on_test( + test=test, + status=self._status, + start_time=self._start_time, + stop_time=self._stop_time, + tags=tags, + details=self._details) + + def _err_to_details(self, test, err, details): + if details: + return details + return {'traceback': TracebackContent(err, test)} + + def addSuccess(self, test, details=None): + super(TestByTestResult, self).addSuccess(test) + self._status = 'success' + self._details = details + + def addFailure(self, test, err=None, details=None): + super(TestByTestResult, self).addFailure(test, err, details) + self._status = 'failure' + self._details = self._err_to_details(test, err, details) + + def addError(self, test, err=None, details=None): + super(TestByTestResult, self).addError(test, err, details) + self._status = 'error' + self._details = self._err_to_details(test, err, details) + + def addSkip(self, test, reason=None, details=None): + super(TestByTestResult, self).addSkip(test, reason, details) + self._status = 'skip' + if details is None: + details = {'reason': text_content(reason)} + elif reason: + # XXX: What if details already has 'reason' key? + details['reason'] = text_content(reason) + self._details = details + + def addExpectedFailure(self, test, err=None, details=None): + super(TestByTestResult, self).addExpectedFailure(test, err, details) + self._status = 'xfail' + self._details = self._err_to_details(test, err, details) + + def addUnexpectedSuccess(self, test, details=None): + super(TestByTestResult, self).addUnexpectedSuccess(test, details) + self._status = 'success' + self._details = details + + class _StringException(Exception): """An exception made from an arbitrary string.""" @@ -665,7 +881,7 @@ def _details_to_str(details, special=None): if content.content_type.type != 'text': binary_attachments.append((key, content.content_type)) continue - text = _u('').join(content.iter_text()).strip() + text = content.as_text().strip() if not text: empty_attachments.append(key) continue |