diff options
27 files changed, 4062 insertions, 505 deletions
diff --git a/lib/subunit/filters/tap2subunit b/lib/subunit/filters/tap2subunit new file mode 100755 index 0000000000..c571972225 --- /dev/null +++ b/lib/subunit/filters/tap2subunit @@ -0,0 +1,26 @@ +#!/usr/bin/env python +#  subunit: extensions to python unittest to get test results from subprocesses. +#  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +"""A filter that reads a TAP stream and outputs a subunit stream. + +More information on TAP is available at +http://testanything.org/wiki/index.php/Main_Page. +""" + +import sys + +from subunit import TAP2SubUnit +sys.exit(TAP2SubUnit(sys.stdin, sys.stdout)) diff --git a/lib/subunit/python/iso8601/LICENSE b/lib/subunit/python/iso8601/LICENSE new file mode 100644 index 0000000000..5ca93dae79 --- /dev/null +++ b/lib/subunit/python/iso8601/LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2007 Michael Twomey + +Permission is hereby granted, free of charge, to any person obtaining a +copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be included +in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/lib/subunit/python/iso8601/README b/lib/subunit/python/iso8601/README new file mode 100644 index 0000000000..5ec9d45597 --- /dev/null +++ b/lib/subunit/python/iso8601/README @@ -0,0 +1,26 @@ +A simple package to deal with ISO 8601 date time formats. + +ISO 8601 defines a neutral, unambiguous date string format, which also +has the property of sorting naturally. + +e.g. YYYY-MM-DDTHH:MM:SSZ or 2007-01-25T12:00:00Z + +Currently this covers only the most common date formats encountered, not +all of ISO 8601 is handled. + +Currently the following formats are handled: + +* 2006-01-01T00:00:00Z +* 2006-01-01T00:00:00[+-]00:00 + +I'll add more as I encounter them in my day to day life. Patches with  +new formats and tests will be gratefully accepted of course :) + +References: + +* http://www.cl.cam.ac.uk/~mgk25/iso-time.html - simple overview + +* http://hydracen.com/dx/iso8601.htm - more detailed enumeration of +  valid formats. + +See the LICENSE file for the license this package is released under. diff --git a/lib/subunit/python/iso8601/README.subunit b/lib/subunit/python/iso8601/README.subunit new file mode 100644 index 0000000000..d1ed8a11a6 --- /dev/null +++ b/lib/subunit/python/iso8601/README.subunit @@ -0,0 +1,5 @@ +This is a [slightly rearranged] import of http://pypi.python.org/pypi/iso8601/ +version 0.1.4. The OS X hidden files have been stripped, and the package +turned into a single module, to simplify installation. The remainder of the +source distribution is included in the subunit source tree at python/iso8601 +for reference. diff --git a/lib/subunit/python/iso8601/setup.py b/lib/subunit/python/iso8601/setup.py new file mode 100644 index 0000000000..cdb61ecf6a --- /dev/null +++ b/lib/subunit/python/iso8601/setup.py @@ -0,0 +1,58 @@ +try: +    from setuptools import setup +except ImportError: +    from distutils import setup + +long_description="""Simple module to parse ISO 8601 dates + +This module parses the most common forms of ISO 8601 date strings (e.g. +2007-01-14T20:34:22+00:00) into datetime objects. + +>>> import iso8601 +>>> iso8601.parse_date("2007-01-25T12:00:00Z") +datetime.datetime(2007, 1, 25, 12, 0, tzinfo=<iso8601.iso8601.Utc ...>) +>>> + +Changes +======= + +0.1.4 +----- + +* The default_timezone argument wasn't being passed through correctly, +  UTC was being used in every case. Fixes issue 10. + +0.1.3 +----- + +* Fixed the microsecond handling, the generated microsecond values were  +  way too small. Fixes issue 9. + +0.1.2 +----- + +* Adding ParseError to __all__ in iso8601 module, allows people to import it. +  Addresses issue 7. +* Be a little more flexible when dealing with dates without leading zeroes. +  This violates the spec a little, but handles more dates as seen in the  +  field. Addresses issue 6. +* Allow date/time separators other than T. + +0.1.1 +----- + +* When parsing dates without a timezone the specified default is used. If no +  default is specified then UTC is used. Addresses issue 4. +""" + +setup( +    name="iso8601", +    version="0.1.4", +    description=long_description.split("\n")[0], +    long_description=long_description, +    author="Michael Twomey", +    author_email="micktwomey+iso8601@gmail.com", +    url="http://code.google.com/p/pyiso8601/", +    packages=["iso8601"], +    license="MIT", +) diff --git a/lib/subunit/python/iso8601/test_iso8601.py b/lib/subunit/python/iso8601/test_iso8601.py new file mode 100644 index 0000000000..ff9e2731cf --- /dev/null +++ b/lib/subunit/python/iso8601/test_iso8601.py @@ -0,0 +1,111 @@ +import iso8601 + +def test_iso8601_regex(): +    assert iso8601.ISO8601_REGEX.match("2006-10-11T00:14:33Z") + +def test_timezone_regex(): +    assert iso8601.TIMEZONE_REGEX.match("+01:00") +    assert iso8601.TIMEZONE_REGEX.match("+00:00") +    assert iso8601.TIMEZONE_REGEX.match("+01:20") +    assert iso8601.TIMEZONE_REGEX.match("-01:00") + +def test_parse_date(): +    d = iso8601.parse_date("2006-10-20T15:34:56Z") +    assert d.year == 2006 +    assert d.month == 10 +    assert d.day == 20 +    assert d.hour == 15 +    assert d.minute == 34 +    assert d.second == 56 +    assert d.tzinfo == iso8601.UTC + +def test_parse_date_fraction(): +    d = iso8601.parse_date("2006-10-20T15:34:56.123Z") +    assert d.year == 2006 +    assert d.month == 10 +    assert d.day == 20 +    assert d.hour == 15 +    assert d.minute == 34 +    assert d.second == 56 +    assert d.microsecond == 123000 +    assert d.tzinfo == iso8601.UTC + +def test_parse_date_fraction_2(): +    """From bug 6 +     +    """ +    d = iso8601.parse_date("2007-5-7T11:43:55.328Z'") +    assert d.year == 2007 +    assert d.month == 5 +    assert d.day == 7 +    assert d.hour == 11 +    assert d.minute == 43 +    assert d.second == 55 +    assert d.microsecond == 328000 +    assert d.tzinfo == iso8601.UTC + +def test_parse_date_tz(): +    d = iso8601.parse_date("2006-10-20T15:34:56.123+02:30") +    assert d.year == 2006 +    assert d.month == 10 +    assert d.day == 20 +    assert d.hour == 15 +    assert d.minute == 34 +    assert d.second == 56 +    assert d.microsecond == 123000 +    assert d.tzinfo.tzname(None) == "+02:30" +    offset = d.tzinfo.utcoffset(None) +    assert offset.days == 0 +    assert offset.seconds == 60 * 60 * 2.5 + +def test_parse_invalid_date(): +    try: +        iso8601.parse_date(None) +    except iso8601.ParseError: +        pass +    else: +        assert 1 == 2 + +def test_parse_invalid_date2(): +    try: +        iso8601.parse_date("23") +    except iso8601.ParseError: +        pass +    else: +        assert 1 == 2 + +def test_parse_no_timezone(): +    """issue 4 - Handle datetime string without timezone +     +    This tests what happens when you parse a date with no timezone. While not +    strictly correct this is quite common. I'll assume UTC for the time zone +    in this case. +    """ +    d = iso8601.parse_date("2007-01-01T08:00:00") +    assert d.year == 2007 +    assert d.month == 1 +    assert d.day == 1 +    assert d.hour == 8 +    assert d.minute == 0 +    assert d.second == 0 +    assert d.microsecond == 0 +    assert d.tzinfo == iso8601.UTC + +def test_parse_no_timezone_different_default(): +    tz = iso8601.FixedOffset(2, 0, "test offset") +    d = iso8601.parse_date("2007-01-01T08:00:00", default_timezone=tz) +    assert d.tzinfo == tz + +def test_space_separator(): +    """Handle a separator other than T +     +    """ +    d = iso8601.parse_date("2007-06-23 06:40:34.00Z") +    assert d.year == 2007 +    assert d.month == 6 +    assert d.day == 23 +    assert d.hour == 6 +    assert d.minute == 40 +    assert d.second == 34 +    assert d.microsecond == 0 +    assert d.tzinfo == iso8601.UTC diff --git a/lib/subunit/python/subunit/__init__.py b/lib/subunit/python/subunit/__init__.py index 406cd8765b..6e8df90317 100644 --- a/lib/subunit/python/subunit/__init__.py +++ b/lib/subunit/python/subunit/__init__.py @@ -1,28 +1,151 @@  # -#  subunit: extensions to python unittest to get test results from subprocesses. +#  subunit: extensions to Python unittest to get test results from subprocesses.  #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net> -#  Copyright (C) 2007  Jelmer Vernooij <jelmer@samba.org>  # -#  This program is free software; you can redistribute it and/or modify -#  it under the terms of the GNU General Public License as published by -#  the Free Software Foundation; either version 3 of the License, or -#  (at your option) any later version. -# -#  This program is distributed in the hope that it will be useful, -#  but WITHOUT ANY WARRANTY; without even the implied warranty of -#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -#  GNU General Public License for more details. -# -#  You should have received a copy of the GNU General Public License -#  along with this program; if not, write to the Free Software -#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license.  # +"""Subunit - a streaming test protocol + +Overview +++++++++ + +The ``subunit`` Python package provides a number of ``unittest`` extensions +which can be used to cause tests to output Subunit, to parse Subunit streams +into test activity, perform seamless test isolation within a regular test +case and variously sort, filter and report on test runs. + + +Key Classes +----------- + +The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult`` +extension which will translate a test run into a Subunit stream. + +The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire +protocol and the ``unittest.TestCase`` object protocol. It is used to translate +a stream into a test run, which regular ``unittest.TestResult`` objects can +process and report/inspect. + +Subunit has support for non-blocking usage too, for use with asyncore or +Twisted. See the ``TestProtocolServer`` parser class for more details. + +Subunit includes extensions to the Python ``TestResult`` protocol. These are +all done in a compatible manner: ``TestResult`` objects that do not implement +the extension methods will not cause errors to be raised, instead the extension +will either lose fidelity (for instance, folding expected failures to success +in Python versions < 2.7 or 3.1), or discard the extended data (for extra +details, tags, timestamping and progress markers). + +The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``, +``addFailure``, ``addSkip`` take an optional keyword parameter ``details`` +which can be used instead of the usual python unittest parameter. +When used the value of details should be a dict from ``string`` to  +``testtools.content.Content`` objects. This is a draft API being worked on with +the Python Testing In Python mail list, with the goal of permitting a common +way to provide additional data beyond a traceback, such as captured data from +disk, logging messages etc. The reference for this API is in testtools (0.9.0 +and newer). + +The ``tags(new_tags, gone_tags)`` method is called (if present) to add or +remove tags in the test run that is currently executing. If called when no +test is in progress (that is, if called outside of the ``startTest``,  +``stopTest`` pair), the the tags apply to all sebsequent tests. If called +when a test is in progress, then the tags only apply to that test. + +The ``time(a_datetime)`` method is called (if present) when a ``time:`` +directive is encountered in a Subunit stream. This is used to tell a TestResult +about the time that events in the stream occured at, to allow reconstructing +test timing from a stream. + +The ``progress(offset, whence)`` method controls progress data for a stream. +The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR, +subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations +ignore the offset parameter. + + +Python test support +------------------- + +``subunit.run`` is a convenience wrapper to run a Python test suite via +the command line, reporting via Subunit:: + +  $ python -m subunit.run mylib.tests.test_suite + +The ``IsolatedTestSuite`` class is a TestSuite that forks before running its +tests, allowing isolation between the test runner and some tests. + +Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get +tests that will fork() before that individual test is run. + +`ExecTestCase`` is a convenience wrapper for running an external  +program to get a Subunit stream and then report that back to an arbitrary +result object:: + + class AggregateTests(subunit.ExecTestCase): + +     def test_script_one(self): +         './bin/script_one' + +     def test_script_two(self): +         './bin/script_two' +  + # Normally your normal test loading would take of this automatically, + # It is only spelt out in detail here for clarity. + suite = unittest.TestSuite([AggregateTests("test_script_one"), +     AggregateTests("test_script_two")]) + # Create any TestResult class you like. + result = unittest._TextTestResult(sys.stdout) + # And run your suite as normal, Subunit will exec each external script as + # needed and report to your result object. + suite.run(result) + +Utility modules +--------------- + +* subunit.chunked contains HTTP chunked encoding/decoding logic. +* subunit.test_results contains TestResult helper classes. +""" + +import datetime  import os +import re  from StringIO import StringIO +import subprocess  import sys  import unittest +import iso8601 +from testtools import content, content_type, ExtendedToOriginalDecorator +try: +    from testtools.testresult.real import _StringException +    RemoteException = _StringException +    _remote_exception_str = '_StringException' # For testing. +except ImportError: +    raise ImportError ("testtools.testresult.real does not contain " +        "_StringException, check your version.") + + +from testtools.testresult.real import _StringException + +import chunked, details, test_results + + +PROGRESS_SET = 0 +PROGRESS_CUR = 1 +PROGRESS_PUSH = 2 +PROGRESS_POP = 3 + +  def test_suite():      import subunit.tests      return subunit.tests.test_suite() @@ -42,211 +165,558 @@ def join_dir(base_path, path):      return os.path.join(os.path.dirname(os.path.abspath(base_path)), path) -class TestProtocolServer(object): -    """A class for receiving results from a TestProtocol client.""" - -    OUTSIDE_TEST = 0 -    TEST_STARTED = 1 -    READING_FAILURE = 2 -    READING_ERROR = 3 - -    def __init__(self, client, stream=sys.stdout): -        """Create a TestProtocol server instance. - -        client should be an object that provides -         - startTest -         - addSuccess -         - addFailure -         - addError -         - stopTest -        methods, i.e. a TestResult. -        """ -        self.state = TestProtocolServer.OUTSIDE_TEST -        self.client = client -        self._stream = stream - -    def _addError(self, offset, line): -        if (self.state == TestProtocolServer.TEST_STARTED and -            self.current_test_description == line[offset:-1]): -            self.state = TestProtocolServer.OUTSIDE_TEST -            self.current_test_description = None -            self.client.addError(self._current_test, RemoteError("")) -            self.client.stopTest(self._current_test) -            self._current_test = None -        elif (self.state == TestProtocolServer.TEST_STARTED and -            self.current_test_description + " [" == line[offset:-1]): -            self.state = TestProtocolServer.READING_ERROR -            self._message = "" -        else: -            self.stdOutLineReceived(line) - -    def _addFailure(self, offset, line): -        if (self.state == TestProtocolServer.TEST_STARTED and -            self.current_test_description == line[offset:-1]): -            self.state = TestProtocolServer.OUTSIDE_TEST -            self.current_test_description = None -            self.client.addFailure(self._current_test, RemoteError()) -            self.client.stopTest(self._current_test) -        elif (self.state == TestProtocolServer.TEST_STARTED and -            self.current_test_description + " [" == line[offset:-1]): -            self.state = TestProtocolServer.READING_FAILURE -            self._message = "" +def tags_to_new_gone(tags): +    """Split a list of tags into a new_set and a gone_set.""" +    new_tags = set() +    gone_tags = set() +    for tag in tags: +        if tag[0] == '-': +            gone_tags.add(tag[1:])          else: -            self.stdOutLineReceived(line) - -    def _addSuccess(self, offset, line): -        if (self.state == TestProtocolServer.TEST_STARTED and -            self.current_test_description == line[offset:-1]): -            self.client.addSuccess(self._current_test) -            self.client.stopTest(self._current_test) -            self.current_test_description = None -            self._current_test = None -            self.state = TestProtocolServer.OUTSIDE_TEST +            new_tags.add(tag) +    return new_tags, gone_tags + + +class DiscardStream(object): +    """A filelike object which discards what is written to it.""" + +    def write(self, bytes): +        pass + + +class _ParserState(object): +    """State for the subunit parser.""" + +    def __init__(self, parser): +        self.parser = parser + +    def addError(self, offset, line): +        """An 'error:' directive has been read.""" +        self.parser.stdOutLineReceived(line) + +    def addExpectedFail(self, offset, line): +        """An 'xfail:' directive has been read.""" +        self.parser.stdOutLineReceived(line) + +    def addFailure(self, offset, line): +        """A 'failure:' directive has been read.""" +        self.parser.stdOutLineReceived(line) + +    def addSkip(self, offset, line): +        """A 'skip:' directive has been read.""" +        self.parser.stdOutLineReceived(line) + +    def addSuccess(self, offset, line): +        """A 'success:' directive has been read.""" +        self.parser.stdOutLineReceived(line) + +    def lineReceived(self, line): +        """a line has been received.""" +        parts = line.split(None, 1) +        if len(parts) == 2: +            cmd, rest = parts +            offset = len(cmd) + 1 +            cmd = cmd.strip(':') +            if cmd in ('test', 'testing'): +                self.startTest(offset, line) +            elif cmd == 'error': +                self.addError(offset, line) +            elif cmd == 'failure': +                self.addFailure(offset, line) +            elif cmd == 'progress': +                self.parser._handleProgress(offset, line) +            elif cmd == 'skip': +                self.addSkip(offset, line) +            elif cmd in ('success', 'successful'): +                self.addSuccess(offset, line) +            elif cmd in ('tags',): +                self.parser._handleTags(offset, line) +                self.parser.subunitLineReceived(line) +            elif cmd in ('time',): +                self.parser._handleTime(offset, line) +                self.parser.subunitLineReceived(line) +            elif cmd == 'xfail': +                self.addExpectedFail(offset, line) +            else: +                self.parser.stdOutLineReceived(line)          else: -            self.stdOutLineReceived(line) +            self.parser.stdOutLineReceived(line) + +    def lostConnection(self): +        """Connection lost.""" +        self.parser._lostConnectionInTest('unknown state of ') + +    def startTest(self, offset, line): +        """A test start command received.""" +        self.parser.stdOutLineReceived(line) + -    def _appendMessage(self, line): -        if line[0:2] == " ]": -            # quoted ] start -            self._message += line[1:] +class _InTest(_ParserState): +    """State for the subunit parser after reading a test: directive.""" + +    def _outcome(self, offset, line, no_details, details_state): +        """An outcome directive has been read. +         +        :param no_details: Callable to call when no details are presented. +        :param details_state: The state to switch to for details +            processing of this outcome. +        """ +        if self.parser.current_test_description == line[offset:-1]: +            self.parser._state = self.parser._outside_test +            self.parser.current_test_description = None +            no_details() +            self.parser.client.stopTest(self.parser._current_test) +            self.parser._current_test = None +            self.parser.subunitLineReceived(line) +        elif self.parser.current_test_description + " [" == line[offset:-1]: +            self.parser._state = details_state +            details_state.set_simple() +            self.parser.subunitLineReceived(line) +        elif self.parser.current_test_description + " [ multipart" == \ +            line[offset:-1]: +            self.parser._state = details_state +            details_state.set_multipart() +            self.parser.subunitLineReceived(line)          else: -            self._message += line - -    def endQuote(self, line): -        if self.state == TestProtocolServer.READING_FAILURE: -            self.state = TestProtocolServer.OUTSIDE_TEST -            self.current_test_description = None -            self.client.addFailure(self._current_test, -                                   RemoteError(self._message)) -            self.client.stopTest(self._current_test) -        elif self.state == TestProtocolServer.READING_ERROR: -            self.state = TestProtocolServer.OUTSIDE_TEST -            self.current_test_description = None -            self.client.addError(self._current_test, -                                 RemoteError(self._message)) -            self.client.stopTest(self._current_test) +            self.parser.stdOutLineReceived(line) + +    def _error(self): +        self.parser.client.addError(self.parser._current_test, +            details={}) + +    def addError(self, offset, line): +        """An 'error:' directive has been read.""" +        self._outcome(offset, line, self._error, +            self.parser._reading_error_details) + +    def _xfail(self): +        self.parser.client.addExpectedFailure(self.parser._current_test, +            details={}) + +    def addExpectedFail(self, offset, line): +        """An 'xfail:' directive has been read.""" +        self._outcome(offset, line, self._xfail, +            self.parser._reading_xfail_details) + +    def _failure(self): +        self.parser.client.addFailure(self.parser._current_test, details={}) + +    def addFailure(self, offset, line): +        """A 'failure:' directive has been read.""" +        self._outcome(offset, line, self._failure, +            self.parser._reading_failure_details) + +    def _skip(self): +        self.parser.client.addSkip(self.parser._current_test, details={}) + +    def addSkip(self, offset, line): +        """A 'skip:' directive has been read.""" +        self._outcome(offset, line, self._skip, +            self.parser._reading_skip_details) + +    def _succeed(self): +        self.parser.client.addSuccess(self.parser._current_test, details={}) + +    def addSuccess(self, offset, line): +        """A 'success:' directive has been read.""" +        self._outcome(offset, line, self._succeed, +            self.parser._reading_success_details) + +    def lostConnection(self): +        """Connection lost.""" +        self.parser._lostConnectionInTest('') + + +class _OutSideTest(_ParserState): +    """State for the subunit parser outside of a test context.""" + +    def lostConnection(self): +        """Connection lost.""" + +    def startTest(self, offset, line): +        """A test start command received.""" +        self.parser._state = self.parser._in_test +        self.parser._current_test = RemotedTestCase(line[offset:-1]) +        self.parser.current_test_description = line[offset:-1] +        self.parser.client.startTest(self.parser._current_test) +        self.parser.subunitLineReceived(line) + + +class _ReadingDetails(_ParserState): +    """Common logic for readin state details.""" + +    def endDetails(self): +        """The end of a details section has been reached.""" +        self.parser._state = self.parser._outside_test +        self.parser.current_test_description = None +        self._report_outcome() +        self.parser.client.stopTest(self.parser._current_test) + +    def lineReceived(self, line): +        """a line has been received.""" +        self.details_parser.lineReceived(line) +        self.parser.subunitLineReceived(line) + +    def lostConnection(self): +        """Connection lost.""" +        self.parser._lostConnectionInTest('%s report of ' % +            self._outcome_label()) + +    def _outcome_label(self): +        """The label to describe this outcome.""" +        raise NotImplementedError(self._outcome_label) + +    def set_simple(self): +        """Start a simple details parser.""" +        self.details_parser = details.SimpleDetailsParser(self) + +    def set_multipart(self): +        """Start a multipart details parser.""" +        self.details_parser = details.MultipartDetailsParser(self) + + +class _ReadingFailureDetails(_ReadingDetails): +    """State for the subunit parser when reading failure details.""" + +    def _report_outcome(self): +        self.parser.client.addFailure(self.parser._current_test, +            details=self.details_parser.get_details()) + +    def _outcome_label(self): +        return "failure" +  + +class _ReadingErrorDetails(_ReadingDetails): +    """State for the subunit parser when reading error details.""" + +    def _report_outcome(self): +        self.parser.client.addError(self.parser._current_test, +            details=self.details_parser.get_details()) + +    def _outcome_label(self): +        return "error" + + +class _ReadingExpectedFailureDetails(_ReadingDetails): +    """State for the subunit parser when reading xfail details.""" + +    def _report_outcome(self): +        self.parser.client.addExpectedFailure(self.parser._current_test, +            details=self.details_parser.get_details()) + +    def _outcome_label(self): +        return "xfail" + + +class _ReadingSkipDetails(_ReadingDetails): +    """State for the subunit parser when reading skip details.""" + +    def _report_outcome(self): +        self.parser.client.addSkip(self.parser._current_test, +            details=self.details_parser.get_details("skip")) + +    def _outcome_label(self): +        return "skip" + + +class _ReadingSuccessDetails(_ReadingDetails): +    """State for the subunit parser when reading success details.""" + +    def _report_outcome(self): +        self.parser.client.addSuccess(self.parser._current_test, +            details=self.details_parser.get_details("success")) + +    def _outcome_label(self): +        return "success" + + +class TestProtocolServer(object): +    """A parser for subunit. +     +    :ivar tags: The current tags associated with the protocol stream. +    """ + +    def __init__(self, client, stream=None, forward_stream=None): +        """Create a TestProtocolServer instance. + +        :param client: An object meeting the unittest.TestResult protocol. +        :param stream: The stream that lines received which are not part of the +            subunit protocol should be written to. This allows custom handling +            of mixed protocols. By default, sys.stdout will be used for +            convenience. +        :param forward_stream: A stream to forward subunit lines to. This  +            allows a filter to forward the entire stream while still parsing +            and acting on it. By default forward_stream is set to +            DiscardStream() and no forwarding happens. +        """ +        self.client = ExtendedToOriginalDecorator(client) +        if stream is None: +            stream = sys.stdout +        self._stream = stream +        self._forward_stream = forward_stream or DiscardStream() +        # state objects we can switch too +        self._in_test = _InTest(self) +        self._outside_test = _OutSideTest(self) +        self._reading_error_details = _ReadingErrorDetails(self) +        self._reading_failure_details = _ReadingFailureDetails(self) +        self._reading_skip_details = _ReadingSkipDetails(self) +        self._reading_success_details = _ReadingSuccessDetails(self) +        self._reading_xfail_details = _ReadingExpectedFailureDetails(self) +        # start with outside test. +        self._state = self._outside_test + +    def _handleProgress(self, offset, line): +        """Process a progress directive.""" +        line = line[offset:].strip() +        if line[0] in '+-': +            whence = PROGRESS_CUR +            delta = int(line) +        elif line == "push": +            whence = PROGRESS_PUSH +            delta = None +        elif line == "pop": +            whence = PROGRESS_POP +            delta = None          else: -            self.stdOutLineReceived(line) +            whence = PROGRESS_SET +            delta = int(line) +        self.client.progress(delta, whence) + +    def _handleTags(self, offset, line): +        """Process a tags command.""" +        tags = line[offset:].split() +        new_tags, gone_tags = tags_to_new_gone(tags) +        self.client.tags(new_tags, gone_tags) + +    def _handleTime(self, offset, line): +        # Accept it, but do not do anything with it yet. +        try: +            event_time = iso8601.parse_date(line[offset:-1]) +        except TypeError, e: +            raise TypeError("Failed to parse %r, got %r" % (line, e)) +        self.client.time(event_time)      def lineReceived(self, line):          """Call the appropriate local method for the received line.""" -        if line == "]\n": -            self.endQuote(line) -        elif (self.state == TestProtocolServer.READING_FAILURE or -              self.state == TestProtocolServer.READING_ERROR): -            self._appendMessage(line) -        else: -            parts = line.split(None, 1) -            if len(parts) == 2: -                cmd, rest = parts -                offset = len(cmd) + 1 -                cmd = cmd.strip(':') -                if cmd in ('test', 'testing'): -                    self._startTest(offset, line) -                elif cmd == 'error': -                    self._addError(offset, line) -                elif cmd == 'failure': -                    self._addFailure(offset, line) -                elif cmd in ('success', 'successful'): -                    self._addSuccess(offset, line) -                else: -                    self.stdOutLineReceived(line) -            else: -                self.stdOutLineReceived(line) +        self._state.lineReceived(line) + +    def _lostConnectionInTest(self, state_string): +        error_string = "lost connection during %stest '%s'" % ( +            state_string, self.current_test_description) +        self.client.addError(self._current_test, RemoteError(error_string)) +        self.client.stopTest(self._current_test)      def lostConnection(self):          """The input connection has finished.""" -        if self.state == TestProtocolServer.TEST_STARTED: -            self.client.addError(self._current_test, -                                 RemoteError("lost connection during test '%s'" -                                             % self.current_test_description)) -            self.client.stopTest(self._current_test) -        elif self.state == TestProtocolServer.READING_ERROR: -            self.client.addError(self._current_test, -                                 RemoteError("lost connection during " -                                             "error report of test " -                                             "'%s'" % -                                             self.current_test_description)) -            self.client.stopTest(self._current_test) -        elif self.state == TestProtocolServer.READING_FAILURE: -            self.client.addError(self._current_test, -                                 RemoteError("lost connection during " -                                             "failure report of test " -                                             "'%s'" % -                                             self.current_test_description)) -            self.client.stopTest(self._current_test) +        self._state.lostConnection()      def readFrom(self, pipe): +        """Blocking convenience API to parse an entire stream. +         +        :param pipe: A file-like object supporting readlines(). +        :return: None. +        """          for line in pipe.readlines():              self.lineReceived(line)          self.lostConnection()      def _startTest(self, offset, line):          """Internal call to change state machine. Override startTest().""" -        if self.state == TestProtocolServer.OUTSIDE_TEST: -            self.state = TestProtocolServer.TEST_STARTED -            self._current_test = RemotedTestCase(line[offset:-1]) -            self.current_test_description = line[offset:-1] -            self.client.startTest(self._current_test) -        else: -            self.stdOutLineReceived(line) +        self._state.startTest(offset, line) + +    def subunitLineReceived(self, line): +        self._forward_stream.write(line)      def stdOutLineReceived(self, line):          self._stream.write(line) -class RemoteException(Exception): -    """An exception that occured remotely to python.""" - -    def __eq__(self, other): -        try: -            return self.args == other.args -        except AttributeError: -            return False - -  class TestProtocolClient(unittest.TestResult): -    """A class that looks like a TestResult and informs a TestProtocolServer.""" +    """A TestResult which generates a subunit stream for a test run. +     +    # Get a TestSuite or TestCase to run +    suite = make_suite() +    # Create a stream (any object with a 'write' method) +    stream = file('tests.log', 'wb') +    # Create a subunit result object which will output to the stream +    result = subunit.TestProtocolClient(stream) +    # Optionally, to get timing data for performance analysis, wrap the +    # serialiser with a timing decorator +    result = subunit.test_results.AutoTimingTestResultDecorator(result) +    # Run the test suite reporting to the subunit result object +    suite.run(result) +    # Close the stream. +    stream.close() +    """      def __init__(self, stream): -        super(TestProtocolClient, self).__init__() +        unittest.TestResult.__init__(self)          self._stream = stream -    def addError(self, test, error): -        """Report an error in test test.""" -        self._stream.write("error: %s [\n" % (test.shortDescription() or str(test))) -        for line in self._exc_info_to_string(error, test).splitlines(): -            self._stream.write("%s\n" % line) +    def addError(self, test, error=None, details=None): +        """Report an error in test test. +         +        Only one of error and details should be provided: conceptually there +        are two separate methods: +            addError(self, test, error) +            addError(self, test, details) + +        :param error: Standard unittest positional argument form - an +            exc_info tuple. +        :param details: New Testing-in-python drafted API; a dict from string +            to subunit.Content objects. +        """ +        self._addOutcome("error", test, error=error, details=details) + +    def addExpectedFailure(self, test, error=None, details=None): +        """Report an expected failure in test test. +         +        Only one of error and details should be provided: conceptually there +        are two separate methods: +            addError(self, test, error) +            addError(self, test, details) + +        :param error: Standard unittest positional argument form - an +            exc_info tuple. +        :param details: New Testing-in-python drafted API; a dict from string +            to subunit.Content objects. +        """ +        self._addOutcome("xfail", test, error=error, details=details) + +    def addFailure(self, test, error=None, details=None): +        """Report a failure in test test. +         +        Only one of error and details should be provided: conceptually there +        are two separate methods: +            addFailure(self, test, error) +            addFailure(self, test, details) + +        :param error: Standard unittest positional argument form - an +            exc_info tuple. +        :param details: New Testing-in-python drafted API; a dict from string +            to subunit.Content objects. +        """ +        self._addOutcome("failure", test, error=error, details=details) + +    def _addOutcome(self, outcome, test, error=None, details=None): +        """Report a failure in test test. +         +        Only one of error and details should be provided: conceptually there +        are two separate methods: +            addOutcome(self, test, error) +            addOutcome(self, test, details) + +        :param outcome: A string describing the outcome - used as the +            event name in the subunit stream. +        :param error: Standard unittest positional argument form - an +            exc_info tuple. +        :param details: New Testing-in-python drafted API; a dict from string +            to subunit.Content objects. +        """ +        self._stream.write("%s: %s" % (outcome, test.id())) +        if error is None and details is None: +            raise ValueError +        if error is not None: +            self._stream.write(" [\n") +            for line in self._exc_info_to_string(error, test).splitlines(): +                self._stream.write("%s\n" % line) +        else: +            self._write_details(details)          self._stream.write("]\n") -        super(TestProtocolClient, self).addError(test, error) -    def addFailure(self, test, error): -        """Report a failure in test test.""" -        self._stream.write("failure: %s [\n" % (test.shortDescription() or str(test))) -        for line in self._exc_info_to_string(error, test).splitlines(): -            self._stream.write("%s\n" % line) -        self._stream.write("]\n") -        super(TestProtocolClient, self).addFailure(test, error) +    def addSkip(self, test, reason=None, details=None): +        """Report a skipped test.""" +        if reason is None: +            self._addOutcome("skip", test, error=None, details=details) +        else: +            self._stream.write("skip: %s [\n" % test.id()) +            self._stream.write("%s\n" % reason) +            self._stream.write("]\n") -    def addSuccess(self, test): +    def addSuccess(self, test, details=None):          """Report a success in a test.""" -        self._stream.write("successful: %s\n" % (test.shortDescription() or str(test))) -        super(TestProtocolClient, self).addSuccess(test) +        self._stream.write("successful: %s" % test.id()) +        if not details: +            self._stream.write("\n") +        else: +            self._write_details(details) +            self._stream.write("]\n") +    addUnexpectedSuccess = addSuccess      def startTest(self, test):          """Mark a test as starting its test run.""" -        self._stream.write("test: %s\n" % (test.shortDescription() or str(test))) -        super(TestProtocolClient, self).startTest(test) +        self._stream.write("test: %s\n" % test.id()) + +    def progress(self, offset, whence): +        """Provide indication about the progress/length of the test run. + +        :param offset: Information about the number of tests remaining. If +            whence is PROGRESS_CUR, then offset increases/decreases the +            remaining test count. If whence is PROGRESS_SET, then offset +            specifies exactly the remaining test count. +        :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH, +            PROGRESS_POP. +        """ +        if whence == PROGRESS_CUR and offset > -1: +            prefix = "+" +        elif whence == PROGRESS_PUSH: +            prefix = "" +            offset = "push" +        elif whence == PROGRESS_POP: +            prefix = "" +            offset = "pop" +        else: +            prefix = "" +        self._stream.write("progress: %s%s\n" % (prefix, offset)) + +    def time(self, a_datetime): +        """Inform the client of the time. + +        ":param datetime: A datetime.datetime object. +        """ +        time = a_datetime.astimezone(iso8601.Utc()) +        self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % ( +            time.year, time.month, time.day, time.hour, time.minute, +            time.second, time.microsecond)) + +    def _write_details(self, details): +        """Output details to the stream. + +        :param details: An extended details dict for a test outcome. +        """ +        self._stream.write(" [ multipart\n") +        for name, content in sorted(details.iteritems()): +            self._stream.write("Content-Type: %s/%s" % +                (content.content_type.type, content.content_type.subtype)) +            parameters = content.content_type.parameters +            if parameters: +                self._stream.write(";") +                param_strs = [] +                for param, value in parameters.iteritems(): +                    param_strs.append("%s=%s" % (param, value)) +                self._stream.write(",".join(param_strs)) +            self._stream.write("\n%s\n" % name) +            encoder = chunked.Encoder(self._stream) +            map(encoder.write, content.iter_bytes()) +            encoder.close() + +    def done(self): +        """Obey the testtools result.done() interface."""  def RemoteError(description=""): -    if description == "": -        description = "\n" -    return (RemoteException, RemoteException(description), None) +    return (_StringException, _StringException(description), None)  class RemotedTestCase(unittest.TestCase): -    """A class to represent test cases run in child processes.""" +    """A class to represent test cases run in child processes. +     +    Instances of this class are used to provide the Python test API a TestCase +    that can be printed to the screen, introspected for metadata and so on. +    However, as they are a simply a memoisation of a test that was actually +    run in the past by a separate process, they cannot perform any interactive +    actions. +    """      def __eq__ (self, other):          try: @@ -272,7 +742,7 @@ class RemotedTestCase(unittest.TestCase):          return self.__description      def id(self): -        return "%s.%s" % (self._strclass(), self.__description) +        return "%s" % (self.__description,)      def __str__(self):          return "%s (%s)" % (self.__description, self._strclass()) @@ -318,12 +788,18 @@ class ExecTestCase(unittest.TestCase):      def _run(self, result):          protocol = TestProtocolServer(result) -        output = os.popen(self.script, mode='r') -        protocol.readFrom(output) +        output = subprocess.Popen(self.script, shell=True, +            stdout=subprocess.PIPE).communicate()[0] +        protocol.readFrom(StringIO(output))  class IsolatedTestCase(unittest.TestCase): -    """A TestCase which runs its tests in a forked process.""" +    """A TestCase which executes in a forked process. +     +    Each test gets its own process, which has a performance overhead but will +    provide excellent isolation from global state (such as django configs, +    zope utilities and so on). +    """      def run(self, result=None):          if result is None: result = self.defaultTestResult() @@ -331,7 +807,13 @@ class IsolatedTestCase(unittest.TestCase):  class IsolatedTestSuite(unittest.TestSuite): -    """A TestCase which runs its tests in a forked process.""" +    """A TestSuite which runs its tests in a forked process. +     +    This decorator that will fork() before running the tests and report the +    results from the child process using a Subunit stream.  This is useful for +    handling tests that mutate global state, or are testing C extensions that +    could crash the VM. +    """      def run(self, result=None):          if result is None: result = unittest.TestResult() @@ -376,13 +858,256 @@ def run_isolated(klass, self, result):      return result -class SubunitTestRunner(object): -    def __init__(self, stream=sys.stdout): -        self.stream = stream +def TAP2SubUnit(tap, subunit): +    """Filter a TAP pipe into a subunit pipe. +     +    :param tap: A tap pipe/stream/file object. +    :param subunit: A pipe/stream/file object to write subunit results to. +    :return: The exit code to exit with. +    """ +    BEFORE_PLAN = 0 +    AFTER_PLAN = 1 +    SKIP_STREAM = 2 +    client = TestProtocolClient(subunit) +    state = BEFORE_PLAN +    plan_start = 1 +    plan_stop = 0 +    def _skipped_test(subunit, plan_start): +        # Some tests were skipped. +        subunit.write('test test %d\n' % plan_start) +        subunit.write('error test %d [\n' % plan_start) +        subunit.write('test missing from TAP output\n') +        subunit.write(']\n') +        return plan_start + 1 +    # Test data for the next test to emit +    test_name = None +    log = [] +    result = None +    def _emit_test(): +        "write out a test" +        if test_name is None: +            return +        subunit.write("test %s\n" % test_name) +        if not log: +            subunit.write("%s %s\n" % (result, test_name)) +        else: +            subunit.write("%s %s [\n" % (result, test_name)) +        if log: +            for line in log: +                subunit.write("%s\n" % line) +            subunit.write("]\n") +        del log[:] +    for line in tap: +        if state == BEFORE_PLAN: +            match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line) +            if match: +                state = AFTER_PLAN +                _, plan_stop, comment = match.groups() +                plan_stop = int(plan_stop) +                if plan_start > plan_stop and plan_stop == 0: +                    # skipped file +                    state = SKIP_STREAM +                    subunit.write("test file skip\n") +                    subunit.write("skip file skip [\n") +                    subunit.write("%s\n" % comment) +                    subunit.write("]\n") +                continue +        # not a plan line, or have seen one before +        match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line) +        if match: +            # new test, emit current one. +            _emit_test() +            status, number, description, directive, directive_comment = match.groups() +            if status == 'ok': +                result = 'success' +            else: +                result = "failure" +            if description is None: +                description = '' +            else: +                description = ' ' + description +            if directive is not None: +                if directive == 'TODO': +                    result = 'xfail' +                elif directive == 'SKIP': +                    result = 'skip' +                if directive_comment is not None: +                    log.append(directive_comment) +            if number is not None: +                number = int(number) +                while plan_start < number: +                    plan_start = _skipped_test(subunit, plan_start) +            test_name = "test %d%s" % (plan_start, description) +            plan_start += 1 +            continue +        match = re.match("Bail out\!(?:\s*(.*))?\n", line) +        if match: +            reason, = match.groups() +            if reason is None: +                extra = '' +            else: +                extra = ' %s' % reason +            _emit_test() +            test_name = "Bail out!%s" % extra +            result = "error" +            state = SKIP_STREAM +            continue +        match = re.match("\#.*\n", line) +        if match: +            log.append(line[:-1]) +            continue +        subunit.write(line) +    _emit_test() +    while plan_start <= plan_stop: +        # record missed tests +        plan_start = _skipped_test(subunit, plan_start) +    return 0 + + +def tag_stream(original, filtered, tags): +    """Alter tags on a stream. + +    :param original: The input stream. +    :param filtered: The output stream. +    :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or +        '-TAG' commands. + +        A 'TAG' command will add the tag to the output stream, +        and override any existing '-TAG' command in that stream. +        Specifically: +         * A global 'tags: TAG' will be added to the start of the stream. +         * Any tags commands with -TAG will have the -TAG removed. + +        A '-TAG' command will remove the TAG command from the stream. +        Specifically: +         * A 'tags: -TAG' command will be added to the start of the stream. +         * Any 'tags: TAG' command will have 'TAG' removed from it. +        Additionally, any redundant tagging commands (adding a tag globally +        present, or removing a tag globally removed) are stripped as a +        by-product of the filtering. +    :return: 0 +    """ +    new_tags, gone_tags = tags_to_new_gone(tags) +    def write_tags(new_tags, gone_tags): +        if new_tags or gone_tags: +            filtered.write("tags: " + ' '.join(new_tags)) +            if gone_tags: +                for tag in gone_tags: +                    filtered.write("-" + tag) +            filtered.write("\n") +    write_tags(new_tags, gone_tags) +    # TODO: use the protocol parser and thus don't mangle test comments. +    for line in original: +        if line.startswith("tags:"): +            line_tags = line[5:].split() +            line_new, line_gone = tags_to_new_gone(line_tags) +            line_new = line_new - gone_tags +            line_gone = line_gone - new_tags +            write_tags(line_new, line_gone) +        else: +            filtered.write(line) +    return 0 + + +class ProtocolTestCase(object): +    """Subunit wire protocol to unittest.TestCase adapter. + +    ProtocolTestCase honours the core of ``unittest.TestCase`` protocol - +    calling a ProtocolTestCase or invoking the run() method will make a 'test +    run' happen. The 'test run' will simply be a replay of the test activity +    that has been encoded into the stream. The ``unittest.TestCase`` ``debug`` +    and ``countTestCases`` methods are not supported because there isn't a +    sensible mapping for those methods. +     +    # Get a stream (any object with a readline() method), in this case the +    # stream output by the example from ``subunit.TestProtocolClient``. +    stream = file('tests.log', 'rb') +    # Create a parser which will read from the stream and emit  +    # activity to a unittest.TestResult when run() is called. +    suite = subunit.ProtocolTestCase(stream) +    # Create a result object to accept the contents of that stream. +    result = unittest._TextTestResult(sys.stdout) +    # 'run' the tests - process the stream and feed its contents to result. +    suite.run(result) +    stream.close() + +    :seealso: TestProtocolServer (the subunit wire protocol parser). +    """ + +    def __init__(self, stream, passthrough=None, forward=False): +        """Create a ProtocolTestCase reading from stream. + +        :param stream: A filelike object which a subunit stream can be read +            from. +        :param passthrough: A stream pass non subunit input on to. If not +            supplied, the TestProtocolServer default is used. +        :param forward: A stream to pass subunit input on to. If not supplied +            subunit input is not forwarded. +        """ +        self._stream = stream +        self._passthrough = passthrough +        self._forward = forward + +    def __call__(self, result=None): +        return self.run(result) -    def run(self, test): -        "Run the given test case or test suite." -        result = TestProtocolClient(self.stream) -        test(result) -        return result +    def run(self, result=None): +        if result is None: +            result = self.defaultTestResult() +        protocol = TestProtocolServer(result, self._passthrough, self._forward) +        line = self._stream.readline() +        while line: +            protocol.lineReceived(line) +            line = self._stream.readline() +        protocol.lostConnection() + + +class TestResultStats(unittest.TestResult): +    """A pyunit TestResult interface implementation for making statistics. +     +    :ivar total_tests: The total tests seen. +    :ivar passed_tests: The tests that passed. +    :ivar failed_tests: The tests that failed. +    :ivar seen_tags: The tags seen across all tests. +    """ +    def __init__(self, stream): +        """Create a TestResultStats which outputs to stream.""" +        unittest.TestResult.__init__(self) +        self._stream = stream +        self.failed_tests = 0 +        self.skipped_tests = 0 +        self.seen_tags = set() + +    @property +    def total_tests(self): +        return self.testsRun + +    def addError(self, test, err, details=None): +        self.failed_tests += 1 + +    def addFailure(self, test, err, details=None): +        self.failed_tests += 1 + +    def addSkip(self, test, reason, details=None): +        self.skipped_tests += 1 + +    def formatStats(self): +        self._stream.write("Total tests:   %5d\n" % self.total_tests) +        self._stream.write("Passed tests:  %5d\n" % self.passed_tests) +        self._stream.write("Failed tests:  %5d\n" % self.failed_tests) +        self._stream.write("Skipped tests: %5d\n" % self.skipped_tests) +        tags = sorted(self.seen_tags) +        self._stream.write("Seen tags: %s\n" % (", ".join(tags))) + +    @property +    def passed_tests(self): +        return self.total_tests - self.failed_tests - self.skipped_tests + +    def tags(self, new_tags, gone_tags): +        """Accumulate the seen tags.""" +        self.seen_tags.update(new_tags) + +    def wasSuccessful(self): +        """Tells whether or not this result was a success""" +        return self.failed_tests == 0 diff --git a/lib/subunit/python/subunit/chunked.py b/lib/subunit/python/subunit/chunked.py new file mode 100644 index 0000000000..82e4b0ddfc --- /dev/null +++ b/lib/subunit/python/subunit/chunked.py @@ -0,0 +1,164 @@ +# +#  subunit: extensions to python unittest to get test results from subprocesses. +#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +"""Encoder/decoder for http style chunked encoding.""" + +class Decoder(object): +    """Decode chunked content to a byte stream.""" + +    def __init__(self, output): +        """Create a decoder decoding to output. + +        :param output: A file-like object. Bytes written to the Decoder are +            decoded to strip off the chunking and written to the output. +            Up to a full write worth of data or a single control line  may be +            buffered (whichever is larger). The close method should be called +            when no more data is available, to detect short streams; the +            write method will return none-None when the end of a stream is +            detected. +        """ +        self.output = output +        self.buffered_bytes = [] +        self.state = self._read_length +        self.body_length = 0 + +    def close(self): +        """Close the decoder. + +        :raises ValueError: If the stream is incomplete ValueError is raised. +        """ +        if self.state != self._finished: +            raise ValueError("incomplete stream") + +    def _finished(self): +        """Finished reading, return any remaining bytes.""" +        if self.buffered_bytes: +            buffered_bytes = self.buffered_bytes +            self.buffered_bytes = [] +            return ''.join(buffered_bytes) +        else: +            raise ValueError("stream is finished") + +    def _read_body(self): +        """Pass body bytes to the output.""" +        while self.body_length and self.buffered_bytes: +            if self.body_length >= len(self.buffered_bytes[0]): +                self.output.write(self.buffered_bytes[0]) +                self.body_length -= len(self.buffered_bytes[0]) +                del self.buffered_bytes[0] +                # No more data available. +                if not self.body_length: +                    self.state = self._read_length +            else: +                self.output.write(self.buffered_bytes[0][:self.body_length]) +                self.buffered_bytes[0] = \ +                    self.buffered_bytes[0][self.body_length:] +                self.body_length = 0 +                self.state = self._read_length +                return self.state() + +    def _read_length(self): +        """Try to decode a length from the bytes.""" +        count = -1 +        match_chars = "0123456789abcdefABCDEF\r\n" +        count_chars = [] +        for bytes in self.buffered_bytes: +            for byte in bytes: +                if byte not in match_chars: +                    break +                count_chars.append(byte) +                if byte == '\n': +                    break +        if not count_chars: +            return +        if count_chars[-1][-1] != '\n': +            return +        count_str = ''.join(count_chars) +        self.body_length = int(count_str[:-2], 16) +        excess_bytes = len(count_str) +        while excess_bytes: +            if excess_bytes >= len(self.buffered_bytes[0]): +                excess_bytes -= len(self.buffered_bytes[0]) +                del self.buffered_bytes[0] +            else: +                self.buffered_bytes[0] = self.buffered_bytes[0][excess_bytes:] +                excess_bytes = 0 +        if not self.body_length: +            self.state = self._finished +            if not self.buffered_bytes: +                # May not call into self._finished with no buffered data. +                return '' +        else: +            self.state = self._read_body +        return self.state() + +    def write(self, bytes): +        """Decode bytes to the output stream. +         +        :raises ValueError: If the stream has already seen the end of file +            marker. +        :returns: None, or the excess bytes beyond the end of file marker. +        """ +        if bytes: +            self.buffered_bytes.append(bytes) +        return self.state() + + +class Encoder(object): +    """Encode content to a stream using HTTP Chunked coding.""" + +    def __init__(self, output): +        """Create an encoder encoding to output. + +        :param output: A file-like object. Bytes written to the Encoder +            will be encoded using HTTP chunking. Small writes may be buffered +            and the ``close`` method must be called to finish the stream. +        """ +        self.output = output +        self.buffered_bytes = [] +        self.buffer_size = 0 + +    def flush(self, extra_len=0): +        """Flush the encoder to the output stream. +         +        :param extra_len: Increase the size of the chunk by this many bytes +            to allow for a subsequent write. +        """ +        if not self.buffer_size and not extra_len: +            return +        buffered_bytes = self.buffered_bytes +        buffer_size = self.buffer_size +        self.buffered_bytes = [] +        self.buffer_size = 0 +        self.output.write("%X\r\n" % (buffer_size + extra_len)) +        if buffer_size: +            self.output.write(''.join(buffered_bytes)) +        return True + +    def write(self, bytes): +        """Encode bytes to the output stream.""" +        bytes_len = len(bytes) +        if self.buffer_size + bytes_len >= 65536: +            self.flush(bytes_len) +            self.output.write(bytes) +        else: +            self.buffered_bytes.append(bytes) +            self.buffer_size += bytes_len + +    def close(self): +        """Finish the stream. This does not close the output stream.""" +        self.flush() +        self.output.write("0\r\n") diff --git a/lib/subunit/python/subunit/details.py b/lib/subunit/python/subunit/details.py new file mode 100644 index 0000000000..65a04046d9 --- /dev/null +++ b/lib/subunit/python/subunit/details.py @@ -0,0 +1,109 @@ +# +#  subunit: extensions to Python unittest to get test results from subprocesses. +#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +"""Handlers for outcome details.""" + +from cStringIO import StringIO + +from testtools import content, content_type + +import chunked + + +class DetailsParser(object): +    """Base class/API reference for details parsing.""" + + +class SimpleDetailsParser(DetailsParser): +    """Parser for single-part [] delimited details.""" + +    def __init__(self, state): +        self._message = "" +        self._state = state + +    def lineReceived(self, line): +        if line == "]\n": +            self._state.endDetails() +            return +        if line[0:2] == " ]": +            # quoted ] start +            self._message += line[1:] +        else: +            self._message += line + +    def get_details(self, style=None): +        result = {} +        if not style: +            result['traceback'] = content.Content( +                content_type.ContentType("text", "x-traceback"), +                lambda:[self._message]) +        else: +            if style == 'skip': +                name = 'reason' +            else: +                name = 'message' +            result[name] = content.Content( +                content_type.ContentType("text", "plain"), +                lambda:[self._message]) +        return result + +    def get_message(self): +        return self._message + + +class MultipartDetailsParser(DetailsParser): +    """Parser for multi-part [] surrounded MIME typed chunked details.""" + +    def __init__(self, state): +        self._state = state +        self._details = {} +        self._parse_state = self._look_for_content + +    def _look_for_content(self, line): +        if line == "]\n": +            self._state.endDetails() +            return +        # TODO error handling +        field, value = line[:-1].split(' ', 1) +        main, sub = value.split('/') +        self._content_type = content_type.ContentType(main, sub) +        self._parse_state = self._get_name + +    def _get_name(self, line): +        self._name = line[:-1] +        self._body = StringIO() +        self._chunk_parser = chunked.Decoder(self._body) +        self._parse_state = self._feed_chunks + +    def _feed_chunks(self, line): +        residue = self._chunk_parser.write(line) +        if residue is not None: +            # Line based use always ends on no residue. +            assert residue == '' +            body = self._body +            self._details[self._name] = content.Content( +                self._content_type, lambda:[body.getvalue()]) +            self._chunk_parser.close() +            self._parse_state = self._look_for_content + +    def get_details(self, for_skip=False): +        return self._details + +    def get_message(self): +        return None + +    def lineReceived(self, line): +        self._parse_state(line) diff --git a/lib/subunit/python/subunit/iso8601.py b/lib/subunit/python/subunit/iso8601.py new file mode 100644 index 0000000000..93c92fb516 --- /dev/null +++ b/lib/subunit/python/subunit/iso8601.py @@ -0,0 +1,123 @@ +# Copyright (c) 2007 Michael Twomey +#  +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +#  +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +#  +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +"""ISO 8601 date time string parsing + +Basic usage: +>>> import iso8601 +>>> iso8601.parse_date("2007-01-25T12:00:00Z") +datetime.datetime(2007, 1, 25, 12, 0, tzinfo=<iso8601.iso8601.Utc ...>) +>>> + +""" + +from datetime import datetime, timedelta, tzinfo +import re + +__all__ = ["parse_date", "ParseError"] + +# Adapted from http://delete.me.uk/2005/03/iso8601.html +ISO8601_REGEX = re.compile(r"(?P<year>[0-9]{4})(-(?P<month>[0-9]{1,2})(-(?P<day>[0-9]{1,2})" +    r"((?P<separator>.)(?P<hour>[0-9]{2}):(?P<minute>[0-9]{2})(:(?P<second>[0-9]{2})(\.(?P<fraction>[0-9]+))?)?" +    r"(?P<timezone>Z|(([-+])([0-9]{2}):([0-9]{2})))?)?)?)?" +) +TIMEZONE_REGEX = re.compile("(?P<prefix>[+-])(?P<hours>[0-9]{2}).(?P<minutes>[0-9]{2})") + +class ParseError(Exception): +    """Raised when there is a problem parsing a date string""" + +# Yoinked from python docs +ZERO = timedelta(0) +class Utc(tzinfo): +    """UTC +     +    """ +    def utcoffset(self, dt): +        return ZERO + +    def tzname(self, dt): +        return "UTC" + +    def dst(self, dt): +        return ZERO +UTC = Utc() + +class FixedOffset(tzinfo): +    """Fixed offset in hours and minutes from UTC +     +    """ +    def __init__(self, offset_hours, offset_minutes, name): +        self.__offset = timedelta(hours=offset_hours, minutes=offset_minutes) +        self.__name = name + +    def utcoffset(self, dt): +        return self.__offset + +    def tzname(self, dt): +        return self.__name + +    def dst(self, dt): +        return ZERO +     +    def __repr__(self): +        return "<FixedOffset %r>" % self.__name + +def parse_timezone(tzstring, default_timezone=UTC): +    """Parses ISO 8601 time zone specs into tzinfo offsets +     +    """ +    if tzstring == "Z": +        return default_timezone +    # This isn't strictly correct, but it's common to encounter dates without +    # timezones so I'll assume the default (which defaults to UTC). +    # Addresses issue 4. +    if tzstring is None: +        return default_timezone +    m = TIMEZONE_REGEX.match(tzstring) +    prefix, hours, minutes = m.groups() +    hours, minutes = int(hours), int(minutes) +    if prefix == "-": +        hours = -hours +        minutes = -minutes +    return FixedOffset(hours, minutes, tzstring) + +def parse_date(datestring, default_timezone=UTC): +    """Parses ISO 8601 dates into datetime objects +     +    The timezone is parsed from the date string. However it is quite common to +    have dates without a timezone (not strictly correct). In this case the +    default timezone specified in default_timezone is used. This is UTC by +    default. +    """ +    if not isinstance(datestring, basestring): +        raise ParseError("Expecting a string %r" % datestring) +    m = ISO8601_REGEX.match(datestring) +    if not m: +        raise ParseError("Unable to parse date string %r" % datestring) +    groups = m.groupdict() +    tz = parse_timezone(groups["timezone"], default_timezone=default_timezone) +    if groups["fraction"] is None: +        groups["fraction"] = 0 +    else: +        groups["fraction"] = int(float("0.%s" % groups["fraction"]) * 1e6) +    return datetime(int(groups["year"]), int(groups["month"]), int(groups["day"]), +        int(groups["hour"]), int(groups["minute"]), int(groups["second"]), +        int(groups["fraction"]), tz) diff --git a/lib/subunit/python/subunit/progress_model.py b/lib/subunit/python/subunit/progress_model.py new file mode 100644 index 0000000000..3a6af89a33 --- /dev/null +++ b/lib/subunit/python/subunit/progress_model.py @@ -0,0 +1,106 @@ +# +#  subunit: extensions to Python unittest to get test results from subprocesses. +#  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +"""Support for dealing with progress state.""" + +class ProgressModel(object): +    """A model of progress indicators as subunit defines it. +     +    Instances of this class represent a single logical operation that is +    progressing. The operation may have many steps, and some of those steps may +    supply their own progress information. ProgressModel uses a nested concept +    where the overall state can be pushed, creating new starting state, and +    later pushed to return to the prior state. Many user interfaces will want +    to display an overall summary though, and accordingly the pos() and width() +    methods return overall summary information rather than information on the +    current subtask. + +    The default state is 0/0 - indicating that the overall progress is unknown. +    Anytime the denominator of pos/width is 0, rendering of a ProgressModel +    should should take this into consideration. + +    :ivar: _tasks. This private attribute stores the subtasks. Each is a tuple: +        pos, width, overall_numerator, overall_denominator. The overall fields +        store the calculated overall numerator and denominator for the state +        that was pushed. +    """ + +    def __init__(self): +        """Create a ProgressModel. +         +        The new model has no progress data at all - it will claim a summary +        width of zero and position of 0. +        """ +        self._tasks = [] +        self.push() + +    def adjust_width(self, offset): +        """Adjust the with of the current subtask.""" +        self._tasks[-1][1] += offset + +    def advance(self): +        """Advance the current subtask.""" +        self._tasks[-1][0] += 1 + +    def pop(self): +        """Pop a subtask off the ProgressModel. + +        See push for a description of how push and pop work. +        """ +        self._tasks.pop() + +    def pos(self): +        """Return how far through the operation has progressed.""" +        if not self._tasks: +            return 0 +        task = self._tasks[-1] +        if len(self._tasks) > 1: +            # scale up the overall pos by the current task or preserve it if +            # no current width is known. +            offset = task[2] * (task[1] or 1) +        else: +            offset = 0 +        return offset + task[0] + +    def push(self): +        """Push a new subtask. + +        After pushing a new subtask, the overall progress hasn't changed. Calls +        to adjust_width, advance, set_width will only after the progress within +        the range that calling 'advance' would have before - the subtask +        represents progressing one step in the earlier task. + +        Call pop() to restore the progress model to the state before push was +        called. +        """ +        self._tasks.append([0, 0, self.pos(), self.width()]) + +    def set_width(self, width): +        """Set the width of the current subtask.""" +        self._tasks[-1][1] = width + +    def width(self): +        """Return the total width of the operation.""" +        if not self._tasks: +            return 0 +        task = self._tasks[-1] +        if len(self._tasks) > 1: +            # scale up the overall width by the current task or preserve it if +            # no current width is known. +            return task[3] * (task[1] or 1) +        else: +            return task[1] + diff --git a/lib/subunit/python/subunit/run.py b/lib/subunit/python/subunit/run.py new file mode 100755 index 0000000000..2b90791d69 --- /dev/null +++ b/lib/subunit/python/subunit/run.py @@ -0,0 +1,46 @@ +#!/usr/bin/python +# +# Simple subunit testrunner for python +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007 +#    +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +"""Run a unittest testcase reporting results as Subunit. + +  $ python -m subunit.run mylib.tests.test_suite +""" + +import sys + +from subunit import TestProtocolClient + + +class SubunitTestRunner(object): +    def __init__(self, stream=sys.stdout): +        self.stream = stream + +    def run(self, test): +        "Run the given test case or test suite." +        result = TestProtocolClient(self.stream) +        test(result) +        return result + + +if __name__ == '__main__': +    import optparse +    from unittest import TestProgram +    parser = optparse.OptionParser(__doc__) +    args = parser.parse_args()[1] +    runner = SubunitTestRunner() +    program = TestProgram(module=None, argv=[sys.argv[0]] + args, +                          testRunner=runner) diff --git a/lib/subunit/python/subunit/test_results.py b/lib/subunit/python/subunit/test_results.py new file mode 100644 index 0000000000..4ccc2aab35 --- /dev/null +++ b/lib/subunit/python/subunit/test_results.py @@ -0,0 +1,334 @@ +# +#  subunit: extensions to Python unittest to get test results from subprocesses. +#  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +"""TestResult helper classes used to by subunit.""" + +import datetime + +import iso8601 +import testtools + +import subunit + + +# NOT a TestResult, because we are implementing the interface, not inheriting +# it. +class TestResultDecorator(object): +    """General pass-through decorator. + +    This provides a base that other TestResults can inherit from to  +    gain basic forwarding functionality. It also takes care of  +    handling the case where the target doesn't support newer methods +    or features by degrading them. +    """ + +    def __init__(self, decorated): +        """Create a TestResultDecorator forwarding to decorated.""" +        # Make every decorator degrade gracefully. +        self.decorated = testtools.ExtendedToOriginalDecorator(decorated) + +    def startTest(self, test): +        return self.decorated.startTest(test) + +    def startTestRun(self): +        return self.decorated.startTestRun() + +    def stopTest(self, test): +        return self.decorated.stopTest(test) + +    def stopTestRun(self): +        return self.decorated.stopTestRun() + +    def addError(self, test, err=None, details=None): +        return self.decorated.addError(test, err, details=details) + +    def addFailure(self, test, err=None, details=None): +        return self.decorated.addFailure(test, err, details=details) + +    def addSuccess(self, test, details=None): +        return self.decorated.addSuccess(test, details=details) + +    def addSkip(self, test, reason=None, details=None): +        return self.decorated.addSkip(test, reason, details=details) + +    def addExpectedFailure(self, test, err=None, details=None): +        return self.decorated.addExpectedFailure(test, err, details=details) + +    def addUnexpectedSuccess(self, test, details=None): +        return self.decorated.addUnexpectedSuccess(test, details=details) + +    def progress(self, offset, whence): +        return self.decorated.progress(offset, whence) + +    def wasSuccessful(self): +        return self.decorated.wasSuccessful() + +    @property +    def shouldStop(self): +        return self.decorated.shouldStop + +    def stop(self): +        return self.decorated.stop() + +    def tags(self, gone_tags, new_tags): +        return self.decorated.time(gone_tags, new_tags) + +    def time(self, a_datetime): +        return self.decorated.time(a_datetime) + + +class HookedTestResultDecorator(TestResultDecorator): +    """A TestResult which calls a hook on every event.""" + +    def __init__(self, decorated): +        self.super = super(HookedTestResultDecorator, self) +        self.super.__init__(decorated) + +    def startTest(self, test): +        self._before_event() +        return self.super.startTest(test) + +    def startTestRun(self): +        self._before_event() +        return self.super.startTestRun() + +    def stopTest(self, test): +        self._before_event() +        return self.super.stopTest(test) + +    def stopTestRun(self): +        self._before_event() +        return self.super.stopTestRun() + +    def addError(self, test, err=None, details=None): +        self._before_event() +        return self.super.addError(test, err, details=details) + +    def addFailure(self, test, err=None, details=None): +        self._before_event() +        return self.super.addFailure(test, err, details=details) + +    def addSuccess(self, test, details=None): +        self._before_event() +        return self.super.addSuccess(test, details=details) + +    def addSkip(self, test, reason=None, details=None): +        self._before_event() +        return self.super.addSkip(test, reason, details=details) + +    def addExpectedFailure(self, test, err=None, details=None): +        self._before_event() +        return self.super.addExpectedFailure(test, err, details=details) + +    def addUnexpectedSuccess(self, test, details=None): +        self._before_event() +        return self.super.addUnexpectedSuccess(test, details=details) + +    def progress(self, offset, whence): +        self._before_event() +        return self.super.progress(offset, whence) + +    def wasSuccessful(self): +        self._before_event() +        return self.super.wasSuccessful() + +    @property +    def shouldStop(self): +        self._before_event() +        return self.super.shouldStop + +    def stop(self): +        self._before_event() +        return self.super.stop() + +    def time(self, a_datetime): +        self._before_event() +        return self.super.time(a_datetime) + + +class AutoTimingTestResultDecorator(HookedTestResultDecorator): +    """Decorate a TestResult to add time events to a test run. + +    By default this will cause a time event before every test event, +    but if explicit time data is being provided by the test run, then +    this decorator will turn itself off to prevent causing confusion. +    """ + +    def __init__(self, decorated): +        self._time = None +        super(AutoTimingTestResultDecorator, self).__init__(decorated) + +    def _before_event(self): +        time = self._time +        if time is not None: +            return +        time = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc()) +        self.decorated.time(time) + +    def progress(self, offset, whence): +        return self.decorated.progress(offset, whence) + +    @property +    def shouldStop(self): +        return self.decorated.shouldStop + +    def time(self, a_datetime): +        """Provide a timestamp for the current test activity. + +        :param a_datetime: If None, automatically add timestamps before every +            event (this is the default behaviour if time() is not called at +            all).  If not None, pass the provided time onto the decorated +            result object and disable automatic timestamps. +        """ +        self._time = a_datetime +        return self.decorated.time(a_datetime) + + +class TestResultFilter(TestResultDecorator): +    """A pyunit TestResult interface implementation which filters tests. + +    Tests that pass the filter are handed on to another TestResult instance +    for further processing/reporting. To obtain the filtered results,  +    the other instance must be interrogated. + +    :ivar result: The result that tests are passed to after filtering. +    :ivar filter_predicate: The callback run to decide whether to pass  +        a result. +    """ + +    def __init__(self, result, filter_error=False, filter_failure=False, +        filter_success=True, filter_skip=False, +        filter_predicate=None): +        """Create a FilterResult object filtering to result. +         +        :param filter_error: Filter out errors. +        :param filter_failure: Filter out failures. +        :param filter_success: Filter out successful tests. +        :param filter_skip: Filter out skipped tests. +        :param filter_predicate: A callable taking (test, outcome, err, +            details) and returning True if the result should be passed +            through.  err and details may be none if no error or extra +            metadata is available. outcome is the name of the outcome such +            as 'success' or 'failure'. +        """ +        TestResultDecorator.__init__(self, result) +        self._filter_error = filter_error +        self._filter_failure = filter_failure +        self._filter_success = filter_success +        self._filter_skip = filter_skip +        if filter_predicate is None: +            filter_predicate = lambda test, outcome, err, details: True +        self.filter_predicate = filter_predicate +        # The current test (for filtering tags) +        self._current_test = None +        # Has the current test been filtered (for outputting test tags) +        self._current_test_filtered = None +        # The (new, gone) tags for the current test. +        self._current_test_tags = None +         +    def addError(self, test, err=None, details=None): +        if (not self._filter_error and  +            self.filter_predicate(test, 'error', err, details)): +            self.decorated.startTest(test) +            self.decorated.addError(test, err, details=details) +        else: +            self._filtered() + +    def addFailure(self, test, err=None, details=None): +        if (not self._filter_failure and +            self.filter_predicate(test, 'failure', err, details)): +            self.decorated.startTest(test) +            self.decorated.addFailure(test, err, details=details) +        else: +            self._filtered() + +    def addSkip(self, test, reason=None, details=None): +        if (not self._filter_skip and +            self.filter_predicate(test, 'skip', reason, details)): +            self.decorated.startTest(test) +            self.decorated.addSkip(test, reason, details=details) +        else: +            self._filtered() + +    def addSuccess(self, test, details=None): +        if (not self._filter_success and +            self.filter_predicate(test, 'success', None, details)): +            self.decorated.startTest(test) +            self.decorated.addSuccess(test, details=details) +        else: +            self._filtered() + +    def addExpectedFailure(self, test, err=None, details=None): +        if self.filter_predicate(test, 'expectedfailure', err, details): +            self.decorated.startTest(test) +            return self.decorated.addExpectedFailure(test, err, +                details=details) +        else: +            self._filtered() + +    def addUnexpectedSuccess(self, test, details=None): +        self.decorated.startTest(test) +        return self.decorated.addUnexpectedSuccess(test, details=details) + +    def _filtered(self): +        self._current_test_filtered = True + +    def startTest(self, test): +        """Start a test. +         +        Not directly passed to the client, but used for handling of tags +        correctly. +        """ +        self._current_test = test +        self._current_test_filtered = False +        self._current_test_tags = set(), set() +     +    def stopTest(self, test): +        """Stop a test. +         +        Not directly passed to the client, but used for handling of tags +        correctly. +        """ +        if not self._current_test_filtered: +            # Tags to output for this test. +            if self._current_test_tags[0] or self._current_test_tags[1]: +                self.decorated.tags(*self._current_test_tags) +            self.decorated.stopTest(test) +        self._current_test = None +        self._current_test_filtered = None +        self._current_test_tags = None + +    def tags(self, new_tags, gone_tags): +        """Handle tag instructions. + +        Adds and removes tags as appropriate. If a test is currently running, +        tags are not affected for subsequent tests. +         +        :param new_tags: Tags to add, +        :param gone_tags: Tags to remove. +        """ +        if self._current_test is not None: +            # gather the tags until the test stops. +            self._current_test_tags[0].update(new_tags) +            self._current_test_tags[0].difference_update(gone_tags) +            self._current_test_tags[1].update(gone_tags) +            self._current_test_tags[1].difference_update(new_tags) +        return self.decorated.tags(new_tags, gone_tags) + +    def id_to_orig_id(self, id): +        if id.startswith("subunit.RemotedTestCase."): +            return id[len("subunit.RemotedTestCase."):] +        return id diff --git a/lib/subunit/python/subunit/tests/__init__.py b/lib/subunit/python/subunit/tests/__init__.py index 544d0e704f..a78cec8572 100644 --- a/lib/subunit/python/subunit/tests/__init__.py +++ b/lib/subunit/python/subunit/tests/__init__.py @@ -2,24 +2,40 @@  #  subunit: extensions to python unittest to get test results from subprocesses.  #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>  # -#  This program is free software; you can redistribute it and/or modify -#  it under the terms of the GNU General Public License as published by -#  the Free Software Foundation; either version 2 of the License, or -#  (at your option) any later version. -# -#  This program is distributed in the hope that it will be useful, -#  but WITHOUT ANY WARRANTY; without even the implied warranty of -#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -#  GNU General Public License for more details. -# -#  You should have received a copy of the GNU General Public License -#  along with this program; if not, write to the Free Software -#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license.  # -from subunit.tests import TestUtil, test_test_protocol +from subunit.tests import ( +    TestUtil, +    test_chunked, +    test_details, +    test_progress_model, +    test_subunit_filter, +    test_subunit_stats, +    test_subunit_tags, +    test_tap2subunit, +    test_test_protocol, +    test_test_results, +    )  def test_suite():      result = TestUtil.TestSuite() +    result.addTest(test_chunked.test_suite()) +    result.addTest(test_details.test_suite()) +    result.addTest(test_progress_model.test_suite()) +    result.addTest(test_test_results.test_suite())      result.addTest(test_test_protocol.test_suite()) +    result.addTest(test_tap2subunit.test_suite()) +    result.addTest(test_subunit_filter.test_suite()) +    result.addTest(test_subunit_tags.test_suite()) +    result.addTest(test_subunit_stats.test_suite())      return result diff --git a/lib/subunit/python/subunit/tests/sample-script.py b/lib/subunit/python/subunit/tests/sample-script.py index 223d2f5d9f..0ee019ae4a 100755 --- a/lib/subunit/python/subunit/tests/sample-script.py +++ b/lib/subunit/python/subunit/tests/sample-script.py @@ -1,5 +1,12 @@  #!/usr/bin/env python  import sys +if len(sys.argv) == 2: +    # subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args  +    # uses this code path to be sure that the arguments were passed to +    # sample-script.py +    print "test fail" +    print "error fail" +    sys.exit(0)  print "test old mcdonald"  print "success old mcdonald"  print "test bing crosby" diff --git a/lib/subunit/python/subunit/tests/test_chunked.py b/lib/subunit/python/subunit/tests/test_chunked.py new file mode 100644 index 0000000000..a24e31e0c2 --- /dev/null +++ b/lib/subunit/python/subunit/tests/test_chunked.py @@ -0,0 +1,127 @@ +# +#  subunit: extensions to python unittest to get test results from subprocesses. +#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +from cStringIO import StringIO +import unittest + +import subunit.chunked + + +def test_suite(): +    loader = subunit.tests.TestUtil.TestLoader() +    result = loader.loadTestsFromName(__name__) +    return result + + +class TestDecode(unittest.TestCase): + +    def setUp(self): +        unittest.TestCase.setUp(self) +        self.output = StringIO() +        self.decoder = subunit.chunked.Decoder(self.output) + +    def test_close_read_length_short_errors(self): +        self.assertRaises(ValueError, self.decoder.close) + +    def test_close_body_short_errors(self): +        self.assertEqual(None, self.decoder.write('2\r\na')) +        self.assertRaises(ValueError, self.decoder.close) + +    def test_close_body_buffered_data_errors(self): +        self.assertEqual(None, self.decoder.write('2\r')) +        self.assertRaises(ValueError, self.decoder.close) + +    def test_close_after_finished_stream_safe(self): +        self.assertEqual(None, self.decoder.write('2\r\nab')) +        self.assertEqual('', self.decoder.write('0\r\n')) +        self.decoder.close() + +    def test_decode_nothing(self): +        self.assertEqual('', self.decoder.write('0\r\n')) +        self.assertEqual('', self.output.getvalue()) + +    def test_decode_serialised_form(self): +        self.assertEqual(None, self.decoder.write("F\r\n")) +        self.assertEqual(None, self.decoder.write("serialised\n")) +        self.assertEqual('', self.decoder.write("form0\r\n")) + +    def test_decode_short(self): +        self.assertEqual('', self.decoder.write('3\r\nabc0\r\n')) +        self.assertEqual('abc', self.output.getvalue()) + +    def test_decode_combines_short(self): +        self.assertEqual('', self.decoder.write('6\r\nabcdef0\r\n')) +        self.assertEqual('abcdef', self.output.getvalue()) + +    def test_decode_excess_bytes_from_write(self): +        self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234')) +        self.assertEqual('abc', self.output.getvalue()) + +    def test_decode_write_after_finished_errors(self): +        self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234')) +        self.assertRaises(ValueError, self.decoder.write, '') + +    def test_decode_hex(self): +        self.assertEqual('', self.decoder.write('A\r\n12345678900\r\n')) +        self.assertEqual('1234567890', self.output.getvalue()) + +    def test_decode_long_ranges(self): +        self.assertEqual(None, self.decoder.write('10000\r\n')) +        self.assertEqual(None, self.decoder.write('1' * 65536)) +        self.assertEqual(None, self.decoder.write('10000\r\n')) +        self.assertEqual(None, self.decoder.write('2' * 65536)) +        self.assertEqual('', self.decoder.write('0\r\n')) +        self.assertEqual('1' * 65536 + '2' * 65536, self.output.getvalue()) + + +class TestEncode(unittest.TestCase): + +    def setUp(self): +        unittest.TestCase.setUp(self) +        self.output = StringIO() +        self.encoder = subunit.chunked.Encoder(self.output) + +    def test_encode_nothing(self): +        self.encoder.close() +        self.assertEqual('0\r\n', self.output.getvalue()) + +    def test_encode_empty(self): +        self.encoder.write('') +        self.encoder.close() +        self.assertEqual('0\r\n', self.output.getvalue()) + +    def test_encode_short(self): +        self.encoder.write('abc') +        self.encoder.close() +        self.assertEqual('3\r\nabc0\r\n', self.output.getvalue()) + +    def test_encode_combines_short(self): +        self.encoder.write('abc') +        self.encoder.write('def') +        self.encoder.close() +        self.assertEqual('6\r\nabcdef0\r\n', self.output.getvalue()) + +    def test_encode_over_9_is_in_hex(self): +        self.encoder.write('1234567890') +        self.encoder.close() +        self.assertEqual('A\r\n12345678900\r\n', self.output.getvalue()) + +    def test_encode_long_ranges_not_combined(self): +        self.encoder.write('1' * 65536) +        self.encoder.write('2' * 65536) +        self.encoder.close() +        self.assertEqual('10000\r\n' + '1' * 65536 + '10000\r\n' + +            '2' * 65536 + '0\r\n', self.output.getvalue()) diff --git a/lib/subunit/python/subunit/tests/test_details.py b/lib/subunit/python/subunit/tests/test_details.py new file mode 100644 index 0000000000..2700d4afc7 --- /dev/null +++ b/lib/subunit/python/subunit/tests/test_details.py @@ -0,0 +1,110 @@ +# +#  subunit: extensions to python unittest to get test results from subprocesses. +#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +from cStringIO import StringIO +import unittest + +import subunit.tests +from subunit import content, content_type, details + + +def test_suite(): +    loader = subunit.tests.TestUtil.TestLoader() +    result = loader.loadTestsFromName(__name__) +    return result + + +class TestSimpleDetails(unittest.TestCase): + +    def test_lineReceived(self): +        parser = details.SimpleDetailsParser(None) +        parser.lineReceived("foo\n") +        parser.lineReceived("bar\n") +        self.assertEqual("foo\nbar\n", parser._message) + +    def test_lineReceived_escaped_bracket(self): +        parser = details.SimpleDetailsParser(None) +        parser.lineReceived("foo\n") +        parser.lineReceived(" ]are\n") +        parser.lineReceived("bar\n") +        self.assertEqual("foo\n]are\nbar\n", parser._message) + +    def test_get_message(self): +        parser = details.SimpleDetailsParser(None) +        self.assertEqual("", parser.get_message()) + +    def test_get_details(self): +        parser = details.SimpleDetailsParser(None) +        traceback = "" +        expected = {} +        expected['traceback'] = content.Content( +            content_type.ContentType("text", "x-traceback"), +            lambda:[""]) +        found = parser.get_details() +        self.assertEqual(expected.keys(), found.keys()) +        self.assertEqual(expected['traceback'].content_type, +            found['traceback'].content_type) +        self.assertEqual(''.join(expected['traceback'].iter_bytes()), +            ''.join(found['traceback'].iter_bytes())) + +    def test_get_details_skip(self): +        parser = details.SimpleDetailsParser(None) +        traceback = "" +        expected = {} +        expected['reason'] = content.Content( +            content_type.ContentType("text", "plain"), +            lambda:[""]) +        found = parser.get_details("skip") +        self.assertEqual(expected, found) + +    def test_get_details_success(self): +        parser = details.SimpleDetailsParser(None) +        traceback = "" +        expected = {} +        expected['message'] = content.Content( +            content_type.ContentType("text", "plain"), +            lambda:[""]) +        found = parser.get_details("success") +        self.assertEqual(expected, found) + + +class TestMultipartDetails(unittest.TestCase): + +    def test_get_message_is_None(self): +        parser = details.MultipartDetailsParser(None) +        self.assertEqual(None, parser.get_message()) + +    def test_get_details(self): +        parser = details.MultipartDetailsParser(None) +        self.assertEqual({}, parser.get_details()) + +    def test_parts(self): +        parser = details.MultipartDetailsParser(None) +        parser.lineReceived("Content-Type: text/plain\n") +        parser.lineReceived("something\n") +        parser.lineReceived("F\r\n") +        parser.lineReceived("serialised\n") +        parser.lineReceived("form0\r\n") +        expected = {} +        expected['something'] = content.Content( +            content_type.ContentType("text", "plain"), +            lambda:["serialised\nform"]) +        found = parser.get_details() +        self.assertEqual(expected.keys(), found.keys()) +        self.assertEqual(expected['something'].content_type, +            found['something'].content_type) +        self.assertEqual(''.join(expected['something'].iter_bytes()), +            ''.join(found['something'].iter_bytes())) diff --git a/lib/subunit/python/subunit/tests/test_progress_model.py b/lib/subunit/python/subunit/tests/test_progress_model.py new file mode 100644 index 0000000000..76200c6107 --- /dev/null +++ b/lib/subunit/python/subunit/tests/test_progress_model.py @@ -0,0 +1,118 @@ +# +#  subunit: extensions to Python unittest to get test results from subprocesses. +#  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +import unittest + +import subunit +from subunit.progress_model import ProgressModel + + +class TestProgressModel(unittest.TestCase): + +    def assertProgressSummary(self, pos, total, progress): +        """Assert that a progress model has reached a particular point.""" +        self.assertEqual(pos, progress.pos()) +        self.assertEqual(total, progress.width()) + +    def test_new_progress_0_0(self): +        progress = ProgressModel() +        self.assertProgressSummary(0, 0, progress) + +    def test_advance_0_0(self): +        progress = ProgressModel() +        progress.advance() +        self.assertProgressSummary(1, 0, progress) + +    def test_advance_1_0(self): +        progress = ProgressModel() +        progress.advance() +        self.assertProgressSummary(1, 0, progress) + +    def test_set_width_absolute(self): +        progress = ProgressModel() +        progress.set_width(10) +        self.assertProgressSummary(0, 10, progress) + +    def test_set_width_absolute_preserves_pos(self): +        progress = ProgressModel() +        progress.advance() +        progress.set_width(2) +        self.assertProgressSummary(1, 2, progress) + +    def test_adjust_width(self): +        progress = ProgressModel() +        progress.adjust_width(10) +        self.assertProgressSummary(0, 10, progress) +        progress.adjust_width(-10) +        self.assertProgressSummary(0, 0, progress) + +    def test_adjust_width_preserves_pos(self): +        progress = ProgressModel() +        progress.advance() +        progress.adjust_width(10) +        self.assertProgressSummary(1, 10, progress) +        progress.adjust_width(-10) +        self.assertProgressSummary(1, 0, progress) + +    def test_push_preserves_progress(self): +        progress = ProgressModel() +        progress.adjust_width(3) +        progress.advance() +        progress.push() +        self.assertProgressSummary(1, 3, progress) + +    def test_advance_advances_substack(self): +        progress = ProgressModel() +        progress.adjust_width(3) +        progress.advance() +        progress.push() +        progress.adjust_width(1) +        progress.advance() +        self.assertProgressSummary(2, 3, progress) + +    def test_adjust_width_adjusts_substack(self): +        progress = ProgressModel() +        progress.adjust_width(3) +        progress.advance() +        progress.push() +        progress.adjust_width(2) +        progress.advance() +        self.assertProgressSummary(3, 6, progress) + +    def test_set_width_adjusts_substack(self): +        progress = ProgressModel() +        progress.adjust_width(3) +        progress.advance() +        progress.push() +        progress.set_width(2) +        progress.advance() +        self.assertProgressSummary(3, 6, progress) + +    def test_pop_restores_progress(self): +        progress = ProgressModel() +        progress.adjust_width(3) +        progress.advance() +        progress.push() +        progress.adjust_width(1) +        progress.advance() +        progress.pop() +        self.assertProgressSummary(1, 3, progress) + + +def test_suite(): +    loader = subunit.tests.TestUtil.TestLoader() +    result = loader.loadTestsFromName(__name__) +    return result diff --git a/lib/subunit/python/subunit/tests/test_subunit_filter.py b/lib/subunit/python/subunit/tests/test_subunit_filter.py new file mode 100644 index 0000000000..3c65ed3afc --- /dev/null +++ b/lib/subunit/python/subunit/tests/test_subunit_filter.py @@ -0,0 +1,136 @@ +# +#  subunit: extensions to python unittest to get test results from subprocesses. +#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +"""Tests for subunit.TestResultFilter.""" + +import unittest +from StringIO import StringIO + +import subunit +from subunit.test_results import TestResultFilter + + +class TestTestResultFilter(unittest.TestCase): +    """Test for TestResultFilter, a TestResult object which filters tests.""" + +    def _setUp(self): +        self.output = StringIO() + +    def test_default(self): +        """The default is to exclude success and include everything else.""" +        self.filtered_result = unittest.TestResult() +        self.filter = TestResultFilter(self.filtered_result) +        self.run_tests() +        # skips are seen as success by default python TestResult. +        self.assertEqual(['error'], +            [error[0].id() for error in self.filtered_result.errors]) +        self.assertEqual(['failed'], +            [failure[0].id() for failure in +            self.filtered_result.failures]) +        self.assertEqual(4, self.filtered_result.testsRun) + +    def test_exclude_errors(self): +        self.filtered_result = unittest.TestResult() +        self.filter = TestResultFilter(self.filtered_result, +            filter_error=True) +        self.run_tests() +        # skips are seen as errors by default python TestResult. +        self.assertEqual([], self.filtered_result.errors) +        self.assertEqual(['failed'], +            [failure[0].id() for failure in +            self.filtered_result.failures]) +        self.assertEqual(3, self.filtered_result.testsRun) + +    def test_exclude_failure(self): +        self.filtered_result = unittest.TestResult() +        self.filter = TestResultFilter(self.filtered_result, +            filter_failure=True) +        self.run_tests() +        self.assertEqual(['error'], +            [error[0].id() for error in self.filtered_result.errors]) +        self.assertEqual([], +            [failure[0].id() for failure in +            self.filtered_result.failures]) +        self.assertEqual(3, self.filtered_result.testsRun) + +    def test_exclude_skips(self): +        self.filtered_result = subunit.TestResultStats(None) +        self.filter = TestResultFilter(self.filtered_result, +            filter_skip=True) +        self.run_tests() +        self.assertEqual(0, self.filtered_result.skipped_tests) +        self.assertEqual(2, self.filtered_result.failed_tests) +        self.assertEqual(3, self.filtered_result.testsRun) + +    def test_include_success(self): +        """Success's can be included if requested.""" +        self.filtered_result = unittest.TestResult() +        self.filter = TestResultFilter(self.filtered_result, +            filter_success=False) +        self.run_tests() +        self.assertEqual(['error'], +            [error[0].id() for error in self.filtered_result.errors]) +        self.assertEqual(['failed'], +            [failure[0].id() for failure in +            self.filtered_result.failures]) +        self.assertEqual(5, self.filtered_result.testsRun) + +    def test_filter_predicate(self): +        """You can filter by predicate callbacks""" +        self.filtered_result = unittest.TestResult() +        def filter_cb(test, outcome, err, details): +            return outcome == 'success' +        self.filter = TestResultFilter(self.filtered_result, +            filter_predicate=filter_cb, +            filter_success=False) +        self.run_tests() +        # Only success should pass +        self.assertEqual(1, self.filtered_result.testsRun) + +    def run_tests(self): +        self.setUpTestStream() +        self.test = subunit.ProtocolTestCase(self.input_stream) +        self.test.run(self.filter) + +    def setUpTestStream(self): +        # While TestResultFilter works on python objects, using a subunit +        # stream is an easy pithy way of getting a series of test objects to +        # call into the TestResult, and as TestResultFilter is intended for +        # use with subunit also has the benefit of detecting any interface +        # skew issues. +        self.input_stream = StringIO() +        self.input_stream.write("""tags: global +test passed +success passed +test failed +tags: local +failure failed +test error +error error [ +error details +] +test skipped +skip skipped +test todo +xfail todo +""") +        self.input_stream.seek(0) +     + +def test_suite(): +    loader = subunit.tests.TestUtil.TestLoader() +    result = loader.loadTestsFromName(__name__) +    return result diff --git a/lib/subunit/python/subunit/tests/test_subunit_stats.py b/lib/subunit/python/subunit/tests/test_subunit_stats.py new file mode 100644 index 0000000000..a7f8fca675 --- /dev/null +++ b/lib/subunit/python/subunit/tests/test_subunit_stats.py @@ -0,0 +1,83 @@ +# +#  subunit: extensions to python unittest to get test results from subprocesses. +#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +"""Tests for subunit.TestResultStats.""" + +import unittest +from StringIO import StringIO + +import subunit + + +class TestTestResultStats(unittest.TestCase): +    """Test for TestResultStats, a TestResult object that generates stats.""" + +    def setUp(self): +        self.output = StringIO() +        self.result = subunit.TestResultStats(self.output) +        self.input_stream = StringIO() +        self.test = subunit.ProtocolTestCase(self.input_stream) + +    def test_stats_empty(self): +        self.test.run(self.result) +        self.assertEqual(0, self.result.total_tests) +        self.assertEqual(0, self.result.passed_tests) +        self.assertEqual(0, self.result.failed_tests) +        self.assertEqual(set(), self.result.seen_tags) + +    def setUpUsedStream(self): +        self.input_stream.write("""tags: global +test passed +success passed +test failed +tags: local +failure failed +test error +error error +test skipped +skip skipped +test todo +xfail todo +""") +        self.input_stream.seek(0) +        self.test.run(self.result) +     +    def test_stats_smoke_everything(self): +        # Statistics are calculated usefully. +        self.setUpUsedStream() +        self.assertEqual(5, self.result.total_tests) +        self.assertEqual(2, self.result.passed_tests) +        self.assertEqual(2, self.result.failed_tests) +        self.assertEqual(1, self.result.skipped_tests) +        self.assertEqual(set(["global", "local"]), self.result.seen_tags) + +    def test_stat_formatting(self): +        expected = (""" +Total tests:       5 +Passed tests:      2 +Failed tests:      2 +Skipped tests:     1 +Seen tags: global, local +""")[1:] +        self.setUpUsedStream() +        self.result.formatStats() +        self.assertEqual(expected, self.output.getvalue()) + + +def test_suite(): +    loader = subunit.tests.TestUtil.TestLoader() +    result = loader.loadTestsFromName(__name__) +    return result diff --git a/lib/subunit/python/subunit/tests/test_subunit_tags.py b/lib/subunit/python/subunit/tests/test_subunit_tags.py new file mode 100644 index 0000000000..227e2b7475 --- /dev/null +++ b/lib/subunit/python/subunit/tests/test_subunit_tags.py @@ -0,0 +1,68 @@ +# +#  subunit: extensions to python unittest to get test results from subprocesses. +#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +"""Tests for subunit.tag_stream.""" + +import unittest +from StringIO import StringIO + +import subunit +import subunit.test_results + + +class TestSubUnitTags(unittest.TestCase): + +    def setUp(self): +        self.original = StringIO() +        self.filtered = StringIO() + +    def test_add_tag(self): +        self.original.write("tags: foo\n") +        self.original.write("test: test\n") +        self.original.write("tags: bar -quux\n") +        self.original.write("success: test\n") +        self.original.seek(0) +        result = subunit.tag_stream(self.original, self.filtered, ["quux"]) +        self.assertEqual([ +            "tags: quux", +            "tags: foo", +            "test: test", +            "tags: bar", +            "success: test", +            ], +            self.filtered.getvalue().splitlines()) + +    def test_remove_tag(self): +        self.original.write("tags: foo\n") +        self.original.write("test: test\n") +        self.original.write("tags: bar -quux\n") +        self.original.write("success: test\n") +        self.original.seek(0) +        result = subunit.tag_stream(self.original, self.filtered, ["-bar"]) +        self.assertEqual([ +            "tags: -bar", +            "tags: foo", +            "test: test", +            "tags: -quux", +            "success: test", +            ], +            self.filtered.getvalue().splitlines()) + + +def test_suite(): +    loader = subunit.tests.TestUtil.TestLoader() +    result = loader.loadTestsFromName(__name__) +    return result diff --git a/lib/subunit/python/subunit/tests/test_tap2subunit.py b/lib/subunit/python/subunit/tests/test_tap2subunit.py new file mode 100644 index 0000000000..febfe9d441 --- /dev/null +++ b/lib/subunit/python/subunit/tests/test_tap2subunit.py @@ -0,0 +1,432 @@ +# +#  subunit: extensions to python unittest to get test results from subprocesses. +#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +"""Tests for TAP2SubUnit.""" + +import unittest +from StringIO import StringIO +import os +import subunit +import sys + + +class TestTAP2SubUnit(unittest.TestCase): +    """Tests for TAP2SubUnit. + +    These tests test TAP string data in, and subunit string data out. +    This is ok because the subunit protocol is intended to be stable, +    but it might be easier/pithier to write tests against TAP string in, +    parsed subunit objects out (by hooking the subunit stream to a subunit +    protocol server. +    """ + +    def setUp(self): +        self.tap = StringIO() +        self.subunit = StringIO() + +    def test_skip_entire_file(self): +        # A file +        # 1..- # Skipped: comment +        # results in a single skipped test. +        self.tap.write("1..0 # Skipped: entire file skipped\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            "test file skip", +            "skip file skip [", +            "Skipped: entire file skipped", +            "]", +            ], +            self.subunit.getvalue().splitlines()) + +    def test_ok_test_pass(self): +        # A file +        # ok +        # results in a passed test with name 'test 1' (a synthetic name as tap +        # does not require named fixtures - it is the first test in the tap +        # stream). +        self.tap.write("ok\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            "test test 1", +            "success test 1", +            ], +            self.subunit.getvalue().splitlines()) + +    def test_ok_test_number_pass(self): +        # A file +        # ok 1 +        # results in a passed test with name 'test 1' +        self.tap.write("ok 1\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            "test test 1", +            "success test 1", +            ], +            self.subunit.getvalue().splitlines()) + +    def test_ok_test_number_description_pass(self): +        # A file +        # ok 1 - There is a description +        # results in a passed test with name 'test 1 - There is a description' +        self.tap.write("ok 1 - There is a description\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            "test test 1 - There is a description", +            "success test 1 - There is a description", +            ], +            self.subunit.getvalue().splitlines()) + +    def test_ok_test_description_pass(self): +        # A file +        # ok There is a description +        # results in a passed test with name 'test 1 There is a description' +        self.tap.write("ok There is a description\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            "test test 1 There is a description", +            "success test 1 There is a description", +            ], +            self.subunit.getvalue().splitlines()) + +    def test_ok_SKIP_skip(self): +        # A file +        # ok # SKIP +        # results in a skkip test with name 'test 1' +        self.tap.write("ok # SKIP\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            "test test 1", +            "skip test 1", +            ], +            self.subunit.getvalue().splitlines()) + +    def test_ok_number_description_SKIP_skip_comment(self): +        # A file +        # ok 1 foo  # SKIP Not done yet +        # results in a skip test with name 'test 1 foo' and a log of +        # Not done yet +        self.tap.write("ok 1 foo  # SKIP Not done yet\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            "test test 1 foo", +            "skip test 1 foo [", +            "Not done yet", +            "]", +            ], +            self.subunit.getvalue().splitlines()) + +    def test_ok_SKIP_skip_comment(self): +        # A file +        # ok # SKIP Not done yet +        # results in a skip test with name 'test 1' and a log of Not done yet +        self.tap.write("ok # SKIP Not done yet\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            "test test 1", +            "skip test 1 [", +            "Not done yet", +            "]", +            ], +            self.subunit.getvalue().splitlines()) + +    def test_ok_TODO_xfail(self): +        # A file +        # ok # TODO +        # results in a xfail test with name 'test 1' +        self.tap.write("ok # TODO\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            "test test 1", +            "xfail test 1", +            ], +            self.subunit.getvalue().splitlines()) + +    def test_ok_TODO_xfail_comment(self): +        # A file +        # ok # TODO Not done yet +        # results in a xfail test with name 'test 1' and a log of Not done yet +        self.tap.write("ok # TODO Not done yet\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            "test test 1", +            "xfail test 1 [", +            "Not done yet", +            "]", +            ], +            self.subunit.getvalue().splitlines()) + +    def test_bail_out_errors(self): +        # A file with line in it +        # Bail out! COMMENT +        # is treated as an error +        self.tap.write("ok 1 foo\n") +        self.tap.write("Bail out! Lifejacket engaged\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            "test test 1 foo", +            "success test 1 foo", +            "test Bail out! Lifejacket engaged", +            "error Bail out! Lifejacket engaged", +            ], +            self.subunit.getvalue().splitlines()) + +    def test_missing_test_at_end_with_plan_adds_error(self): +        # A file +        # 1..3 +        # ok first test +        # not ok third test +        # results in three tests, with the third being created +        self.tap.write('1..3\n') +        self.tap.write('ok first test\n') +        self.tap.write('not ok second test\n') +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            'test test 1 first test', +            'success test 1 first test', +            'test test 2 second test', +            'failure test 2 second test', +            'test test 3', +            'error test 3 [', +            'test missing from TAP output', +            ']', +            ], +            self.subunit.getvalue().splitlines()) + +    def test_missing_test_with_plan_adds_error(self): +        # A file +        # 1..3 +        # ok first test +        # not ok 3 third test +        # results in three tests, with the second being created +        self.tap.write('1..3\n') +        self.tap.write('ok first test\n') +        self.tap.write('not ok 3 third test\n') +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            'test test 1 first test', +            'success test 1 first test', +            'test test 2', +            'error test 2 [', +            'test missing from TAP output', +            ']', +            'test test 3 third test', +            'failure test 3 third test', +            ], +            self.subunit.getvalue().splitlines()) + +    def test_missing_test_no_plan_adds_error(self): +        # A file +        # ok first test +        # not ok 3 third test +        # results in three tests, with the second being created +        self.tap.write('ok first test\n') +        self.tap.write('not ok 3 third test\n') +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            'test test 1 first test', +            'success test 1 first test', +            'test test 2', +            'error test 2 [', +            'test missing from TAP output', +            ']', +            'test test 3 third test', +            'failure test 3 third test', +            ], +            self.subunit.getvalue().splitlines()) + +    def test_four_tests_in_a_row_trailing_plan(self): +        # A file +        # ok 1 - first test in a script with no plan at all +        # not ok 2 - second +        # ok 3 - third +        # not ok 4 - fourth +        # 1..4 +        # results in four tests numbered and named +        self.tap.write('ok 1 - first test in a script with trailing plan\n') +        self.tap.write('not ok 2 - second\n') +        self.tap.write('ok 3 - third\n') +        self.tap.write('not ok 4 - fourth\n') +        self.tap.write('1..4\n') +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            'test test 1 - first test in a script with trailing plan', +            'success test 1 - first test in a script with trailing plan', +            'test test 2 - second', +            'failure test 2 - second', +            'test test 3 - third', +            'success test 3 - third', +            'test test 4 - fourth', +            'failure test 4 - fourth' +            ], +            self.subunit.getvalue().splitlines()) + +    def test_four_tests_in_a_row_with_plan(self): +        # A file +        # 1..4 +        # ok 1 - first test in a script with no plan at all +        # not ok 2 - second +        # ok 3 - third +        # not ok 4 - fourth +        # results in four tests numbered and named +        self.tap.write('1..4\n') +        self.tap.write('ok 1 - first test in a script with a plan\n') +        self.tap.write('not ok 2 - second\n') +        self.tap.write('ok 3 - third\n') +        self.tap.write('not ok 4 - fourth\n') +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            'test test 1 - first test in a script with a plan', +            'success test 1 - first test in a script with a plan', +            'test test 2 - second', +            'failure test 2 - second', +            'test test 3 - third', +            'success test 3 - third', +            'test test 4 - fourth', +            'failure test 4 - fourth' +            ], +            self.subunit.getvalue().splitlines()) + +    def test_four_tests_in_a_row_no_plan(self): +        # A file +        # ok 1 - first test in a script with no plan at all +        # not ok 2 - second +        # ok 3 - third +        # not ok 4 - fourth +        # results in four tests numbered and named +        self.tap.write('ok 1 - first test in a script with no plan at all\n') +        self.tap.write('not ok 2 - second\n') +        self.tap.write('ok 3 - third\n') +        self.tap.write('not ok 4 - fourth\n') +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            'test test 1 - first test in a script with no plan at all', +            'success test 1 - first test in a script with no plan at all', +            'test test 2 - second', +            'failure test 2 - second', +            'test test 3 - third', +            'success test 3 - third', +            'test test 4 - fourth', +            'failure test 4 - fourth' +            ], +            self.subunit.getvalue().splitlines()) + +    def test_todo_and_skip(self): +        # A file +        # not ok 1 - a fail but # TODO but is TODO +        # not ok 2 - another fail # SKIP instead +        # results in two tests, numbered and commented. +        self.tap.write("not ok 1 - a fail but # TODO but is TODO\n") +        self.tap.write("not ok 2 - another fail # SKIP instead\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            'test test 1 - a fail but', +            'xfail test 1 - a fail but [', +            'but is TODO', +            ']', +            'test test 2 - another fail', +            'skip test 2 - another fail [', +            'instead', +            ']', +            ], +            self.subunit.getvalue().splitlines()) + +    def test_leading_comments_add_to_next_test_log(self): +        # A file +        # # comment +        # ok  +        # ok +        # results in a single test with the comment included +        # in the first test and not the second. +        self.tap.write("# comment\n") +        self.tap.write("ok\n") +        self.tap.write("ok\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            'test test 1', +            'success test 1 [', +            '# comment', +            ']', +            'test test 2', +            'success test 2', +            ], +            self.subunit.getvalue().splitlines()) +     +    def test_trailing_comments_are_included_in_last_test_log(self): +        # A file +        # ok foo +        # ok foo +        # # comment +        # results in a two tests, with the second having the comment +        # attached to its log. +        self.tap.write("ok\n") +        self.tap.write("ok\n") +        self.tap.write("# comment\n") +        self.tap.seek(0) +        result = subunit.TAP2SubUnit(self.tap, self.subunit) +        self.assertEqual(0, result) +        self.assertEqual([ +            'test test 1', +            'success test 1', +            'test test 2', +            'success test 2 [', +            '# comment', +            ']', +            ], +            self.subunit.getvalue().splitlines()) + + +def test_suite(): +    loader = subunit.tests.TestUtil.TestLoader() +    result = loader.loadTestsFromName(__name__) +    return result diff --git a/lib/subunit/python/subunit/tests/test_test_protocol.py b/lib/subunit/python/subunit/tests/test_test_protocol.py index af31584a97..9e9db18163 100644 --- a/lib/subunit/python/subunit/tests/test_test_protocol.py +++ b/lib/subunit/python/subunit/tests/test_test_protocol.py @@ -1,126 +1,80 @@  # -#  subunit: extensions to python unittest to get test results from subprocesses. +#  subunit: extensions to Python unittest to get test results from subprocesses.  #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>  # -#  This program is free software; you can redistribute it and/or modify -#  it under the terms of the GNU General Public License as published by -#  the Free Software Foundation; either version 2 of the License, or -#  (at your option) any later version. -# -#  This program is distributed in the hope that it will be useful, -#  but WITHOUT ANY WARRANTY; without even the implied warranty of -#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -#  GNU General Public License for more details. -# -#  You should have received a copy of the GNU General Public License -#  along with this program; if not, write to the Free Software -#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license.  # +import datetime  import unittest  from StringIO import StringIO  import os -import subunit  import sys -try: -    class MockTestProtocolServerClient(object): -        """A mock protocol server client to test callbacks.""" - -        def __init__(self): -            self.end_calls = [] -            self.error_calls = [] -            self.failure_calls = [] -            self.start_calls = [] -            self.success_calls = [] -            super(MockTestProtocolServerClient, self).__init__() - -        def addError(self, test, error): -            self.error_calls.append((test, error)) - -        def addFailure(self, test, error): -            self.failure_calls.append((test, error)) - -        def addSuccess(self, test): -            self.success_calls.append(test) - -        def stopTest(self, test): -            self.end_calls.append(test) - -        def startTest(self, test): -            self.start_calls.append(test) - -except AttributeError: -    MockTestProtocolServer = None - - -class TestMockTestProtocolServer(unittest.TestCase): - -    def test_start_test(self): -        protocol = MockTestProtocolServerClient() -        protocol.startTest(subunit.RemotedTestCase("test old mcdonald")) -        self.assertEqual(protocol.start_calls, -                         [subunit.RemotedTestCase("test old mcdonald")]) -        self.assertEqual(protocol.end_calls, []) -        self.assertEqual(protocol.error_calls, []) -        self.assertEqual(protocol.failure_calls, []) -        self.assertEqual(protocol.success_calls, []) - -    def test_add_error(self): -        protocol = MockTestProtocolServerClient() -        protocol.addError(subunit.RemotedTestCase("old mcdonald"), -                          subunit.RemoteError("omg it works")) -        self.assertEqual(protocol.start_calls, []) -        self.assertEqual(protocol.end_calls, []) -        self.assertEqual(protocol.error_calls, [( -                            subunit.RemotedTestCase("old mcdonald"), -                            subunit.RemoteError("omg it works"))]) -        self.assertEqual(protocol.failure_calls, []) -        self.assertEqual(protocol.success_calls, []) - -    def test_add_failure(self): -        protocol = MockTestProtocolServerClient() -        protocol.addFailure(subunit.RemotedTestCase("old mcdonald"), -                            subunit.RemoteError("omg it works")) -        self.assertEqual(protocol.start_calls, []) -        self.assertEqual(protocol.end_calls, []) -        self.assertEqual(protocol.error_calls, []) -        self.assertEqual(protocol.failure_calls, [ -                            (subunit.RemotedTestCase("old mcdonald"), -                             subunit.RemoteError("omg it works"))]) -        self.assertEqual(protocol.success_calls, []) +from testtools.content import Content, TracebackContent +from testtools.content_type import ContentType +from testtools.tests.helpers import ( +    Python26TestResult, +    Python27TestResult, +    ExtendedTestResult, +    ) -    def test_add_success(self): -        protocol = MockTestProtocolServerClient() -        protocol.addSuccess(subunit.RemotedTestCase("test old mcdonald")) -        self.assertEqual(protocol.start_calls, []) -        self.assertEqual(protocol.end_calls, []) -        self.assertEqual(protocol.error_calls, []) -        self.assertEqual(protocol.failure_calls, []) -        self.assertEqual(protocol.success_calls, -                         [subunit.RemotedTestCase("test old mcdonald")]) - -    def test_end_test(self): -        protocol = MockTestProtocolServerClient() -        protocol.stopTest(subunit.RemotedTestCase("test old mcdonald")) -        self.assertEqual(protocol.end_calls, -                         [subunit.RemotedTestCase("test old mcdonald")]) -        self.assertEqual(protocol.error_calls, []) -        self.assertEqual(protocol.failure_calls, []) -        self.assertEqual(protocol.success_calls, []) -        self.assertEqual(protocol.start_calls, []) +import subunit +from subunit import _remote_exception_str +import subunit.iso8601 as iso8601  class TestTestImports(unittest.TestCase):      def test_imports(self): +        from subunit import DiscardStream          from subunit import TestProtocolServer          from subunit import RemotedTestCase          from subunit import RemoteError          from subunit import ExecTestCase          from subunit import IsolatedTestCase          from subunit import TestProtocolClient +        from subunit import ProtocolTestCase + + +class TestDiscardStream(unittest.TestCase): + +    def test_write(self): +        subunit.DiscardStream().write("content") + + +class TestProtocolServerForward(unittest.TestCase): + +    def test_story(self): +        client = unittest.TestResult() +        out = StringIO() +        protocol = subunit.TestProtocolServer(client, forward_stream=out) +        pipe = StringIO("test old mcdonald\n" +                        "success old mcdonald\n") +        protocol.readFrom(pipe) +        mcdonald = subunit.RemotedTestCase("old mcdonald") +        self.assertEqual(client.testsRun, 1) +        self.assertEqual(pipe.getvalue(), out.getvalue()) +    def test_not_command(self): +        client = unittest.TestResult() +        out = StringIO() +        protocol = subunit.TestProtocolServer(client, +            stream=subunit.DiscardStream(), forward_stream=out) +        pipe = StringIO("success old mcdonald\n") +        protocol.readFrom(pipe) +        self.assertEqual(client.testsRun, 0) +        self.assertEqual("", out.getvalue()) +          class TestTestProtocolServerPipe(unittest.TestCase): @@ -140,47 +94,48 @@ class TestTestProtocolServerPipe(unittest.TestCase):          bing = subunit.RemotedTestCase("bing crosby")          an_error = subunit.RemotedTestCase("an error")          self.assertEqual(client.errors, -                         [(an_error, 'RemoteException: \n\n')]) +                         [(an_error, _remote_exception_str + '\n')])          self.assertEqual(              client.failures, -            [(bing, "RemoteException: foo.c:53:ERROR invalid state\n\n")]) +            [(bing, _remote_exception_str + ": Text attachment: traceback\n" +                "------------\nfoo.c:53:ERROR invalid state\n" +                "------------\n\n")])          self.assertEqual(client.testsRun, 3)  class TestTestProtocolServerStartTest(unittest.TestCase):      def setUp(self): -        self.client = MockTestProtocolServerClient() +        self.client = Python26TestResult()          self.protocol = subunit.TestProtocolServer(self.client)      def test_start_test(self):          self.protocol.lineReceived("test old mcdonald\n") -        self.assertEqual(self.client.start_calls, -                         [subunit.RemotedTestCase("old mcdonald")]) +        self.assertEqual(self.client._events, +            [('startTest', subunit.RemotedTestCase("old mcdonald"))])      def test_start_testing(self):          self.protocol.lineReceived("testing old mcdonald\n") -        self.assertEqual(self.client.start_calls, -                         [subunit.RemotedTestCase("old mcdonald")]) +        self.assertEqual(self.client._events, +            [('startTest', subunit.RemotedTestCase("old mcdonald"))])      def test_start_test_colon(self):          self.protocol.lineReceived("test: old mcdonald\n") -        self.assertEqual(self.client.start_calls, -                         [subunit.RemotedTestCase("old mcdonald")]) +        self.assertEqual(self.client._events, +            [('startTest', subunit.RemotedTestCase("old mcdonald"))])      def test_start_testing_colon(self):          self.protocol.lineReceived("testing: old mcdonald\n") -        self.assertEqual(self.client.start_calls, -                         [subunit.RemotedTestCase("old mcdonald")]) +        self.assertEqual(self.client._events, +            [('startTest', subunit.RemotedTestCase("old mcdonald"))])  class TestTestProtocolServerPassThrough(unittest.TestCase):      def setUp(self): -        from StringIO import StringIO          self.stdout = StringIO()          self.test = subunit.RemotedTestCase("old mcdonald") -        self.client = MockTestProtocolServerClient() +        self.client = ExtendedTestResult()          self.protocol = subunit.TestProtocolServer(self.client, self.stdout)      def keywords_before_test(self): @@ -205,42 +160,37 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):      def test_keywords_before_test(self):          self.keywords_before_test() -        self.assertEqual(self.client.start_calls, []) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.failure_calls, []) -        self.assertEqual(self.client.success_calls, []) +        self.assertEqual(self.client._events, [])      def test_keywords_after_error(self):          self.protocol.lineReceived("test old mcdonald\n")          self.protocol.lineReceived("error old mcdonald\n")          self.keywords_before_test() -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, -                         [(self.test, subunit.RemoteError(""))]) -        self.assertEqual(self.client.failure_calls, []) -        self.assertEqual(self.client.success_calls, []) +        self.assertEqual([ +            ('startTest', self.test), +            ('addError', self.test, {}), +            ('stopTest', self.test), +            ], self.client._events)      def test_keywords_after_failure(self):          self.protocol.lineReceived("test old mcdonald\n")          self.protocol.lineReceived("failure old mcdonald\n")          self.keywords_before_test() -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.failure_calls, -                         [(self.test, subunit.RemoteError())]) -        self.assertEqual(self.client.success_calls, []) +        self.assertEqual(self.client._events, [ +            ('startTest', self.test), +            ('addFailure', self.test, {}), +            ('stopTest', self.test), +            ])      def test_keywords_after_success(self):          self.protocol.lineReceived("test old mcdonald\n")          self.protocol.lineReceived("success old mcdonald\n")          self.keywords_before_test() -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.failure_calls, []) -        self.assertEqual(self.client.success_calls, [self.test]) +        self.assertEqual([ +            ('startTest', self.test), +            ('addSuccess', self.test), +            ('stopTest', self.test), +            ], self.client._events)      def test_keywords_after_test(self):          self.protocol.lineReceived("test old mcdonald\n") @@ -265,14 +215,15 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):                                                   "successful a\n"                                                   "successful: a\n"                                                   "]\n") -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.failure_calls, -                         [(self.test, subunit.RemoteError())]) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.success_calls, []) +        self.assertEqual(self.client._events, [ +            ('startTest', self.test), +            ('addFailure', self.test, {}), +            ('stopTest', self.test), +            ])      def test_keywords_during_failure(self): +        # A smoke test to make sure that the details parsers have control +        # appropriately.          self.protocol.lineReceived("test old mcdonald\n")          self.protocol.lineReceived("failure: old mcdonald [\n")          self.protocol.lineReceived("test old mcdonald\n") @@ -287,21 +238,24 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):          self.protocol.lineReceived(" ]\n")          self.protocol.lineReceived("]\n")          self.assertEqual(self.stdout.getvalue(), "") -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.failure_calls, -                         [(self.test, subunit.RemoteError("test old mcdonald\n" -                                                  "failure a\n" -                                                  "failure: a\n" -                                                  "error a\n" -                                                  "error: a\n" -                                                  "success a\n" -                                                  "success: a\n" -                                                  "successful a\n" -                                                  "successful: a\n" -                                                  "]\n"))]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.success_calls, []) +        details = {} +        details['traceback'] = Content(ContentType("text", "x-traceback"), +            lambda:[ +            "test old mcdonald\n" +            "failure a\n" +            "failure: a\n" +            "error a\n" +            "error: a\n" +            "success a\n" +            "success: a\n" +            "successful a\n" +            "successful: a\n" +            "]\n"]) +        self.assertEqual(self.client._events, [ +            ('startTest', self.test), +            ('addFailure', self.test, details), +            ('stopTest', self.test), +            ])      def test_stdout_passthrough(self):          """Lines received which cannot be interpreted as any protocol action @@ -315,103 +269,133 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):  class TestTestProtocolServerLostConnection(unittest.TestCase):      def setUp(self): -        self.client = MockTestProtocolServerClient() +        self.client = Python26TestResult()          self.protocol = subunit.TestProtocolServer(self.client)          self.test = subunit.RemotedTestCase("old mcdonald")      def test_lost_connection_no_input(self):          self.protocol.lostConnection() -        self.assertEqual(self.client.start_calls, []) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.failure_calls, []) -        self.assertEqual(self.client.success_calls, []) +        self.assertEqual([], self.client._events)      def test_lost_connection_after_start(self):          self.protocol.lineReceived("test old mcdonald\n")          self.protocol.lostConnection() -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, [ -            (self.test, subunit.RemoteError("lost connection during " -                                            "test 'old mcdonald'"))]) -        self.assertEqual(self.client.failure_calls, []) -        self.assertEqual(self.client.success_calls, []) +        failure = subunit.RemoteError( +            "lost connection during test 'old mcdonald'") +        self.assertEqual([ +            ('startTest', self.test), +            ('addError', self.test, failure), +            ('stopTest', self.test), +            ], self.client._events)      def test_lost_connected_after_error(self):          self.protocol.lineReceived("test old mcdonald\n")          self.protocol.lineReceived("error old mcdonald\n")          self.protocol.lostConnection() -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.failure_calls, []) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, [ -            (self.test, subunit.RemoteError(""))]) -        self.assertEqual(self.client.success_calls, []) +        self.assertEqual([ +            ('startTest', self.test), +            ('addError', self.test, subunit.RemoteError("")), +            ('stopTest', self.test), +            ], self.client._events) -    def test_lost_connection_during_error(self): +    def do_connection_lost(self, outcome, opening):          self.protocol.lineReceived("test old mcdonald\n") -        self.protocol.lineReceived("error old mcdonald [\n") +        self.protocol.lineReceived("%s old mcdonald %s" % (outcome, opening))          self.protocol.lostConnection() -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, [ -            (self.test, subunit.RemoteError("lost connection during error " -                                            "report of test 'old mcdonald'"))]) -        self.assertEqual(self.client.failure_calls, []) -        self.assertEqual(self.client.success_calls, []) +        failure = subunit.RemoteError( +            "lost connection during %s report of test 'old mcdonald'" %  +            outcome) +        self.assertEqual([ +            ('startTest', self.test), +            ('addError', self.test, failure), +            ('stopTest', self.test), +            ], self.client._events) + +    def test_lost_connection_during_error(self): +        self.do_connection_lost("error", "[\n") + +    def test_lost_connection_during_error_details(self): +        self.do_connection_lost("error", "[ multipart\n")      def test_lost_connected_after_failure(self):          self.protocol.lineReceived("test old mcdonald\n")          self.protocol.lineReceived("failure old mcdonald\n")          self.protocol.lostConnection() -        test = subunit.RemotedTestCase("old mcdonald") -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.failure_calls, -                         [(self.test, subunit.RemoteError())]) -        self.assertEqual(self.client.success_calls, []) +        self.assertEqual([ +            ('startTest', self.test), +            ('addFailure', self.test, subunit.RemoteError("")), +            ('stopTest', self.test), +            ], self.client._events)      def test_lost_connection_during_failure(self): -        self.protocol.lineReceived("test old mcdonald\n") -        self.protocol.lineReceived("failure old mcdonald [\n") -        self.protocol.lostConnection() -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, -                         [(self.test, -                           subunit.RemoteError("lost connection during " -                                               "failure report" -                                               " of test 'old mcdonald'"))]) -        self.assertEqual(self.client.failure_calls, []) -        self.assertEqual(self.client.success_calls, []) +        self.do_connection_lost("failure", "[\n") + +    def test_lost_connection_during_failure_details(self): +        self.do_connection_lost("failure", "[ multipart\n")      def test_lost_connection_after_success(self):          self.protocol.lineReceived("test old mcdonald\n")          self.protocol.lineReceived("success old mcdonald\n")          self.protocol.lostConnection() -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.failure_calls, []) -        self.assertEqual(self.client.success_calls, [self.test]) +        self.assertEqual([ +            ('startTest', self.test), +            ('addSuccess', self.test), +            ('stopTest', self.test), +            ], self.client._events) + +    def test_lost_connection_during_success(self): +        self.do_connection_lost("success", "[\n") + +    def test_lost_connection_during_success_details(self): +        self.do_connection_lost("success", "[ multipart\n") + +    def test_lost_connection_during_skip(self): +        self.do_connection_lost("skip", "[\n") + +    def test_lost_connection_during_skip_details(self): +        self.do_connection_lost("skip", "[ multipart\n") + +    def test_lost_connection_during_xfail(self): +        self.do_connection_lost("xfail", "[\n") + +    def test_lost_connection_during_xfail_details(self): +        self.do_connection_lost("xfail", "[ multipart\n") + + +class TestInTestMultipart(unittest.TestCase): + +    def setUp(self): +        self.client = ExtendedTestResult() +        self.protocol = subunit.TestProtocolServer(self.client) +        self.protocol.lineReceived("test mcdonalds farm\n") +        self.test = subunit.RemotedTestCase("mcdonalds farm") + +    def test__outcome_sets_details_parser(self): +        self.protocol._reading_success_details.details_parser = None +        self.protocol._state._outcome(0, "mcdonalds farm [ multipart\n", +            None, self.protocol._reading_success_details) +        parser = self.protocol._reading_success_details.details_parser +        self.assertNotEqual(None, parser) +        self.assertTrue(isinstance(parser, +            subunit.details.MultipartDetailsParser))  class TestTestProtocolServerAddError(unittest.TestCase):      def setUp(self): -        self.client = MockTestProtocolServerClient() +        self.client = ExtendedTestResult()          self.protocol = subunit.TestProtocolServer(self.client)          self.protocol.lineReceived("test mcdonalds farm\n")          self.test = subunit.RemotedTestCase("mcdonalds farm")      def simple_error_keyword(self, keyword):          self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, [ -            (self.test, subunit.RemoteError(""))]) -        self.assertEqual(self.client.failure_calls, []) +        details = {} +        self.assertEqual([ +            ('startTest', self.test), +            ('addError', self.test, details), +            ('stopTest', self.test), +            ], self.client._events)      def test_simple_error(self):          self.simple_error_keyword("error") @@ -422,21 +406,27 @@ class TestTestProtocolServerAddError(unittest.TestCase):      def test_error_empty_message(self):          self.protocol.lineReceived("error mcdonalds farm [\n")          self.protocol.lineReceived("]\n") -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, [ -            (self.test, subunit.RemoteError(""))]) -        self.assertEqual(self.client.failure_calls, []) +        details = {} +        details['traceback'] = Content(ContentType("text", "x-traceback"), +            lambda:[""]) +        self.assertEqual([ +            ('startTest', self.test), +            ('addError', self.test, details), +            ('stopTest', self.test), +            ], self.client._events)      def error_quoted_bracket(self, keyword):          self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)          self.protocol.lineReceived(" ]\n")          self.protocol.lineReceived("]\n") -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, [ -            (self.test, subunit.RemoteError("]\n"))]) -        self.assertEqual(self.client.failure_calls, []) +        details = {} +        details['traceback'] = Content(ContentType("text", "x-traceback"), +            lambda:["]\n"]) +        self.assertEqual([ +            ('startTest', self.test), +            ('addError', self.test, details), +            ('stopTest', self.test), +            ], self.client._events)      def test_error_quoted_bracket(self):          self.error_quoted_bracket("error") @@ -448,18 +438,22 @@ class TestTestProtocolServerAddError(unittest.TestCase):  class TestTestProtocolServerAddFailure(unittest.TestCase):      def setUp(self): -        self.client = MockTestProtocolServerClient() +        self.client = ExtendedTestResult()          self.protocol = subunit.TestProtocolServer(self.client)          self.protocol.lineReceived("test mcdonalds farm\n")          self.test = subunit.RemotedTestCase("mcdonalds farm") +    def assertFailure(self, details): +        self.assertEqual([ +            ('startTest', self.test), +            ('addFailure', self.test, details), +            ('stopTest', self.test), +            ], self.client._events) +      def simple_failure_keyword(self, keyword):          self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.failure_calls, -                         [(self.test, subunit.RemoteError())]) +        details = {} +        self.assertFailure(details)      def test_simple_failure(self):          self.simple_failure_keyword("failure") @@ -470,21 +464,19 @@ class TestTestProtocolServerAddFailure(unittest.TestCase):      def test_failure_empty_message(self):          self.protocol.lineReceived("failure mcdonalds farm [\n")          self.protocol.lineReceived("]\n") -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.failure_calls, -                         [(self.test, subunit.RemoteError())]) +        details = {} +        details['traceback'] = Content(ContentType("text", "x-traceback"), +            lambda:[""]) +        self.assertFailure(details)      def failure_quoted_bracket(self, keyword):          self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)          self.protocol.lineReceived(" ]\n")          self.protocol.lineReceived("]\n") -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.failure_calls, -                         [(self.test, subunit.RemoteError("]\n"))]) +        details = {} +        details['traceback'] = Content(ContentType("text", "x-traceback"), +            lambda:["]\n"]) +        self.assertFailure(details)      def test_failure_quoted_bracket(self):          self.failure_quoted_bracket("failure") @@ -493,20 +485,191 @@ class TestTestProtocolServerAddFailure(unittest.TestCase):          self.failure_quoted_bracket("failure:") +class TestTestProtocolServerAddxFail(unittest.TestCase): +    """Tests for the xfail keyword. + +    In Python this can thunk through to Success due to stdlib limitations (see +    README). +    """ + +    def capture_expected_failure(self, test, err): +        self._events.append((test, err)) + +    def setup_python26(self): +        """Setup a test object ready to be xfailed and thunk to success.""" +        self.client = Python26TestResult() +        self.setup_protocol() + +    def setup_python27(self): +        """Setup a test object ready to be xfailed.""" +        self.client = Python27TestResult() +        self.setup_protocol() + +    def setup_python_ex(self): +        """Setup a test object ready to be xfailed with details.""" +        self.client = ExtendedTestResult() +        self.setup_protocol() + +    def setup_protocol(self): +        """Setup the protocol based on self.client.""" +        self.protocol = subunit.TestProtocolServer(self.client) +        self.protocol.lineReceived("test mcdonalds farm\n") +        self.test = self.client._events[-1][-1] + +    def simple_xfail_keyword(self, keyword, as_success): +        self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) +        self.check_success_or_xfail(as_success) + +    def check_success_or_xfail(self, as_success, error_message=None): +        if as_success: +            self.assertEqual([ +                ('startTest', self.test), +                ('addSuccess', self.test), +                ('stopTest', self.test), +                ], self.client._events) +        else: +            details = {} +            if error_message is not None: +                details['traceback'] = Content( +                    ContentType("text", "x-traceback"), lambda:[error_message]) +            if isinstance(self.client, ExtendedTestResult): +                value = details +            else: +                if error_message is not None: +                    value = subunit.RemoteError('Text attachment: traceback\n' +                        '------------\n' + error_message + '------------\n') +                else: +                    value = subunit.RemoteError() +            self.assertEqual([ +                ('startTest', self.test), +                ('addExpectedFailure', self.test, value), +                ('stopTest', self.test), +                ], self.client._events) + +    def test_simple_xfail(self): +        self.setup_python26() +        self.simple_xfail_keyword("xfail", True) +        self.setup_python27() +        self.simple_xfail_keyword("xfail",  False) +        self.setup_python_ex() +        self.simple_xfail_keyword("xfail",  False) + +    def test_simple_xfail_colon(self): +        self.setup_python26() +        self.simple_xfail_keyword("xfail:", True) +        self.setup_python27() +        self.simple_xfail_keyword("xfail:", False) +        self.setup_python_ex() +        self.simple_xfail_keyword("xfail:", False) + +    def test_xfail_empty_message(self): +        self.setup_python26() +        self.empty_message(True) +        self.setup_python27() +        self.empty_message(False) +        self.setup_python_ex() +        self.empty_message(False, error_message="") + +    def empty_message(self, as_success, error_message="\n"): +        self.protocol.lineReceived("xfail mcdonalds farm [\n") +        self.protocol.lineReceived("]\n") +        self.check_success_or_xfail(as_success, error_message) + +    def xfail_quoted_bracket(self, keyword, as_success): +        # This tests it is accepted, but cannot test it is used today, because +        # of not having a way to expose it in Python so far. +        self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword) +        self.protocol.lineReceived(" ]\n") +        self.protocol.lineReceived("]\n") +        self.check_success_or_xfail(as_success, "]\n") + +    def test_xfail_quoted_bracket(self): +        self.setup_python26() +        self.xfail_quoted_bracket("xfail", True) +        self.setup_python27() +        self.xfail_quoted_bracket("xfail", False) +        self.setup_python_ex() +        self.xfail_quoted_bracket("xfail", False) + +    def test_xfail_colon_quoted_bracket(self): +        self.setup_python26() +        self.xfail_quoted_bracket("xfail:", True) +        self.setup_python27() +        self.xfail_quoted_bracket("xfail:", False) +        self.setup_python_ex() +        self.xfail_quoted_bracket("xfail:", False) + + +class TestTestProtocolServerAddSkip(unittest.TestCase): +    """Tests for the skip keyword. + +    In Python this meets the testtools extended TestResult contract. +    (See https://launchpad.net/testtools). +    """ + +    def setUp(self): +        """Setup a test object ready to be skipped.""" +        self.client = ExtendedTestResult() +        self.protocol = subunit.TestProtocolServer(self.client) +        self.protocol.lineReceived("test mcdonalds farm\n") +        self.test = self.client._events[-1][-1] + +    def assertSkip(self, reason): +        details = {} +        if reason is not None: +            details['reason'] = Content( +                ContentType("text", "plain"), lambda:[reason]) +        self.assertEqual([ +            ('startTest', self.test), +            ('addSkip', self.test, details), +            ('stopTest', self.test), +            ], self.client._events) + +    def simple_skip_keyword(self, keyword): +        self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) +        self.assertSkip(None) + +    def test_simple_skip(self): +        self.simple_skip_keyword("skip") + +    def test_simple_skip_colon(self): +        self.simple_skip_keyword("skip:") + +    def test_skip_empty_message(self): +        self.protocol.lineReceived("skip mcdonalds farm [\n") +        self.protocol.lineReceived("]\n") +        self.assertSkip("") + +    def skip_quoted_bracket(self, keyword): +        # This tests it is accepted, but cannot test it is used today, because +        # of not having a way to expose it in Python so far. +        self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword) +        self.protocol.lineReceived(" ]\n") +        self.protocol.lineReceived("]\n") +        self.assertSkip("]\n") + +    def test_skip_quoted_bracket(self): +        self.skip_quoted_bracket("skip") + +    def test_skip_colon_quoted_bracket(self): +        self.skip_quoted_bracket("skip:") + +  class TestTestProtocolServerAddSuccess(unittest.TestCase):      def setUp(self): -        self.client = MockTestProtocolServerClient() +        self.client = ExtendedTestResult()          self.protocol = subunit.TestProtocolServer(self.client)          self.protocol.lineReceived("test mcdonalds farm\n")          self.test = subunit.RemotedTestCase("mcdonalds farm")      def simple_success_keyword(self, keyword):          self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) -        self.assertEqual(self.client.start_calls, [self.test]) -        self.assertEqual(self.client.end_calls, [self.test]) -        self.assertEqual(self.client.error_calls, []) -        self.assertEqual(self.client.success_calls, [self.test]) +        self.assertEqual([ +            ('startTest', self.test), +            ('addSuccess', self.test), +            ('stopTest', self.test), +            ], self.client._events)      def test_simple_success(self):          self.simple_success_keyword("failure") @@ -520,6 +683,134 @@ class TestTestProtocolServerAddSuccess(unittest.TestCase):      def test_simple_success_colon(self):          self.simple_success_keyword("successful:") +    def assertSuccess(self, details): +        self.assertEqual([ +            ('startTest', self.test), +            ('addSuccess', self.test, details), +            ('stopTest', self.test), +            ], self.client._events) + +    def test_success_empty_message(self): +        self.protocol.lineReceived("success mcdonalds farm [\n") +        self.protocol.lineReceived("]\n") +        details = {} +        details['message'] = Content(ContentType("text", "plain"), +            lambda:[""]) +        self.assertSuccess(details) + +    def success_quoted_bracket(self, keyword): +        # This tests it is accepted, but cannot test it is used today, because +        # of not having a way to expose it in Python so far. +        self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword) +        self.protocol.lineReceived(" ]\n") +        self.protocol.lineReceived("]\n") +        details = {} +        details['message'] = Content(ContentType("text", "plain"), +            lambda:["]\n"]) +        self.assertSuccess(details) + +    def test_success_quoted_bracket(self): +        self.success_quoted_bracket("success") + +    def test_success_colon_quoted_bracket(self): +        self.success_quoted_bracket("success:") + + +class TestTestProtocolServerProgress(unittest.TestCase): +    """Test receipt of progress: directives.""" + +    def test_progress_accepted_stdlib(self): +        self.result = Python26TestResult() +        self.stream = StringIO() +        self.protocol = subunit.TestProtocolServer(self.result, +            stream=self.stream) +        self.protocol.lineReceived("progress: 23") +        self.protocol.lineReceived("progress: -2") +        self.protocol.lineReceived("progress: +4") +        self.assertEqual("", self.stream.getvalue()) + +    def test_progress_accepted_extended(self): +        # With a progress capable TestResult, progress events are emitted. +        self.result = ExtendedTestResult() +        self.stream = StringIO() +        self.protocol = subunit.TestProtocolServer(self.result, +            stream=self.stream) +        self.protocol.lineReceived("progress: 23") +        self.protocol.lineReceived("progress: push") +        self.protocol.lineReceived("progress: -2") +        self.protocol.lineReceived("progress: pop") +        self.protocol.lineReceived("progress: +4") +        self.assertEqual("", self.stream.getvalue()) +        self.assertEqual([ +            ('progress', 23, subunit.PROGRESS_SET), +            ('progress', None, subunit.PROGRESS_PUSH), +            ('progress', -2, subunit.PROGRESS_CUR), +            ('progress', None, subunit.PROGRESS_POP), +            ('progress', 4, subunit.PROGRESS_CUR), +            ], self.result._events) + + +class TestTestProtocolServerStreamTags(unittest.TestCase): +    """Test managing tags on the protocol level.""" + +    def setUp(self): +        self.client = ExtendedTestResult() +        self.protocol = subunit.TestProtocolServer(self.client) + +    def test_initial_tags(self): +        self.protocol.lineReceived("tags: foo bar:baz  quux\n") +        self.assertEqual([ +            ('tags', set(["foo", "bar:baz", "quux"]), set()), +            ], self.client._events) + +    def test_minus_removes_tags(self): +        self.protocol.lineReceived("tags: -bar quux\n") +        self.assertEqual([ +            ('tags', set(["quux"]), set(["bar"])), +            ], self.client._events) + +    def test_tags_do_not_get_set_on_test(self): +        self.protocol.lineReceived("test mcdonalds farm\n") +        test = self.client._events[0][-1] +        self.assertEqual(None, getattr(test, 'tags', None)) + +    def test_tags_do_not_get_set_on_global_tags(self): +        self.protocol.lineReceived("tags: foo bar\n") +        self.protocol.lineReceived("test mcdonalds farm\n") +        test = self.client._events[-1][-1] +        self.assertEqual(None, getattr(test, 'tags', None)) + +    def test_tags_get_set_on_test_tags(self): +        self.protocol.lineReceived("test mcdonalds farm\n") +        test = self.client._events[-1][-1] +        self.protocol.lineReceived("tags: foo bar\n") +        self.protocol.lineReceived("success mcdonalds farm\n") +        self.assertEqual(None, getattr(test, 'tags', None)) + + +class TestTestProtocolServerStreamTime(unittest.TestCase): +    """Test managing time information at the protocol level.""" + +    def test_time_accepted_stdlib(self): +        self.result = Python26TestResult() +        self.stream = StringIO() +        self.protocol = subunit.TestProtocolServer(self.result, +            stream=self.stream) +        self.protocol.lineReceived("time: 2001-12-12 12:59:59Z\n") +        self.assertEqual("", self.stream.getvalue()) + +    def test_time_accepted_extended(self): +        self.result = ExtendedTestResult() +        self.stream = StringIO() +        self.protocol = subunit.TestProtocolServer(self.result, +            stream=self.stream) +        self.protocol.lineReceived("time: 2001-12-12 12:59:59Z\n") +        self.assertEqual("", self.stream.getvalue()) +        self.assertEqual([ +            ('time', datetime.datetime(2001, 12, 12, 12, 59, 59, 0, +            iso8601.Utc())) +            ], self.result._events) +  class TestRemotedTestCase(unittest.TestCase): @@ -529,14 +820,14 @@ class TestRemotedTestCase(unittest.TestCase):          self.assertRaises(NotImplementedError, test.tearDown)          self.assertEqual("A test description",                           test.shortDescription()) -        self.assertEqual("subunit.RemotedTestCase.A test description", +        self.assertEqual("A test description",                           test.id())          self.assertEqual("A test description (subunit.RemotedTestCase)", "%s" % test)          self.assertEqual("<subunit.RemotedTestCase description="                           "'A test description'>", "%r" % test)          result = unittest.TestResult()          test.run(result) -        self.assertEqual([(test, "RemoteException: " +        self.assertEqual([(test, _remote_exception_str + ": "                                   "Cannot run RemotedTestCases.\n\n")],                           result.errors)          self.assertEqual(1, result.testsRun) @@ -570,27 +861,43 @@ class TestExecTestCase(unittest.TestCase):              # the sample script runs three tests, one each              # that fails, errors and succeeds +        def test_sample_method_args(self): +            """sample-script.py foo""" +            # sample that will run just one test.      def test_construct(self):          test = self.SampleExecTestCase("test_sample_method")          self.assertEqual(test.script,                           subunit.join_dir(__file__, 'sample-script.py')) +    def test_args(self): +        result = unittest.TestResult() +        test = self.SampleExecTestCase("test_sample_method_args") +        test.run(result) +        self.assertEqual(1, result.testsRun) +      def test_run(self): -        runner = MockTestProtocolServerClient() +        result = ExtendedTestResult()          test = self.SampleExecTestCase("test_sample_method") -        test.run(runner) +        test.run(result)          mcdonald = subunit.RemotedTestCase("old mcdonald")          bing = subunit.RemotedTestCase("bing crosby") +        bing_details = {} +        bing_details['traceback'] = Content(ContentType("text", "x-traceback"), +            lambda:["foo.c:53:ERROR invalid state\n"])          an_error = subunit.RemotedTestCase("an error") -        self.assertEqual(runner.error_calls, -                         [(an_error, subunit.RemoteError())]) -        self.assertEqual(runner.failure_calls, -                         [(bing, -                           subunit.RemoteError( -                            "foo.c:53:ERROR invalid state\n"))]) -        self.assertEqual(runner.start_calls, [mcdonald, bing, an_error]) -        self.assertEqual(runner.end_calls, [mcdonald, bing, an_error]) +        error_details = {} +        self.assertEqual([ +            ('startTest', mcdonald), +            ('addSuccess', mcdonald), +            ('stopTest', mcdonald), +            ('startTest', bing), +            ('addFailure', bing, bing_details), +            ('stopTest', bing), +            ('startTest', an_error), +            ('addError', an_error, error_details), +            ('stopTest', an_error), +            ], result._events)      def test_debug(self):          test = self.SampleExecTestCase("test_sample_method") @@ -689,7 +996,11 @@ class TestTestProtocolClient(unittest.TestCase):          self.io = StringIO()          self.protocol = subunit.TestProtocolClient(self.io)          self.test = TestTestProtocolClient("test_start_test") - +        self.sample_details = {'something':Content( +            ContentType('text', 'plain'), lambda:['serialised\nform'])} +        self.sample_tb_details = dict(self.sample_details) +        self.sample_tb_details['traceback'] = TracebackContent( +            subunit.RemoteError("boo qux"), self.test)      def test_start_test(self):          """Test startTest on a TestProtocolClient.""" @@ -697,7 +1008,7 @@ class TestTestProtocolClient(unittest.TestCase):          self.assertEqual(self.io.getvalue(), "test: %s\n" % self.test.id())      def test_stop_test(self): -        """Test stopTest on a TestProtocolClient.""" +        # stopTest doesn't output anything.          self.protocol.stopTest(self.test)          self.assertEqual(self.io.getvalue(), "") @@ -707,22 +1018,154 @@ class TestTestProtocolClient(unittest.TestCase):          self.assertEqual(              self.io.getvalue(), "successful: %s\n" % self.test.id()) +    def test_add_success_details(self): +        """Test addSuccess on a TestProtocolClient with details.""" +        self.protocol.addSuccess(self.test, details=self.sample_details) +        self.assertEqual( +            self.io.getvalue(), "successful: %s [ multipart\n" +                "Content-Type: text/plain\n" +                "something\n" +                "F\r\nserialised\nform0\r\n]\n" % self.test.id()) +      def test_add_failure(self):          """Test addFailure on a TestProtocolClient.""" -        self.protocol.addFailure(self.test, subunit.RemoteError("boo")) +        self.protocol.addFailure( +            self.test, subunit.RemoteError("boo qux")) +        self.assertEqual( +            self.io.getvalue(), +            ('failure: %s [\n' + _remote_exception_str + ': boo qux\n]\n') +            % self.test.id()) + +    def test_add_failure_details(self): +        """Test addFailure on a TestProtocolClient with details.""" +        self.protocol.addFailure( +            self.test, details=self.sample_tb_details)          self.assertEqual(              self.io.getvalue(), -            'failure: %s [\nRemoteException: boo\n]\n' % self.test.id()) +            ("failure: %s [ multipart\n" +            "Content-Type: text/plain\n" +            "something\n" +            "F\r\nserialised\nform0\r\n" +            "Content-Type: text/x-traceback;charset=utf8,language=python\n" +            "traceback\n" +            "1A\r\n" + _remote_exception_str + ": boo qux\n0\r\n" +            "]\n") % self.test.id())      def test_add_error(self):          """Test stopTest on a TestProtocolClient.""" -        self.protocol.addError(self.test, subunit.RemoteError("phwoar")) +        self.protocol.addError( +            self.test, subunit.RemoteError("phwoar crikey")) +        self.assertEqual( +            self.io.getvalue(), +            ('error: %s [\n' + +            _remote_exception_str + ": phwoar crikey\n" +            "]\n") % self.test.id()) + +    def test_add_error_details(self): +        """Test stopTest on a TestProtocolClient with details.""" +        self.protocol.addError( +            self.test, details=self.sample_tb_details) +        self.assertEqual( +            self.io.getvalue(), +            ("error: %s [ multipart\n" +            "Content-Type: text/plain\n" +            "something\n" +            "F\r\nserialised\nform0\r\n" +            "Content-Type: text/x-traceback;charset=utf8,language=python\n" +            "traceback\n" +            "1A\r\n" + _remote_exception_str + ": boo qux\n0\r\n" +            "]\n") % self.test.id()) + +    def test_add_expected_failure(self): +        """Test addExpectedFailure on a TestProtocolClient.""" +        self.protocol.addExpectedFailure( +            self.test, subunit.RemoteError("phwoar crikey"))          self.assertEqual(              self.io.getvalue(), -            'error: %s [\n' -            "RemoteException: phwoar\n" +            ('xfail: %s [\n' + +            _remote_exception_str + ": phwoar crikey\n" +            "]\n") % self.test.id()) + +    def test_add_expected_failure_details(self): +        """Test addExpectedFailure on a TestProtocolClient with details.""" +        self.protocol.addExpectedFailure( +            self.test, details=self.sample_tb_details) +        self.assertEqual( +            self.io.getvalue(), +            ("xfail: %s [ multipart\n" +            "Content-Type: text/plain\n" +            "something\n" +            "F\r\nserialised\nform0\r\n" +            "Content-Type: text/x-traceback;charset=utf8,language=python\n" +            "traceback\n" +            "1A\r\n"+ _remote_exception_str + ": boo qux\n0\r\n" +            "]\n") % self.test.id()) + +    def test_add_skip(self): +        """Test addSkip on a TestProtocolClient.""" +        self.protocol.addSkip( +            self.test, "Has it really?") +        self.assertEqual( +            self.io.getvalue(), +            'skip: %s [\nHas it really?\n]\n' % self.test.id()) +     +    def test_add_skip_details(self): +        """Test addSkip on a TestProtocolClient with details.""" +        details = {'reason':Content( +            ContentType('text', 'plain'), lambda:['Has it really?'])} +        self.protocol.addSkip( +            self.test, details=details) +        self.assertEqual( +            self.io.getvalue(), +            "skip: %s [ multipart\n" +            "Content-Type: text/plain\n" +            "reason\n" +            "E\r\nHas it really?0\r\n"              "]\n" % self.test.id()) +    def test_progress_set(self): +        self.protocol.progress(23, subunit.PROGRESS_SET) +        self.assertEqual(self.io.getvalue(), 'progress: 23\n') + +    def test_progress_neg_cur(self): +        self.protocol.progress(-23, subunit.PROGRESS_CUR) +        self.assertEqual(self.io.getvalue(), 'progress: -23\n') + +    def test_progress_pos_cur(self): +        self.protocol.progress(23, subunit.PROGRESS_CUR) +        self.assertEqual(self.io.getvalue(), 'progress: +23\n') + +    def test_progress_pop(self): +        self.protocol.progress(1234, subunit.PROGRESS_POP) +        self.assertEqual(self.io.getvalue(), 'progress: pop\n') + +    def test_progress_push(self): +        self.protocol.progress(1234, subunit.PROGRESS_PUSH) +        self.assertEqual(self.io.getvalue(), 'progress: push\n') + +    def test_time(self): +        # Calling time() outputs a time signal immediately. +        self.protocol.time( +            datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc())) +        self.assertEqual( +            "time: 2009-10-11 12:13:14.000015Z\n", +            self.io.getvalue()) + +    def test_add_unexpected_success(self): +        """Test addUnexpectedSuccess on a TestProtocolClient.""" +        self.protocol.addUnexpectedSuccess(self.test) +        self.assertEqual( +            self.io.getvalue(), "successful: %s\n" % self.test.id()) + +    def test_add_unexpected_success_details(self): +        """Test addUnexpectedSuccess on a TestProtocolClient with details.""" +        self.protocol.addUnexpectedSuccess(self.test, details=self.sample_details) +        self.assertEqual( +            self.io.getvalue(), "successful: %s [ multipart\n" +                "Content-Type: text/plain\n" +                "something\n" +                "F\r\nserialised\nform0\r\n]\n" % self.test.id()) +  def test_suite():      loader = subunit.tests.TestUtil.TestLoader() diff --git a/lib/subunit/python/subunit/tests/test_test_results.py b/lib/subunit/python/subunit/tests/test_test_results.py new file mode 100644 index 0000000000..fe82c04b06 --- /dev/null +++ b/lib/subunit/python/subunit/tests/test_test_results.py @@ -0,0 +1,199 @@ +# +#  subunit: extensions to Python unittest to get test results from subprocesses. +#  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net> +# +#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +#  license at the users choice. A copy of both licenses are available in the +#  project source as Apache-2.0 and BSD. You may not use this file except in +#  compliance with one of these two licences. +#   +#  Unless required by applicable law or agreed to in writing, software +#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the +#  license you chose for the specific language governing permissions and +#  limitations under that license. +# + +import datetime +import unittest +from StringIO import StringIO +import os +import sys + +from testtools.content_type import ContentType +from testtools.content import Content + +import subunit +import subunit.iso8601 as iso8601 +import subunit.test_results + + +class LoggingDecorator(subunit.test_results.HookedTestResultDecorator): + +    def __init__(self, decorated): +        self._calls = 0 +        super(LoggingDecorator, self).__init__(decorated) + +    def _before_event(self): +        self._calls += 1 + + +class AssertBeforeTestResult(LoggingDecorator): +    """A TestResult for checking preconditions.""" + +    def __init__(self, decorated, test): +        self.test = test +        super(AssertBeforeTestResult, self).__init__(decorated) + +    def _before_event(self): +        self.test.assertEqual(1, self.earlier._calls) +        super(AssertBeforeTestResult, self)._before_event() + + +class TimeCapturingResult(unittest.TestResult): + +    def __init__(self): +        super(TimeCapturingResult, self).__init__() +        self._calls = [] + +    def time(self, a_datetime): +        self._calls.append(a_datetime) + + +class TestHookedTestResultDecorator(unittest.TestCase): + +    def setUp(self): +        # An end to the chain +        terminal = unittest.TestResult() +        # Asserts that the call was made to self.result before asserter was +        # called. +        asserter = AssertBeforeTestResult(terminal, self) +        # The result object we call, which much increase its call count. +        self.result = LoggingDecorator(asserter) +        asserter.earlier = self.result +        self.decorated = asserter + +    def tearDown(self): +        # The hook in self.result must have been called +        self.assertEqual(1, self.result._calls) +        # The hook in asserter must have been called too, otherwise the +        # assertion about ordering won't have completed. +        self.assertEqual(1, self.decorated._calls) + +    def test_startTest(self): +        self.result.startTest(self) +         +    def test_startTestRun(self): +        self.result.startTestRun() +         +    def test_stopTest(self): +        self.result.stopTest(self) +         +    def test_stopTestRun(self): +        self.result.stopTestRun() + +    def test_addError(self): +        self.result.addError(self, subunit.RemoteError()) +         +    def test_addError_details(self): +        self.result.addError(self, details={}) +         +    def test_addFailure(self): +        self.result.addFailure(self, subunit.RemoteError()) + +    def test_addFailure_details(self): +        self.result.addFailure(self, details={}) + +    def test_addSuccess(self): +        self.result.addSuccess(self) + +    def test_addSuccess_details(self): +        self.result.addSuccess(self, details={}) + +    def test_addSkip(self): +        self.result.addSkip(self, "foo") + +    def test_addSkip_details(self): +        self.result.addSkip(self, details={}) + +    def test_addExpectedFailure(self): +        self.result.addExpectedFailure(self, subunit.RemoteError()) + +    def test_addExpectedFailure_details(self): +        self.result.addExpectedFailure(self, details={}) + +    def test_addUnexpectedSuccess(self): +        self.result.addUnexpectedSuccess(self) + +    def test_addUnexpectedSuccess_details(self): +        self.result.addUnexpectedSuccess(self, details={}) + +    def test_progress(self): +        self.result.progress(1, subunit.PROGRESS_SET) + +    def test_wasSuccessful(self): +        self.result.wasSuccessful() + +    def test_shouldStop(self): +        self.result.shouldStop + +    def test_stop(self): +        self.result.stop() + +    def test_time(self): +        self.result.time(None) +  + +class TestAutoTimingTestResultDecorator(unittest.TestCase): + +    def setUp(self): +        # And end to the chain which captures time events. +        terminal = TimeCapturingResult() +        # The result object under test. +        self.result = subunit.test_results.AutoTimingTestResultDecorator( +            terminal) +        self.decorated = terminal + +    def test_without_time_calls_time_is_called_and_not_None(self): +        self.result.startTest(self) +        self.assertEqual(1, len(self.decorated._calls)) +        self.assertNotEqual(None, self.decorated._calls[0]) + +    def test_no_time_from_progress(self): +        self.result.progress(1, subunit.PROGRESS_CUR) +        self.assertEqual(0, len(self.decorated._calls)) + +    def test_no_time_from_shouldStop(self): +        self.decorated.stop() +        self.result.shouldStop +        self.assertEqual(0, len(self.decorated._calls)) + +    def test_calling_time_inhibits_automatic_time(self): +        # Calling time() outputs a time signal immediately and prevents +        # automatically adding one when other methods are called. +        time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc()) +        self.result.time(time) +        self.result.startTest(self) +        self.result.stopTest(self) +        self.assertEqual(1, len(self.decorated._calls)) +        self.assertEqual(time, self.decorated._calls[0]) + +    def test_calling_time_None_enables_automatic_time(self): +        time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc()) +        self.result.time(time) +        self.assertEqual(1, len(self.decorated._calls)) +        self.assertEqual(time, self.decorated._calls[0]) +        # Calling None passes the None through, in case other results care. +        self.result.time(None) +        self.assertEqual(2, len(self.decorated._calls)) +        self.assertEqual(None, self.decorated._calls[1]) +        # Calling other methods doesn't generate an automatic time event. +        self.result.startTest(self) +        self.assertEqual(3, len(self.decorated._calls)) +        self.assertNotEqual(None, self.decorated._calls[2]) + + +def test_suite(): +    loader = subunit.tests.TestUtil.TestLoader() +    result = loader.loadTestsFromName(__name__) +    return result diff --git a/lib/subunit/tap2subunit b/lib/subunit/tap2subunit deleted file mode 100755 index 9e335168f5..0000000000 --- a/lib/subunit/tap2subunit +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/perl -# Simple script that converts Perl test harness output to  -# Subunit -# Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org> -# Published under the GNU GPL, v3 or later - -my $firstline = 1; -my $error = 0; -while(<STDIN>) { -	if ($firstline) { -		$firstline = 0; -		next; -	} -	if (/^not ok (\d+) - (.*)$/) { -		print "test: $2\n"; -		print "failure: $2\n"; -		$error = 1; -	} elsif (/^ok (\d+) - (.*)$/) { -		print "test: $2\n"; -		print "success: $2\n"; -	} elsif (/^ok (\d+)$/) { -		print "test: $1\n"; -		print "success: $1\n"; -	} elsif (/^ok (\d+) # skip (.*)$/) { -		print "test: $1\n"; -		print "skip: $1 [\n$2\n]\n"; -	} elsif (/^not ok (\d+)$/) { -		print "test: $1\n"; -		print "failure: $1\n"; -		$error = 1; -	} else { -		print; -	} -} -exit $error; diff --git a/lib/subunit/update.sh b/lib/subunit/update.sh index 6ef9859c82..6d996cdfd5 100755 --- a/lib/subunit/update.sh +++ b/lib/subunit/update.sh @@ -5,9 +5,9 @@ TARGETDIR="`dirname $0`"  WORKDIR="`mktemp -d`"  bzr branch lp:subunit "$WORKDIR/subunit" -for p in python filters;  +for p in python/ filters/tap2subunit;  do -	rsync -avz --delete "$WORKDIR/subunit/$p/" "$TARGETDIR/$p/" +	rsync -avz --delete "$WORKDIR/subunit/$p" "$TARGETDIR/$p"  done  rm -rf "$WORKDIR" diff --git a/source4/selftest/tests.sh b/source4/selftest/tests.sh index cad97b7ad3..0737b615a5 100755 --- a/source4/selftest/tests.sh +++ b/source4/selftest/tests.sh @@ -85,7 +85,7 @@ smb4torture="$samba4bindir/smbtorture${EXEEXT}"  if which tap2subunit 2>/dev/null; then  	TAP2SUBUNIT=tap2subunit  else -	TAP2SUBUNIT="$PERL $samba4srcdir/../lib/subunit/tap2subunit" +	TAP2SUBUNIT="PYTHONPATH=$samba4srcdir/../lib/subunit/python $PYTHON $samba4srcdir/../lib/subunit/filters/tap2subunit"  fi  $smb4torture -V  | 
