summaryrefslogtreecommitdiff
path: root/lib/subunit/python/testtools/testsuite.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/subunit/python/testtools/testsuite.py')
-rw-r--r--lib/subunit/python/testtools/testsuite.py74
1 files changed, 0 insertions, 74 deletions
diff --git a/lib/subunit/python/testtools/testsuite.py b/lib/subunit/python/testtools/testsuite.py
deleted file mode 100644
index 26b193799b..0000000000
--- a/lib/subunit/python/testtools/testsuite.py
+++ /dev/null
@@ -1,74 +0,0 @@
-# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
-
-"""Test suites and related things."""
-
-__metaclass__ = type
-__all__ = [
- 'ConcurrentTestSuite',
- ]
-
-try:
- import Queue
-except ImportError:
- import queue as Queue
-import threading
-import unittest
-
-import testtools
-
-
-class ConcurrentTestSuite(unittest.TestSuite):
- """A TestSuite whose run() calls out to a concurrency strategy."""
-
- def __init__(self, suite, make_tests):
- """Create a ConcurrentTestSuite to execute suite.
-
- :param suite: A suite to run concurrently.
- :param make_tests: A helper function to split the tests in the
- ConcurrentTestSuite into some number of concurrently executing
- sub-suites. make_tests must take a suite, and return an iterable
- of TestCase-like object, each of which must have a run(result)
- method.
- """
- super(ConcurrentTestSuite, self).__init__([suite])
- self.make_tests = make_tests
-
- def run(self, result):
- """Run the tests concurrently.
-
- This calls out to the provided make_tests helper, and then serialises
- the results so that result only sees activity from one TestCase at
- a time.
-
- ConcurrentTestSuite provides no special mechanism to stop the tests
- returned by make_tests, it is up to the make_tests to honour the
- shouldStop attribute on the result object they are run with, which will
- be set if an exception is raised in the thread which
- ConcurrentTestSuite.run is called in.
- """
- tests = self.make_tests(self)
- try:
- threads = {}
- queue = Queue.Queue()
- result_semaphore = threading.Semaphore(1)
- for test in tests:
- process_result = testtools.ThreadsafeForwardingResult(result,
- result_semaphore)
- reader_thread = threading.Thread(
- target=self._run_test, args=(test, process_result, queue))
- threads[test] = reader_thread, process_result
- reader_thread.start()
- while threads:
- finished_test = queue.get()
- threads[finished_test][0].join()
- del threads[finished_test]
- except:
- for thread, process_result in threads.values():
- process_result.stop()
- raise
-
- def _run_test(self, test, process_result, queue):
- try:
- test.run(process_result)
- finally:
- queue.put(test)