diff options
Diffstat (limited to 'lib/subunit/python/subunit/tests/test_test_results.py')
-rw-r--r-- | lib/subunit/python/subunit/tests/test_test_results.py | 272 |
1 files changed, 269 insertions, 3 deletions
diff --git a/lib/subunit/python/subunit/tests/test_test_results.py b/lib/subunit/python/subunit/tests/test_test_results.py index 94d22748e8..236dfa22e5 100644 --- a/lib/subunit/python/subunit/tests/test_test_results.py +++ b/lib/subunit/python/subunit/tests/test_test_results.py @@ -14,16 +14,25 @@ # limitations under that license. # +import csv import datetime +import sys import unittest from testtools import TestCase +from testtools.compat import StringIO +from testtools.content import ( + text_content, + TracebackContent, + ) from testtools.testresult.doubles import ExtendedTestResult import subunit import subunit.iso8601 as iso8601 import subunit.test_results +import testtools + class LoggingDecorator(subunit.test_results.HookedTestResultDecorator): @@ -192,12 +201,55 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase): class TestTagCollapsingDecorator(TestCase): - def test_tags_forwarded_outside_of_tests(self): + def test_tags_collapsed_outside_of_tests(self): result = ExtendedTestResult() tag_collapser = subunit.test_results.TagCollapsingDecorator(result) - tag_collapser.tags(set(['a', 'b']), set()) + tag_collapser.tags(set(['a']), set()) + tag_collapser.tags(set(['b']), set()) + tag_collapser.startTest(self) self.assertEquals( - [('tags', set(['a', 'b']), set([]))], result._events) + [('tags', set(['a', 'b']), set([])), + ('startTest', self), + ], result._events) + + def test_tags_collapsed_outside_of_tests_are_flushed(self): + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + tag_collapser.startTestRun() + tag_collapser.tags(set(['a']), set()) + tag_collapser.tags(set(['b']), set()) + tag_collapser.startTest(self) + tag_collapser.addSuccess(self) + tag_collapser.stopTest(self) + tag_collapser.stopTestRun() + self.assertEquals( + [('startTestRun',), + ('tags', set(['a', 'b']), set([])), + ('startTest', self), + ('addSuccess', self), + ('stopTest', self), + ('stopTestRun',), + ], result._events) + + def test_tags_forwarded_after_tests(self): + test = subunit.RemotedTestCase('foo') + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + tag_collapser.startTestRun() + tag_collapser.startTest(test) + tag_collapser.addSuccess(test) + tag_collapser.stopTest(test) + tag_collapser.tags(set(['a']), set(['b'])) + tag_collapser.stopTestRun() + self.assertEqual( + [('startTestRun',), + ('startTest', test), + ('addSuccess', test), + ('stopTest', test), + ('tags', set(['a']), set(['b'])), + ('stopTestRun',), + ], + result._events) def test_tags_collapsed_inside_of_tests(self): result = ExtendedTestResult() @@ -229,6 +281,25 @@ class TestTagCollapsingDecorator(TestCase): ('stopTest', test)], result._events) + def test_tags_sent_before_result(self): + # Because addSuccess and friends tend to send subunit output + # immediately, and because 'tags:' before a result line means + # something different to 'tags:' after a result line, we need to be + # sure that tags are emitted before 'addSuccess' (or whatever). + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + test = subunit.RemotedTestCase('foo') + tag_collapser.startTest(test) + tag_collapser.tags(set(['a']), set()) + tag_collapser.addSuccess(test) + tag_collapser.stopTest(test) + self.assertEquals( + [('startTest', test), + ('tags', set(['a']), set()), + ('addSuccess', test), + ('stopTest', test)], + result._events) + class TestTimeCollapsingDecorator(TestCase): @@ -294,6 +365,201 @@ class TestTimeCollapsingDecorator(TestCase): ('stopTest', foo)], result._events) +class TestByTestResultTests(testtools.TestCase): + + def setUp(self): + super(TestByTestResultTests, self).setUp() + self.log = [] + self.result = subunit.test_results.TestByTestResult(self.on_test) + if sys.version_info >= (3, 0): + self.result._now = iter(range(5)).__next__ + else: + self.result._now = iter(range(5)).next + + def assertCalled(self, **kwargs): + defaults = { + 'test': self, + 'tags': set(), + 'details': None, + 'start_time': 0, + 'stop_time': 1, + } + defaults.update(kwargs) + self.assertEqual([defaults], self.log) + + def on_test(self, **kwargs): + self.log.append(kwargs) + + def test_no_tests_nothing_reported(self): + self.result.startTestRun() + self.result.stopTestRun() + self.assertEqual([], self.log) + + def test_add_success(self): + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertCalled(status='success') + + def test_add_success_details(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addSuccess(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='success', details=details) + + def test_tags(self): + if not getattr(self.result, 'tags', None): + self.skipTest("No tags in testtools") + self.result.tags(['foo'], []) + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertCalled(status='success', tags=set(['foo'])) + + def test_add_error(self): + self.result.startTest(self) + try: + 1/0 + except ZeroDivisionError: + error = sys.exc_info() + self.result.addError(self, error) + self.result.stopTest(self) + self.assertCalled( + status='error', + details={'traceback': TracebackContent(error, self)}) + + def test_add_error_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addError(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='error', details=details) + + def test_add_failure(self): + self.result.startTest(self) + try: + self.fail("intentional failure") + except self.failureException: + failure = sys.exc_info() + self.result.addFailure(self, failure) + self.result.stopTest(self) + self.assertCalled( + status='failure', + details={'traceback': TracebackContent(failure, self)}) + + def test_add_failure_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addFailure(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='failure', details=details) + + def test_add_xfail(self): + self.result.startTest(self) + try: + 1/0 + except ZeroDivisionError: + error = sys.exc_info() + self.result.addExpectedFailure(self, error) + self.result.stopTest(self) + self.assertCalled( + status='xfail', + details={'traceback': TracebackContent(error, self)}) + + def test_add_xfail_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addExpectedFailure(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='xfail', details=details) + + def test_add_unexpected_success(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addUnexpectedSuccess(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='success', details=details) + + def test_add_skip_reason(self): + self.result.startTest(self) + reason = self.getUniqueString() + self.result.addSkip(self, reason) + self.result.stopTest(self) + self.assertCalled( + status='skip', details={'reason': text_content(reason)}) + + def test_add_skip_details(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addSkip(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='skip', details=details) + + def test_twice(self): + self.result.startTest(self) + self.result.addSuccess(self, details={'foo': 'bar'}) + self.result.stopTest(self) + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertEqual( + [{'test': self, + 'status': 'success', + 'start_time': 0, + 'stop_time': 1, + 'tags': set(), + 'details': {'foo': 'bar'}}, + {'test': self, + 'status': 'success', + 'start_time': 2, + 'stop_time': 3, + 'tags': set(), + 'details': None}, + ], + self.log) + + +class TestCsvResult(testtools.TestCase): + + def parse_stream(self, stream): + stream.seek(0) + reader = csv.reader(stream) + return list(reader) + + def test_csv_output(self): + stream = StringIO() + result = subunit.test_results.CsvResult(stream) + if sys.version_info >= (3, 0): + result._now = iter(range(5)).__next__ + else: + result._now = iter(range(5)).next + result.startTestRun() + result.startTest(self) + result.addSuccess(self) + result.stopTest(self) + result.stopTestRun() + self.assertEqual( + [['test', 'status', 'start_time', 'stop_time'], + [self.id(), 'success', '0', '1'], + ], + self.parse_stream(stream)) + + def test_just_header_when_no_tests(self): + stream = StringIO() + result = subunit.test_results.CsvResult(stream) + result.startTestRun() + result.stopTestRun() + self.assertEqual( + [['test', 'status', 'start_time', 'stop_time']], + self.parse_stream(stream)) + + def test_no_output_before_events(self): + stream = StringIO() + subunit.test_results.CsvResult(stream) + self.assertEqual([], self.parse_stream(stream)) + + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() result = loader.loadTestsFromName(__name__) |