summaryrefslogtreecommitdiff
path: root/lib/subunit/python
diff options
context:
space:
mode:
authorJelmer Vernooij <jelmer@samba.org>2012-11-14 09:47:16 +0100
committerJelmer Vernooij <jelmer@samba.org>2012-11-14 12:11:57 +0100
commita53caea7a27c8616cabfc2e5bdf91a90e35891d5 (patch)
tree5e1ad6e0387c076cc41d14213efbe4457d861690 /lib/subunit/python
parent7b654a8c180a6467147189332916a5e56634b5af (diff)
downloadsamba-a53caea7a27c8616cabfc2e5bdf91a90e35891d5.tar.gz
samba-a53caea7a27c8616cabfc2e5bdf91a90e35891d5.tar.bz2
samba-a53caea7a27c8616cabfc2e5bdf91a90e35891d5.zip
subunit: Update to latest upstream version.
Autobuild-User(master): Jelmer Vernooij <jelmer@samba.org> Autobuild-Date(master): Wed Nov 14 12:11:58 CET 2012 on sn-devel-104
Diffstat (limited to 'lib/subunit/python')
-rw-r--r--lib/subunit/python/subunit/__init__.py71
-rw-r--r--lib/subunit/python/subunit/filters.py125
-rw-r--r--lib/subunit/python/subunit/iso8601.py2
-rw-r--r--lib/subunit/python/subunit/test_results.py404
-rwxr-xr-xlib/subunit/python/subunit/tests/sample-script.py20
-rwxr-xr-xlib/subunit/python/subunit/tests/sample-two-script.py8
-rw-r--r--lib/subunit/python/subunit/tests/test_run.py4
-rw-r--r--lib/subunit/python/subunit/tests/test_subunit_filter.py170
-rw-r--r--lib/subunit/python/subunit/tests/test_test_protocol.py38
-rw-r--r--lib/subunit/python/subunit/tests/test_test_results.py272
10 files changed, 950 insertions, 164 deletions
diff --git a/lib/subunit/python/subunit/__init__.py b/lib/subunit/python/subunit/__init__.py
index b4c939756f..6015c0e68c 100644
--- a/lib/subunit/python/subunit/__init__.py
+++ b/lib/subunit/python/subunit/__init__.py
@@ -121,8 +121,14 @@ import re
import subprocess
import sys
import unittest
+if sys.version_info > (3, 0):
+ from io import UnsupportedOperation as _UnsupportedOperation
+else:
+ _UnsupportedOperation = AttributeError
+
from testtools import content, content_type, ExtendedToOriginalDecorator
+from testtools.content import TracebackContent
from testtools.compat import _b, _u, BytesIO, StringIO
try:
from testtools.testresult.real import _StringException
@@ -182,9 +188,15 @@ def tags_to_new_gone(tags):
class DiscardStream(object):
"""A filelike object which discards what is written to it."""
+ def fileno(self):
+ raise _UnsupportedOperation()
+
def write(self, bytes):
pass
+ def read(self, len=0):
+ return _b('')
+
class _ParserState(object):
"""State for the subunit parser."""
@@ -599,8 +611,8 @@ class TestProtocolClient(testresult.TestResult):
def __init__(self, stream):
testresult.TestResult.__init__(self)
+ stream = _make_stream_binary(stream)
self._stream = stream
- _make_stream_binary(stream)
self._progress_fmt = _b("progress: ")
self._bytes_eol = _b("\n")
self._progress_plus = _b("+")
@@ -682,10 +694,9 @@ class TestProtocolClient(testresult.TestResult):
raise ValueError
if error is not None:
self._stream.write(self._start_simple)
- # XXX: this needs to be made much stricter, along the lines of
- # Martin[gz]'s work in testtools. Perhaps subunit can use that?
- for line in self._exc_info_to_unicode(error, test).splitlines():
- self._stream.write(("%s\n" % line).encode('utf8'))
+ tb_content = TracebackContent(error, test)
+ for bytes in tb_content.iter_bytes():
+ self._stream.write(bytes)
elif details is not None:
self._write_details(details)
else:
@@ -755,6 +766,15 @@ class TestProtocolClient(testresult.TestResult):
self._stream.write(self._progress_fmt + prefix + offset +
self._bytes_eol)
+ def tags(self, new_tags, gone_tags):
+ """Inform the client about tags added/removed from the stream."""
+ if not new_tags and not gone_tags:
+ return
+ tags = set([tag.encode('utf8') for tag in new_tags])
+ tags.update([_b("-") + tag.encode('utf8') for tag in gone_tags])
+ tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n")
+ self._stream.write(tag_line)
+
def time(self, a_datetime):
"""Inform the client of the time.
@@ -1122,7 +1142,7 @@ class ProtocolTestCase(object):
:seealso: TestProtocolServer (the subunit wire protocol parser).
"""
- def __init__(self, stream, passthrough=None, forward=False):
+ def __init__(self, stream, passthrough=None, forward=None):
"""Create a ProtocolTestCase reading from stream.
:param stream: A filelike object which a subunit stream can be read
@@ -1132,9 +1152,11 @@ class ProtocolTestCase(object):
:param forward: A stream to pass subunit input on to. If not supplied
subunit input is not forwarded.
"""
+ stream = _make_stream_binary(stream)
self._stream = stream
- _make_stream_binary(stream)
self._passthrough = passthrough
+ if forward is not None:
+ forward = _make_stream_binary(forward)
self._forward = forward
def __call__(self, result=None):
@@ -1217,11 +1239,6 @@ def get_default_formatter():
return stream
-if sys.version_info > (3, 0):
- from io import UnsupportedOperation as _NoFilenoError
-else:
- _NoFilenoError = AttributeError
-
def read_test_list(path):
"""Read a list of test ids from a file on disk.
@@ -1236,15 +1253,37 @@ def read_test_list(path):
def _make_stream_binary(stream):
- """Ensure that a stream will be binary safe. See _make_binary_on_windows."""
+ """Ensure that a stream will be binary safe. See _make_binary_on_windows.
+
+ :return: A binary version of the same stream (some streams cannot be
+ 'fixed' but can be unwrapped).
+ """
try:
fileno = stream.fileno()
- except _NoFilenoError:
- return
- _make_binary_on_windows(fileno)
+ except _UnsupportedOperation:
+ pass
+ else:
+ _make_binary_on_windows(fileno)
+ return _unwrap_text(stream)
def _make_binary_on_windows(fileno):
"""Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
if sys.platform == "win32":
import msvcrt
msvcrt.setmode(fileno, os.O_BINARY)
+
+
+def _unwrap_text(stream):
+ """Unwrap stream if it is a text stream to get the original buffer."""
+ if sys.version_info > (3, 0):
+ try:
+ # Read streams
+ if type(stream.read(0)) is str:
+ return stream.buffer
+ except (_UnsupportedOperation, IOError):
+ # Cannot read from the stream: try via writes
+ try:
+ stream.write(_b(''))
+ except TypeError:
+ return stream.buffer
+ return stream
diff --git a/lib/subunit/python/subunit/filters.py b/lib/subunit/python/subunit/filters.py
new file mode 100644
index 0000000000..dc3fd8aedb
--- /dev/null
+++ b/lib/subunit/python/subunit/filters.py
@@ -0,0 +1,125 @@
+# subunit: extensions to python unittest to get test results from subprocesses.
+# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
+#
+# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
+# license at the users choice. A copy of both licenses are available in the
+# project source as Apache-2.0 and BSD. You may not use this file except in
+# compliance with one of these two licences.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# license you chose for the specific language governing permissions and
+# limitations under that license.
+#
+
+
+from optparse import OptionParser
+import sys
+
+from subunit import DiscardStream, ProtocolTestCase
+
+
+def make_options(description):
+ parser = OptionParser(description=description)
+ parser.add_option(
+ "--no-passthrough", action="store_true",
+ help="Hide all non subunit input.", default=False,
+ dest="no_passthrough")
+ parser.add_option(
+ "-o", "--output-to",
+ help="Send the output to this path rather than stdout.")
+ parser.add_option(
+ "-f", "--forward", action="store_true", default=False,
+ help="Forward subunit stream on stdout.")
+ return parser
+
+
+def run_tests_from_stream(input_stream, result, passthrough_stream=None,
+ forward_stream=None):
+ """Run tests from a subunit input stream through 'result'.
+
+ :param input_stream: A stream containing subunit input.
+ :param result: A TestResult that will receive the test events.
+ :param passthrough_stream: All non-subunit input received will be
+ sent to this stream. If not provided, uses the ``TestProtocolServer``
+ default, which is ``sys.stdout``.
+ :param forward_stream: All subunit input received will be forwarded
+ to this stream. If not provided, uses the ``TestProtocolServer``
+ default, which is to not forward any input.
+ """
+ test = ProtocolTestCase(
+ input_stream, passthrough=passthrough_stream,
+ forward=forward_stream)
+ result.startTestRun()
+ test.run(result)
+ result.stopTestRun()
+
+
+def filter_by_result(result_factory, output_path, passthrough, forward,
+ input_stream=sys.stdin):
+ """Filter an input stream using a test result.
+
+ :param result_factory: A callable that when passed an output stream
+ returns a TestResult. It is expected that this result will output
+ to the given stream.
+ :param output_path: A path send output to. If None, output will be go
+ to ``sys.stdout``.
+ :param passthrough: If True, all non-subunit input will be sent to
+ ``sys.stdout``. If False, that input will be discarded.
+ :param forward: If True, all subunit input will be forwarded directly to
+ ``sys.stdout`` as well as to the ``TestResult``.
+ :param input_stream: The source of subunit input. Defaults to
+ ``sys.stdin``.
+ :return: A test result with the resultts of the run.
+ """
+ if passthrough:
+ passthrough_stream = sys.stdout
+ else:
+ passthrough_stream = DiscardStream()
+
+ if forward:
+ forward_stream = sys.stdout
+ else:
+ forward_stream = DiscardStream()
+
+ if output_path is None:
+ output_to = sys.stdout
+ else:
+ output_to = file(output_path, 'wb')
+
+ try:
+ result = result_factory(output_to)
+ run_tests_from_stream(
+ input_stream, result, passthrough_stream, forward_stream)
+ finally:
+ if output_path:
+ output_to.close()
+ return result
+
+
+def run_filter_script(result_factory, description, post_run_hook=None):
+ """Main function for simple subunit filter scripts.
+
+ Many subunit filter scripts take a stream of subunit input and use a
+ TestResult to handle the events generated by that stream. This function
+ wraps a lot of the boiler-plate around that by making a script with
+ options for handling passthrough information and stream forwarding, and
+ that will exit with a successful return code (i.e. 0) if the input stream
+ represents a successful test run.
+
+ :param result_factory: A callable that takes an output stream and returns
+ a test result that outputs to that stream.
+ :param description: A description of the filter script.
+ """
+ parser = make_options(description)
+ (options, args) = parser.parse_args()
+ result = filter_by_result(
+ result_factory, options.output_to, not options.no_passthrough,
+ options.forward)
+ if post_run_hook:
+ post_run_hook(result)
+ if result.wasSuccessful():
+ sys.exit(0)
+ else:
+ sys.exit(1)
diff --git a/lib/subunit/python/subunit/iso8601.py b/lib/subunit/python/subunit/iso8601.py
index cbe9a3b3eb..07855d0975 100644
--- a/lib/subunit/python/subunit/iso8601.py
+++ b/lib/subunit/python/subunit/iso8601.py
@@ -127,7 +127,7 @@ def parse_date(datestring, default_timezone=UTC):
if groups["fraction"] is None:
groups["fraction"] = 0
else:
- groups["fraction"] = int(float("0.%s" % groups["fraction"]) * 1e6)
+ groups["fraction"] = int(float("0.%s" % groups["fraction"].decode()) * 1e6)
return datetime(int(groups["year"]), int(groups["month"]), int(groups["day"]),
int(groups["hour"]), int(groups["minute"]), int(groups["second"]),
int(groups["fraction"]), tz)
diff --git a/lib/subunit/python/subunit/test_results.py b/lib/subunit/python/subunit/test_results.py
index 33fb50e073..c00a2d3e97 100644
--- a/lib/subunit/python/subunit/test_results.py
+++ b/lib/subunit/python/subunit/test_results.py
@@ -16,9 +16,15 @@
"""TestResult helper classes used to by subunit."""
+import csv
import datetime
import testtools
+from testtools.compat import all
+from testtools.content import (
+ text_content,
+ TracebackContent,
+ )
from subunit import iso8601
@@ -34,6 +40,9 @@ class TestResultDecorator(object):
or features by degrading them.
"""
+ # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
+ # we should gut this and just use that.
+
def __init__(self, decorated):
"""Create a TestResultDecorator forwarding to decorated."""
# Make every decorator degrade gracefully.
@@ -200,34 +209,44 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
return self.decorated.time(a_datetime)
-class TagCollapsingDecorator(TestResultDecorator):
- """Collapses many 'tags' calls into one where possible."""
+class TagsMixin(object):
- def __init__(self, result):
- super(TagCollapsingDecorator, self).__init__(result)
- # The (new, gone) tags for the current test.
- self._current_test_tags = None
+ def __init__(self):
+ self._clear_tags()
- def startTest(self, test):
- """Start a test.
+ def _clear_tags(self):
+ self._global_tags = set(), set()
+ self._test_tags = None
- Not directly passed to the client, but used for handling of tags
- correctly.
- """
- self.decorated.startTest(test)
- self._current_test_tags = set(), set()
+ def _get_active_tags(self):
+ global_new, global_gone = self._global_tags
+ if self._test_tags is None:
+ return set(global_new)
+ test_new, test_gone = self._test_tags
+ return global_new.difference(test_gone).union(test_new)
- def stopTest(self, test):
- """Stop a test.
+ def _get_current_scope(self):
+ if self._test_tags:
+ return self._test_tags
+ return self._global_tags
- Not directly passed to the client, but used for handling of tags
- correctly.
- """
- # Tags to output for this test.
- if self._current_test_tags[0] or self._current_test_tags[1]:
- self.decorated.tags(*self._current_test_tags)
- self.decorated.stopTest(test)
- self._current_test_tags = None
+ def _flush_current_scope(self, tag_receiver):
+ new_tags, gone_tags = self._get_current_scope()
+ if new_tags or gone_tags:
+ tag_receiver.tags(new_tags, gone_tags)
+ if self._test_tags:
+ self._test_tags = set(), set()
+ else:
+ self._global_tags = set(), set()
+
+ def startTestRun(self):
+ self._clear_tags()
+
+ def startTest(self, test):
+ self._test_tags = set(), set()
+
+ def stopTest(self, test):
+ self._test_tags = None
def tags(self, new_tags, gone_tags):
"""Handle tag instructions.
@@ -238,14 +257,25 @@ class TagCollapsingDecorator(TestResultDecorator):
:param new_tags: Tags to add,
:param gone_tags: Tags to remove.
"""
- if self._current_test_tags is not None:
- # gather the tags until the test stops.
- self._current_test_tags[0].update(new_tags)
- self._current_test_tags[0].difference_update(gone_tags)
- self._current_test_tags[1].update(gone_tags)
- self._current_test_tags[1].difference_update(new_tags)
- else:
- return self.decorated.tags(new_tags, gone_tags)
+ current_new_tags, current_gone_tags = self._get_current_scope()
+ current_new_tags.update(new_tags)
+ current_new_tags.difference_update(gone_tags)
+ current_gone_tags.update(gone_tags)
+ current_gone_tags.difference_update(new_tags)
+
+
+class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
+ """Collapses many 'tags' calls into one where possible."""
+
+ def __init__(self, result):
+ super(TagCollapsingDecorator, self).__init__(result)
+ self._clear_tags()
+
+ def _before_event(self):
+ self._flush_current_scope(self.decorated)
+
+ def tags(self, new_tags, gone_tags):
+ TagsMixin.tags(self, new_tags, gone_tags)
class TimeCollapsingDecorator(HookedTestResultDecorator):
@@ -273,93 +303,58 @@ class TimeCollapsingDecorator(HookedTestResultDecorator):
self._last_received_time = a_time
-def all_true(bools):
- """Return True if all of 'bools' are True. False otherwise."""
- for b in bools:
- if not b:
- return False
- return True
+def and_predicates(predicates):
+ """Return a predicate that is true iff all predicates are true."""
+ # XXX: Should probably be in testtools to be better used by matchers. jml
+ return lambda *args, **kwargs: all(p(*args, **kwargs) for p in predicates)
-class TestResultFilter(TestResultDecorator):
- """A pyunit TestResult interface implementation which filters tests.
+def make_tag_filter(with_tags, without_tags):
+ """Make a callback that checks tests against tags."""
- Tests that pass the filter are handed on to another TestResult instance
- for further processing/reporting. To obtain the filtered results,
- the other instance must be interrogated.
+ with_tags = with_tags and set(with_tags) or None
+ without_tags = without_tags and set(without_tags) or None
- :ivar result: The result that tests are passed to after filtering.
- :ivar filter_predicate: The callback run to decide whether to pass
- a result.
- """
+ def check_tags(test, outcome, err, details, tags):
+ if with_tags and not with_tags <= tags:
+ return False
+ if without_tags and bool(without_tags & tags):
+ return False
+ return True
- def __init__(self, result, filter_error=False, filter_failure=False,
- filter_success=True, filter_skip=False, filter_xfail=False,
- filter_predicate=None, fixup_expected_failures=None):
- """Create a FilterResult object filtering to result.
+ return check_tags
- :param filter_error: Filter out errors.
- :param filter_failure: Filter out failures.
- :param filter_success: Filter out successful tests.
- :param filter_skip: Filter out skipped tests.
- :param filter_xfail: Filter out expected failure tests.
- :param filter_predicate: A callable taking (test, outcome, err,
- details) and returning True if the result should be passed
- through. err and details may be none if no error or extra
- metadata is available. outcome is the name of the outcome such
- as 'success' or 'failure'.
- :param fixup_expected_failures: Set of test ids to consider known
- failing.
- """
- super(TestResultFilter, self).__init__(result)
+
+class _PredicateFilter(TestResultDecorator, TagsMixin):
+
+ def __init__(self, result, predicate):
+ super(_PredicateFilter, self).__init__(result)
+ self._clear_tags()
self.decorated = TimeCollapsingDecorator(
TagCollapsingDecorator(self.decorated))
- predicates = []
- if filter_error:
- predicates.append(lambda t, outcome, e, d: outcome != 'error')
- if filter_failure:
- predicates.append(lambda t, outcome, e, d: outcome != 'failure')
- if filter_success:
- predicates.append(lambda t, outcome, e, d: outcome != 'success')
- if filter_skip:
- predicates.append(lambda t, outcome, e, d: outcome != 'skip')
- if filter_xfail:
- predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
- if filter_predicate is not None:
- predicates.append(filter_predicate)
- self.filter_predicate = (
- lambda test, outcome, err, details:
- all_true(p(test, outcome, err, details) for p in predicates))
+ self._predicate = predicate
# The current test (for filtering tags)
self._current_test = None
# Has the current test been filtered (for outputting test tags)
self._current_test_filtered = None
# Calls to this result that we don't know whether to forward on yet.
self._buffered_calls = []
- if fixup_expected_failures is None:
- self._fixup_expected_failures = frozenset()
- else:
- self._fixup_expected_failures = fixup_expected_failures
+
+ def filter_predicate(self, test, outcome, error, details):
+ return self._predicate(
+ test, outcome, error, details, self._get_active_tags())
def addError(self, test, err=None, details=None):
if (self.filter_predicate(test, 'error', err, details)):
- if self._failure_expected(test):
- self._buffered_calls.append(
- ('addExpectedFailure', [test, err], {'details': details}))
- else:
- self._buffered_calls.append(
- ('addError', [test, err], {'details': details}))
+ self._buffered_calls.append(
+ ('addError', [test, err], {'details': details}))
else:
self._filtered()
def addFailure(self, test, err=None, details=None):
if (self.filter_predicate(test, 'failure', err, details)):
- if self._failure_expected(test):
- self._buffered_calls.append(
- ('addExpectedFailure', [test, err], {'details': details}))
- else:
- self._buffered_calls.append(
- ('addFailure', [test, err], {'details': details}))
+ self._buffered_calls.append(
+ ('addFailure', [test, err], {'details': details}))
else:
self._filtered()
@@ -370,17 +365,6 @@ class TestResultFilter(TestResultDecorator):
else:
self._filtered()
- def addSuccess(self, test, details=None):
- if (self.filter_predicate(test, 'success', None, details)):
- if self._failure_expected(test):
- self._buffered_calls.append(
- ('addUnexpectedSuccess', [test], {'details': details}))
- else:
- self._buffered_calls.append(
- ('addSuccess', [test], {'details': details}))
- else:
- self._filtered()
-
def addExpectedFailure(self, test, err=None, details=None):
if self.filter_predicate(test, 'expectedfailure', err, details):
self._buffered_calls.append(
@@ -392,18 +376,23 @@ class TestResultFilter(TestResultDecorator):
self._buffered_calls.append(
('addUnexpectedSuccess', [test], {'details': details}))
+ def addSuccess(self, test, details=None):
+ if (self.filter_predicate(test, 'success', None, details)):
+ self._buffered_calls.append(
+ ('addSuccess', [test], {'details': details}))
+ else:
+ self._filtered()
+
def _filtered(self):
self._current_test_filtered = True
- def _failure_expected(self, test):
- return (test.id() in self._fixup_expected_failures)
-
def startTest(self, test):
"""Start a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
+ TagsMixin.startTest(self, test)
self._current_test = test
self._current_test_filtered = False
self._buffered_calls.append(('startTest', [test], {}))
@@ -415,19 +404,23 @@ class TestResultFilter(TestResultDecorator):
correctly.
"""
if not self._current_test_filtered:
- # Tags to output for this test.
for method, args, kwargs in self._buffered_calls:
getattr(self.decorated, method)(*args, **kwargs)
self.decorated.stopTest(test)
self._current_test = None
self._current_test_filtered = None
self._buffered_calls = []
+ TagsMixin.stopTest(self, test)
- def time(self, a_time):
+ def tags(self, new_tags, gone_tags):
+ TagsMixin.tags(self, new_tags, gone_tags)
if self._current_test is not None:
- self._buffered_calls.append(('time', [a_time], {}))
+ self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))
else:
- return self.decorated.time(a_time)
+ return super(_PredicateFilter, self).tags(new_tags, gone_tags)
+
+ def time(self, a_time):
+ return self.decorated.time(a_time)
def id_to_orig_id(self, id):
if id.startswith("subunit.RemotedTestCase."):
@@ -435,6 +428,95 @@ class TestResultFilter(TestResultDecorator):
return id
+class TestResultFilter(TestResultDecorator):
+ """A pyunit TestResult interface implementation which filters tests.
+
+ Tests that pass the filter are handed on to another TestResult instance
+ for further processing/reporting. To obtain the filtered results,
+ the other instance must be interrogated.
+
+ :ivar result: The result that tests are passed to after filtering.
+ :ivar filter_predicate: The callback run to decide whether to pass
+ a result.
+ """
+
+ def __init__(self, result, filter_error=False, filter_failure=False,
+ filter_success=True, filter_skip=False, filter_xfail=False,
+ filter_predicate=None, fixup_expected_failures=None):
+ """Create a FilterResult object filtering to result.
+
+ :param filter_error: Filter out errors.
+ :param filter_failure: Filter out failures.
+ :param filter_success: Filter out successful tests.
+ :param filter_skip: Filter out skipped tests.
+ :param filter_xfail: Filter out expected failure tests.
+ :param filter_predicate: A callable taking (test, outcome, err,
+ details, tags) and returning True if the result should be passed
+ through. err and details may be none if no error or extra
+ metadata is available. outcome is the name of the outcome such
+ as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters
+ are still supported but should be updated to accept the tags
+ parameter for efficiency.
+ :param fixup_expected_failures: Set of test ids to consider known
+ failing.
+ """
+ predicates = []
+ if filter_error:
+ predicates.append(
+ lambda t, outcome, e, d, tags: outcome != 'error')
+ if filter_failure:
+ predicates.append(
+ lambda t, outcome, e, d, tags: outcome != 'failure')
+ if filter_success:
+ predicates.append(
+ lambda t, outcome, e, d, tags: outcome != 'success')
+ if filter_skip:
+ predicates.append(
+ lambda t, outcome, e, d, tags: outcome != 'skip')
+ if filter_xfail:
+ predicates.append(
+ lambda t, outcome, e, d, tags: outcome != 'expectedfailure')
+ if filter_predicate is not None:
+ def compat(test, outcome, error, details, tags):
+ # 0.0.7 and earlier did not support the 'tags' parameter.
+ try:
+ return filter_predicate(
+ test, outcome, error, details, tags)
+ except TypeError:
+ return filter_predicate(test, outcome, error, details)
+ predicates.append(compat)
+ predicate = and_predicates(predicates)
+ super(TestResultFilter, self).__init__(
+ _PredicateFilter(result, predicate))
+ if fixup_expected_failures is None:
+ self._fixup_expected_failures = frozenset()
+ else:
+ self._fixup_expected_failures = fixup_expected_failures
+
+ def addError(self, test, err=None, details=None):
+ if self._failure_expected(test):
+ self.addExpectedFailure(test, err=err, details=details)
+ else:
+ super(TestResultFilter, self).addError(
+ test, err=err, details=details)
+
+ def addFailure(self, test, err=None, details=None):
+ if self._failure_expected(test):
+ self.addExpectedFailure(test, err=err, details=details)
+ else:
+ super(TestResultFilter, self).addFailure(
+ test, err=err, details=details)
+
+ def addSuccess(self, test, details=None):
+ if self._failure_expected(test):
+ self.addUnexpectedSuccess(test, details=details)
+ else:
+ super(TestResultFilter, self).addSuccess(test, details=details)
+
+ def _failure_expected(self, test):
+ return (test.id() in self._fixup_expected_failures)
+
+
class TestIdPrintingResult(testtools.TestResult):
def __init__(self, stream, show_times=False):
@@ -493,3 +575,97 @@ class TestIdPrintingResult(testtools.TestResult):
def wasSuccessful(self):
"Tells whether or not this result was a success"
return self.failed_tests == 0
+
+
+class TestByTestResult(testtools.TestResult):
+ """Call something every time a test completes."""
+
+# XXX: In testtools since lp:testtools r249. Once that's released, just
+# import that.
+
+ def __init__(self, on_test):
+ """Construct a ``TestByTestResult``.
+
+ :param on_test: A callable that take a test case, a status (one of
+ "success", "failure", "error", "skip", or "xfail"), a start time
+ (a ``datetime`` with timezone), a stop time, an iterable of tags,
+ and a details dict. Is called at the end of each test (i.e. on
+ ``stopTest``) with the accumulated values for that test.
+ """
+ super(TestByTestResult, self).__init__()
+ self._on_test = on_test
+
+ def startTest(self, test):
+ super(TestByTestResult, self).startTest(test)
+ self._start_time = self._now()
+ # There's no supported (i.e. tested) behaviour that relies on these
+ # being set, but it makes me more comfortable all the same. -- jml
+ self._status = None
+ self._details = None
+ self._stop_time = None
+
+ def stopTest(self, test):
+ self._stop_time = self._now()
+ super(TestByTestResult, self).stopTest(test)
+ self._on_test(
+ test=test,
+ status=self._status,
+ start_time=self._start_time,
+ stop_time=self._stop_time,
+ # current_tags is new in testtools 0.9.13.
+ tags=getattr(self, 'current_tags', None),
+ details=self._details)
+
+ def _err_to_details(self, test, err, details):
+ if details:
+ return details
+ return {'traceback': TracebackContent(err, test)}
+
+ def addSuccess(self, test, details=None):
+ super(TestByTestResult, self).addSuccess(test)
+ self._status = 'success'
+ self._details = details
+
+ def addFailure(self, test, err=None, details=None):
+ super(TestByTestResult, self).addFailure(test, err, details)
+ self._status = 'failure'
+ self._details = self._err_to_details(test, err, details)
+
+ def addError(self, test, err=None, details=None):
+ super(TestByTestResult, self).addError(test, err, details)
+ self._status = 'error'
+ self._details = self._err_to_details(test, err, details)
+
+ def addSkip(self, test, reason=None, details=None):
+ super(TestByTestResult, self).addSkip(test, reason, details)
+ self._status = 'skip'
+ if details is None:
+ details = {'reason': text_content(reason)}
+ elif reason:
+ # XXX: What if details already has 'reason' key?
+ details['reason'] = text_content(reason)
+ self._details = details
+
+ def addExpectedFailure(self, test, err=None, details=None):
+ super(TestByTestResult, self).addExpectedFailure(test, err, details)
+ self._status = 'xfail'
+ self._details = self._err_to_details(test, err, details)
+
+ def addUnexpectedSuccess(self, test, details=None):
+ super(TestByTestResult, self).addUnexpectedSuccess(test, details)
+ self._status = 'success'
+ self._details = details
+
+
+class CsvResult(TestByTestResult):
+
+ def __init__(self, stream):
+ super(CsvResult, self).__init__(self._on_test)
+ self._write_row = csv.writer(stream).writerow
+
+ def _on_test(self, test, status, start_time, stop_time, tags, details):
+ self._write_row([test.id(), status, start_time, stop_time])
+
+ def startTestRun(self):
+ super(CsvResult, self).startTestRun()
+ self._write_row(['test', 'status', 'start_time', 'stop_time'])
diff --git a/lib/subunit/python/subunit/tests/sample-script.py b/lib/subunit/python/subunit/tests/sample-script.py
index 618e4952d7..91838f6d6f 100755
--- a/lib/subunit/python/subunit/tests/sample-script.py
+++ b/lib/subunit/python/subunit/tests/sample-script.py
@@ -7,15 +7,15 @@ if len(sys.argv) == 2:
# subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args
# uses this code path to be sure that the arguments were passed to
# sample-script.py
- print "test fail"
- print "error fail"
+ print("test fail")
+ print("error fail")
sys.exit(0)
-print "test old mcdonald"
-print "success old mcdonald"
-print "test bing crosby"
-print "failure bing crosby ["
-print "foo.c:53:ERROR invalid state"
-print "]"
-print "test an error"
-print "error an error"
+print("test old mcdonald")
+print("success old mcdonald")
+print("test bing crosby")
+print("failure bing crosby [")
+print("foo.c:53:ERROR invalid state")
+print("]")
+print("test an error")
+print("error an error")
sys.exit(0)
diff --git a/lib/subunit/python/subunit/tests/sample-two-script.py b/lib/subunit/python/subunit/tests/sample-two-script.py
index d5550842bf..fc73dfc409 100755
--- a/lib/subunit/python/subunit/tests/sample-two-script.py
+++ b/lib/subunit/python/subunit/tests/sample-two-script.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
import sys
-print "test old mcdonald"
-print "success old mcdonald"
-print "test bing crosby"
-print "success bing crosby"
+print("test old mcdonald")
+print("success old mcdonald")
+print("test bing crosby")
+print("success bing crosby")
sys.exit(0)
diff --git a/lib/subunit/python/subunit/tests/test_run.py b/lib/subunit/python/subunit/tests/test_run.py
index 5a96bcf30e..10519ed086 100644
--- a/lib/subunit/python/subunit/tests/test_run.py
+++ b/lib/subunit/python/subunit/tests/test_run.py
@@ -14,7 +14,7 @@
# limitations under that license.
#
-from cStringIO import StringIO
+from testtools.compat import BytesIO
import unittest
from testtools import PlaceHolder
@@ -42,7 +42,7 @@ class TimeCollectingTestResult(unittest.TestResult):
class TestSubunitTestRunner(unittest.TestCase):
def test_includes_timing_output(self):
- io = StringIO()
+ io = BytesIO()
runner = SubunitTestRunner(stream=io)
test = PlaceHolder('name')
runner.run(test)
diff --git a/lib/subunit/python/subunit/tests/test_subunit_filter.py b/lib/subunit/python/subunit/tests/test_subunit_filter.py
index 06754840eb..33b924824d 100644
--- a/lib/subunit/python/subunit/tests/test_subunit_filter.py
+++ b/lib/subunit/python/subunit/tests/test_subunit_filter.py
@@ -17,15 +17,18 @@
"""Tests for subunit.TestResultFilter."""
from datetime import datetime
+import os
+import subprocess
+import sys
from subunit import iso8601
import unittest
from testtools import TestCase
-from testtools.compat import _b, BytesIO, StringIO
+from testtools.compat import _b, BytesIO
from testtools.testresult.doubles import ExtendedTestResult
import subunit
-from subunit.test_results import TestResultFilter
+from subunit.test_results import make_tag_filter, TestResultFilter
class TestTestResultFilter(TestCase):
@@ -77,6 +80,40 @@ xfail todo
filtered_result.failures])
self.assertEqual(4, filtered_result.testsRun)
+ def test_tag_filter(self):
+ tag_filter = make_tag_filter(['global'], ['local'])
+ result = ExtendedTestResult()
+ result_filter = TestResultFilter(
+ result, filter_success=False, filter_predicate=tag_filter)
+ self.run_tests(result_filter)
+ tests_included = [
+ event[1] for event in result._events if event[0] == 'startTest']
+ tests_expected = list(map(
+ subunit.RemotedTestCase,
+ ['passed', 'error', 'skipped', 'todo']))
+ self.assertEquals(tests_expected, tests_included)
+
+ def test_tags_tracked_correctly(self):
+ tag_filter = make_tag_filter(['a'], [])
+ result = ExtendedTestResult()
+ result_filter = TestResultFilter(
+ result, filter_success=False, filter_predicate=tag_filter)
+ input_stream = _b(
+ "test: foo\n"
+ "tags: a\n"
+ "successful: foo\n"
+ "test: bar\n"
+ "successful: bar\n")
+ self.run_tests(result_filter, input_stream)
+ foo = subunit.RemotedTestCase('foo')
+ self.assertEquals(
+ [('startTest', foo),
+ ('tags', set(['a']), set()),
+ ('addSuccess', foo),
+ ('stopTest', foo),
+ ],
+ result._events)
+
def test_exclude_errors(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result, filter_error=True)
@@ -151,6 +188,8 @@ xfail todo
def test_filter_predicate(self):
"""You can filter by predicate callbacks"""
+ # 0.0.7 and earlier did not support the 'tags' parameter, so we need
+ # to test that we still support behaviour without it.
filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details):
return outcome == 'success'
@@ -161,6 +200,18 @@ xfail todo
# Only success should pass
self.assertEqual(1, filtered_result.testsRun)
+ def test_filter_predicate_with_tags(self):
+ """You can filter by predicate callbacks that accept tags"""
+ filtered_result = unittest.TestResult()
+ def filter_cb(test, outcome, err, details, tags):
+ return outcome == 'success'
+ result_filter = TestResultFilter(filtered_result,
+ filter_predicate=filter_cb,
+ filter_success=False)
+ self.run_tests(result_filter)
+ # Only success should pass
+ self.assertEqual(1, filtered_result.testsRun)
+
def test_time_ordering_preserved(self):
# Passing a subunit stream through TestResultFilter preserves the
# relative ordering of 'time' directives and any other subunit
@@ -179,14 +230,41 @@ xfail todo
result_filter = TestResultFilter(result)
self.run_tests(result_filter, subunit_stream)
foo = subunit.RemotedTestCase('foo')
- self.assertEquals(
+ self.maxDiff = None
+ self.assertEqual(
[('time', date_a),
- ('startTest', foo),
('time', date_b),
+ ('startTest', foo),
('addError', foo, {}),
('stopTest', foo),
('time', date_c)], result._events)
+ def test_time_passes_through_filtered_tests(self):
+ # Passing a subunit stream through TestResultFilter preserves 'time'
+ # directives even if a specific test is filtered out.
+ date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
+ date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
+ date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
+ subunit_stream = _b('\n'.join([
+ "time: %s",
+ "test: foo",
+ "time: %s",
+ "success: foo",
+ "time: %s",
+ ""]) % (date_a, date_b, date_c))
+ result = ExtendedTestResult()
+ result_filter = TestResultFilter(result)
+ result_filter.startTestRun()
+ self.run_tests(result_filter, subunit_stream)
+ result_filter.stopTestRun()
+ foo = subunit.RemotedTestCase('foo')
+ self.maxDiff = None
+ self.assertEqual(
+ [('startTestRun',),
+ ('time', date_a),
+ ('time', date_c),
+ ('stopTestRun',),], result._events)
+
def test_skip_preserved(self):
subunit_stream = _b('\n'.join([
"test: foo",
@@ -201,6 +279,90 @@ xfail todo
('addSkip', foo, {}),
('stopTest', foo), ], result._events)
+ if sys.version_info < (2, 7):
+ # These tests require Python >=2.7.
+ del test_fixup_expected_failures, test_fixup_expected_errors, test_fixup_unexpected_success
+
+
+class TestFilterCommand(TestCase):
+
+ example_subunit_stream = _b("""\
+tags: global
+test passed
+success passed
+test failed
+tags: local
+failure failed
+test error
+error error [
+error details
+]
+test skipped
+skip skipped
+test todo
+xfail todo
+""")
+
+ def run_command(self, args, stream):
+ root = os.path.dirname(
+ os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
+ script_path = os.path.join(root, 'filters', 'subunit-filter')
+ command = [sys.executable, script_path] + list(args)
+ ps = subprocess.Popen(
+ command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ out, err = ps.communicate(stream)
+ if ps.returncode != 0:
+ raise RuntimeError("%s failed: %s" % (command, err))
+ return out
+
+ def to_events(self, stream):
+ test = subunit.ProtocolTestCase(BytesIO(stream))
+ result = ExtendedTestResult()
+ test.run(result)
+ return result._events
+
+ def test_default(self):
+ output = self.run_command([], _b(
+ "test: foo\n"
+ "skip: foo\n"
+ ))
+ events = self.to_events(output)
+ foo = subunit.RemotedTestCase('foo')
+ self.assertEqual(
+ [('startTest', foo),
+ ('addSkip', foo, {}),
+ ('stopTest', foo)],
+ events)
+
+ def test_tags(self):
+ output = self.run_command(['-s', '--with-tag', 'a'], _b(
+ "tags: a\n"
+ "test: foo\n"
+ "success: foo\n"
+ "tags: -a\n"
+ "test: bar\n"
+ "success: bar\n"
+ "test: baz\n"
+ "tags: a\n"
+ "success: baz\n"
+ ))
+ events = self.to_events(output)
+ foo = subunit.RemotedTestCase('foo')
+ baz = subunit.RemotedTestCase('baz')
+ self.assertEqual(
+ [('tags', set(['a']), set()),
+ ('startTest', foo),
+ ('addSuccess', foo),
+ ('stopTest', foo),
+ ('tags', set(), set(['a'])),
+ ('startTest', baz),
+ ('tags', set(['a']), set()),
+ ('addSuccess', baz),
+ ('stopTest', baz),
+ ],
+ events)
+
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
diff --git a/lib/subunit/python/subunit/tests/test_test_protocol.py b/lib/subunit/python/subunit/tests/test_test_protocol.py
index c93aabd80c..ec6830d03b 100644
--- a/lib/subunit/python/subunit/tests/test_test_protocol.py
+++ b/lib/subunit/python/subunit/tests/test_test_protocol.py
@@ -18,9 +18,9 @@ import datetime
import unittest
import os
-from testtools import skipIf, TestCase
-from testtools.compat import _b, _u, BytesIO, StringIO
-from testtools.content import Content, TracebackContent
+from testtools import skipIf, TestCase, TestResult
+from testtools.compat import _b, _u, BytesIO
+from testtools.content import Content, TracebackContent, text_content
from testtools.content_type import ContentType
try:
from testtools.testresult.doubles import (
@@ -40,6 +40,10 @@ from subunit import _remote_exception_str, _remote_exception_str_chunked
import subunit.iso8601 as iso8601
+def details_to_str(details):
+ return TestResult()._err_details_to_string(None, details=details)
+
+
class TestTestImports(unittest.TestCase):
def test_imports(self):
@@ -87,11 +91,12 @@ class TestTestProtocolServerPipe(unittest.TestCase):
def test_story(self):
client = unittest.TestResult()
protocol = subunit.TestProtocolServer(client)
+ traceback = "foo.c:53:ERROR invalid state\n"
pipe = BytesIO(_b("test old mcdonald\n"
"success old mcdonald\n"
"test bing crosby\n"
"failure bing crosby [\n"
- "foo.c:53:ERROR invalid state\n"
+ + traceback +
"]\n"
"test an error\n"
"error an error\n"))
@@ -102,9 +107,8 @@ class TestTestProtocolServerPipe(unittest.TestCase):
[(an_error, _remote_exception_str + '\n')])
self.assertEqual(
client.failures,
- [(bing, _remote_exception_str + ": Text attachment: traceback\n"
- "------------\nfoo.c:53:ERROR invalid state\n"
- "------------\n\n")])
+ [(bing, _remote_exception_str + ": "
+ + details_to_str({'traceback': text_content(traceback)}) + "\n")])
self.assertEqual(client.testsRun, 3)
def test_non_test_characters_forwarded_immediately(self):
@@ -559,9 +563,7 @@ class TestTestProtocolServerAddxFail(unittest.TestCase):
value = details
else:
if error_message is not None:
- value = subunit.RemoteError(_u("Text attachment: traceback\n"
- "------------\n") + _u(error_message) +
- _u("------------\n"))
+ value = subunit.RemoteError(details_to_str(details))
else:
value = subunit.RemoteError()
self.assertEqual([
@@ -1299,6 +1301,22 @@ class TestTestProtocolClient(unittest.TestCase):
"something\n"
"F\r\nserialised\nform0\r\n]\n" % self.test.id()))
+ def test_tags_empty(self):
+ self.protocol.tags(set(), set())
+ self.assertEqual(_b(""), self.io.getvalue())
+
+ def test_tags_add(self):
+ self.protocol.tags(set(['foo']), set())
+ self.assertEqual(_b("tags: foo\n"), self.io.getvalue())
+
+ def test_tags_both(self):
+ self.protocol.tags(set(['quux']), set(['bar']))
+ self.assertEqual(_b("tags: quux -bar\n"), self.io.getvalue())
+
+ def test_tags_gone(self):
+ self.protocol.tags(set(), set(['bar']))
+ self.assertEqual(_b("tags: -bar\n"), self.io.getvalue())
+
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
diff --git a/lib/subunit/python/subunit/tests/test_test_results.py b/lib/subunit/python/subunit/tests/test_test_results.py
index 94d22748e8..236dfa22e5 100644
--- a/lib/subunit/python/subunit/tests/test_test_results.py
+++ b/lib/subunit/python/subunit/tests/test_test_results.py
@@ -14,16 +14,25 @@
# limitations under that license.
#
+import csv
import datetime
+import sys
import unittest
from testtools import TestCase
+from testtools.compat import StringIO
+from testtools.content import (
+ text_content,
+ TracebackContent,
+ )
from testtools.testresult.doubles import ExtendedTestResult
import subunit
import subunit.iso8601 as iso8601
import subunit.test_results
+import testtools
+
class LoggingDecorator(subunit.test_results.HookedTestResultDecorator):
@@ -192,12 +201,55 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase):
class TestTagCollapsingDecorator(TestCase):
- def test_tags_forwarded_outside_of_tests(self):
+ def test_tags_collapsed_outside_of_tests(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
- tag_collapser.tags(set(['a', 'b']), set())
+ tag_collapser.tags(set(['a']), set())
+ tag_collapser.tags(set(['b']), set())
+ tag_collapser.startTest(self)
self.assertEquals(
- [('tags', set(['a', 'b']), set([]))], result._events)
+ [('tags', set(['a', 'b']), set([])),
+ ('startTest', self),
+ ], result._events)
+
+ def test_tags_collapsed_outside_of_tests_are_flushed(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ tag_collapser.startTestRun()
+ tag_collapser.tags(set(['a']), set())
+ tag_collapser.tags(set(['b']), set())
+ tag_collapser.startTest(self)
+ tag_collapser.addSuccess(self)
+ tag_collapser.stopTest(self)
+ tag_collapser.stopTestRun()
+ self.assertEquals(
+ [('startTestRun',),
+ ('tags', set(['a', 'b']), set([])),
+ ('startTest', self),
+ ('addSuccess', self),
+ ('stopTest', self),
+ ('stopTestRun',),
+ ], result._events)
+
+ def test_tags_forwarded_after_tests(self):
+ test = subunit.RemotedTestCase('foo')
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ tag_collapser.startTestRun()
+ tag_collapser.startTest(test)
+ tag_collapser.addSuccess(test)
+ tag_collapser.stopTest(test)
+ tag_collapser.tags(set(['a']), set(['b']))
+ tag_collapser.stopTestRun()
+ self.assertEqual(
+ [('startTestRun',),
+ ('startTest', test),
+ ('addSuccess', test),
+ ('stopTest', test),
+ ('tags', set(['a']), set(['b'])),
+ ('stopTestRun',),
+ ],
+ result._events)
def test_tags_collapsed_inside_of_tests(self):
result = ExtendedTestResult()
@@ -229,6 +281,25 @@ class TestTagCollapsingDecorator(TestCase):
('stopTest', test)],
result._events)
+ def test_tags_sent_before_result(self):
+ # Because addSuccess and friends tend to send subunit output
+ # immediately, and because 'tags:' before a result line means
+ # something different to 'tags:' after a result line, we need to be
+ # sure that tags are emitted before 'addSuccess' (or whatever).
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ test = subunit.RemotedTestCase('foo')
+ tag_collapser.startTest(test)
+ tag_collapser.tags(set(['a']), set())
+ tag_collapser.addSuccess(test)
+ tag_collapser.stopTest(test)
+ self.assertEquals(
+ [('startTest', test),
+ ('tags', set(['a']), set()),
+ ('addSuccess', test),
+ ('stopTest', test)],
+ result._events)
+
class TestTimeCollapsingDecorator(TestCase):
@@ -294,6 +365,201 @@ class TestTimeCollapsingDecorator(TestCase):
('stopTest', foo)], result._events)
+class TestByTestResultTests(testtools.TestCase):
+
+ def setUp(self):
+ super(TestByTestResultTests, self).setUp()
+ self.log = []
+ self.result = subunit.test_results.TestByTestResult(self.on_test)
+ if sys.version_info >= (3, 0):
+ self.result._now = iter(range(5)).__next__
+ else:
+ self.result._now = iter(range(5)).next
+
+ def assertCalled(self, **kwargs):
+ defaults = {
+ 'test': self,
+ 'tags': set(),
+ 'details': None,
+ 'start_time': 0,
+ 'stop_time': 1,
+ }
+ defaults.update(kwargs)
+ self.assertEqual([defaults], self.log)
+
+ def on_test(self, **kwargs):
+ self.log.append(kwargs)
+
+ def test_no_tests_nothing_reported(self):
+ self.result.startTestRun()
+ self.result.stopTestRun()
+ self.assertEqual([], self.log)
+
+ def test_add_success(self):
+ self.result.startTest(self)
+ self.result.addSuccess(self)
+ self.result.stopTest(self)
+ self.assertCalled(status='success')
+
+ def test_add_success_details(self):
+ self.result.startTest(self)
+ details = {'foo': 'bar'}
+ self.result.addSuccess(self, details=details)
+ self.result.stopTest(self)
+ self.assertCalled(status='success', details=details)
+
+ def test_tags(self):
+ if not getattr(self.result, 'tags', None):
+ self.skipTest("No tags in testtools")
+ self.result.tags(['foo'], [])
+ self.result.startTest(self)
+ self.result.addSuccess(self)
+ self.result.stopTest(self)
+ self.assertCalled(status='success', tags=set(['foo']))
+
+ def test_add_error(self):
+ self.result.startTest(self)
+ try:
+ 1/0
+ except ZeroDivisionError:
+ error = sys.exc_info()
+ self.result.addError(self, error)
+ self.result.stopTest(self)
+ self.assertCalled(
+ status='error',
+ details={'traceback': TracebackContent(error, self)})
+
+ def test_add_error_details(self):
+ self.result.startTest(self)
+ details = {"foo": text_content("bar")}
+ self.result.addError(self, details=details)
+ self.result.stopTest(self)
+ self.assertCalled(status='error', details=details)
+
+ def test_add_failure(self):
+ self.result.startTest(self)
+ try:
+ self.fail("intentional failure")
+ except self.failureException:
+ failure = sys.exc_info()
+ self.result.addFailure(self, failure)
+ self.result.stopTest(self)
+ self.assertCalled(
+ status='failure',
+ details={'traceback': TracebackContent(failure, self)})
+
+ def test_add_failure_details(self):
+ self.result.startTest(self)
+ details = {"foo": text_content("bar")}
+ self.result.addFailure(self, details=details)
+ self.result.stopTest(self)
+ self.assertCalled(status='failure', details=details)
+
+ def test_add_xfail(self):
+ self.result.startTest(self)
+ try:
+ 1/0
+ except ZeroDivisionError:
+ error = sys.exc_info()
+ self.result.addExpectedFailure(self, error)
+ self.result.stopTest(self)
+ self.assertCalled(
+ status='xfail',
+ details={'traceback': TracebackContent(error, self)})
+
+ def test_add_xfail_details(self):
+ self.result.startTest(self)
+ details = {"foo": text_content("bar")}
+ self.result.addExpectedFailure(self, details=details)
+ self.result.stopTest(self)
+ self.assertCalled(status='xfail', details=details)
+
+ def test_add_unexpected_success(self):
+ self.result.startTest(self)
+ details = {'foo': 'bar'}
+ self.result.addUnexpectedSuccess(self, details=details)
+ self.result.stopTest(self)
+ self.assertCalled(status='success', details=details)
+
+ def test_add_skip_reason(self):
+ self.result.startTest(self)
+ reason = self.getUniqueString()
+ self.result.addSkip(self, reason)
+ self.result.stopTest(self)
+ self.assertCalled(
+ status='skip', details={'reason': text_content(reason)})
+
+ def test_add_skip_details(self):
+ self.result.startTest(self)
+ details = {'foo': 'bar'}
+ self.result.addSkip(self, details=details)
+ self.result.stopTest(self)
+ self.assertCalled(status='skip', details=details)
+
+ def test_twice(self):
+ self.result.startTest(self)
+ self.result.addSuccess(self, details={'foo': 'bar'})
+ self.result.stopTest(self)
+ self.result.startTest(self)
+ self.result.addSuccess(self)
+ self.result.stopTest(self)
+ self.assertEqual(
+ [{'test': self,
+ 'status': 'success',
+ 'start_time': 0,
+ 'stop_time': 1,
+ 'tags': set(),
+ 'details': {'foo': 'bar'}},
+ {'test': self,
+ 'status': 'success',
+ 'start_time': 2,
+ 'stop_time': 3,
+ 'tags': set(),
+ 'details': None},
+ ],
+ self.log)
+
+
+class TestCsvResult(testtools.TestCase):
+
+ def parse_stream(self, stream):
+ stream.seek(0)
+ reader = csv.reader(stream)
+ return list(reader)
+
+ def test_csv_output(self):
+ stream = StringIO()
+ result = subunit.test_results.CsvResult(stream)
+ if sys.version_info >= (3, 0):
+ result._now = iter(range(5)).__next__
+ else:
+ result._now = iter(range(5)).next
+ result.startTestRun()
+ result.startTest(self)
+ result.addSuccess(self)
+ result.stopTest(self)
+ result.stopTestRun()
+ self.assertEqual(
+ [['test', 'status', 'start_time', 'stop_time'],
+ [self.id(), 'success', '0', '1'],
+ ],
+ self.parse_stream(stream))
+
+ def test_just_header_when_no_tests(self):
+ stream = StringIO()
+ result = subunit.test_results.CsvResult(stream)
+ result.startTestRun()
+ result.stopTestRun()
+ self.assertEqual(
+ [['test', 'status', 'start_time', 'stop_time']],
+ self.parse_stream(stream))
+
+ def test_no_output_before_events(self):
+ stream = StringIO()
+ subunit.test_results.CsvResult(stream)
+ self.assertEqual([], self.parse_stream(stream))
+
+
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)