summaryrefslogtreecommitdiff
path: root/lib/subunit/python/testtools/testsuite.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/subunit/python/testtools/testsuite.py')
-rw-r--r--lib/subunit/python/testtools/testsuite.py74
1 files changed, 74 insertions, 0 deletions
diff --git a/lib/subunit/python/testtools/testsuite.py b/lib/subunit/python/testtools/testsuite.py
new file mode 100644
index 0000000000..26b193799b
--- /dev/null
+++ b/lib/subunit/python/testtools/testsuite.py
@@ -0,0 +1,74 @@
+# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
+
+"""Test suites and related things."""
+
+__metaclass__ = type
+__all__ = [
+ 'ConcurrentTestSuite',
+ ]
+
+try:
+ import Queue
+except ImportError:
+ import queue as Queue
+import threading
+import unittest
+
+import testtools
+
+
+class ConcurrentTestSuite(unittest.TestSuite):
+ """A TestSuite whose run() calls out to a concurrency strategy."""
+
+ def __init__(self, suite, make_tests):
+ """Create a ConcurrentTestSuite to execute suite.
+
+ :param suite: A suite to run concurrently.
+ :param make_tests: A helper function to split the tests in the
+ ConcurrentTestSuite into some number of concurrently executing
+ sub-suites. make_tests must take a suite, and return an iterable
+ of TestCase-like object, each of which must have a run(result)
+ method.
+ """
+ super(ConcurrentTestSuite, self).__init__([suite])
+ self.make_tests = make_tests
+
+ def run(self, result):
+ """Run the tests concurrently.
+
+ This calls out to the provided make_tests helper, and then serialises
+ the results so that result only sees activity from one TestCase at
+ a time.
+
+ ConcurrentTestSuite provides no special mechanism to stop the tests
+ returned by make_tests, it is up to the make_tests to honour the
+ shouldStop attribute on the result object they are run with, which will
+ be set if an exception is raised in the thread which
+ ConcurrentTestSuite.run is called in.
+ """
+ tests = self.make_tests(self)
+ try:
+ threads = {}
+ queue = Queue.Queue()
+ result_semaphore = threading.Semaphore(1)
+ for test in tests:
+ process_result = testtools.ThreadsafeForwardingResult(result,
+ result_semaphore)
+ reader_thread = threading.Thread(
+ target=self._run_test, args=(test, process_result, queue))
+ threads[test] = reader_thread, process_result
+ reader_thread.start()
+ while threads:
+ finished_test = queue.get()
+ threads[finished_test][0].join()
+ del threads[finished_test]
+ except:
+ for thread, process_result in threads.values():
+ process_result.stop()
+ raise
+
+ def _run_test(self, test, process_result, queue):
+ try:
+ test.run(process_result)
+ finally:
+ queue.put(test)