Diffstat (limited to 'Tools/Scripts/webkitpy/layout_tests/controllers/layout_test_runner.py')
-rw-r--r-- | Tools/Scripts/webkitpy/layout_tests/controllers/layout_test_runner.py | 582 |
1 file changed, 0 insertions(+), 582 deletions(-)
diff --git a/Tools/Scripts/webkitpy/layout_tests/controllers/layout_test_runner.py b/Tools/Scripts/webkitpy/layout_tests/controllers/layout_test_runner.py
deleted file mode 100644
index 3fd40e38f..000000000
--- a/Tools/Scripts/webkitpy/layout_tests/controllers/layout_test_runner.py
+++ /dev/null
@@ -1,582 +0,0 @@
-# Copyright (C) 2011 Google Inc. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-#     * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-#     * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-#     * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import logging
-import math
-import threading
-import time
-
-from webkitpy.common import message_pool
-from webkitpy.layout_tests.controllers import single_test_runner
-from webkitpy.layout_tests.models.test_run_results import TestRunResults
-from webkitpy.layout_tests.models import test_expectations
-from webkitpy.layout_tests.models import test_failures
-from webkitpy.layout_tests.models import test_results
-from webkitpy.tool import grammar
-
-
-_log = logging.getLogger(__name__)
-
-
-TestExpectations = test_expectations.TestExpectations
-
-# Export this so callers don't need to know about message pools.
-WorkerException = message_pool.WorkerException
-
-
-class TestRunInterruptedException(Exception):
-    """Raised when a test run should be stopped immediately."""
-    def __init__(self, reason):
-        Exception.__init__(self)
-        self.reason = reason
-        self.msg = reason
-
-    def __reduce__(self):
-        return self.__class__, (self.reason,)
-
-
-class LayoutTestRunner(object):
-    def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
-        self._options = options
-        self._port = port
-        self._printer = printer
-        self._results_directory = results_directory
-        self._test_is_slow = test_is_slow_fn
-        self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
-        self._filesystem = self._port.host.filesystem
-
-        self._expectations = None
-        self._test_inputs = []
-        self._needs_http = None
-        self._needs_websockets = None
-        self._retrying = False
-
-        self._current_run_results = None
-        self._remaining_locked_shards = []
-        self._has_http_lock = False
-
-    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, needs_http, needs_websockets, retrying):
-        self._expectations = expectations
-        self._test_inputs = test_inputs
-        self._needs_http = needs_http
-        self._needs_websockets = needs_websockets
-        self._retrying = retrying
-
-        # FIXME: rename all variables to test_run_results or some such ...
-        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
-        self._current_run_results = run_results
-        self._remaining_locked_shards = []
-        self._has_http_lock = False
-        self._printer.num_tests = len(test_inputs)
-        self._printer.num_started = 0
-
-        if not retrying:
-            self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)
-
-        for test_name in set(tests_to_skip):
-            result = test_results.TestResult(test_name)
-            result.type = test_expectations.SKIP
-            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))
-
-        self._printer.write_update('Sharding tests ...')
-        locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)
-
-        # FIXME: We don't have a good way to coordinate the workers so that
-        # they don't try to run the shards that need a lock if we don't actually
-        # have the lock. The easiest solution at the moment is to grab the
-        # lock at the beginning of the run, and then run all of the locked
-        # shards first. This minimizes the time spent holding the lock, but
-        # means that we won't be running tests while we're waiting for the lock.
-        # If this becomes a problem in practice we'll need to change this.
-
-        all_shards = locked_shards + unlocked_shards
-        self._remaining_locked_shards = locked_shards
-        if locked_shards and self._options.http:
-            self.start_servers_with_lock(2 * min(num_workers, len(locked_shards)))
-
-        num_workers = min(num_workers, len(all_shards))
-        self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))
-
-        if self._options.dry_run:
-            return run_results
-
-        self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))
-
-        try:
-            with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
-                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
-        except TestRunInterruptedException, e:
-            _log.warning(e.reason)
-            run_results.interrupted = True
-        except KeyboardInterrupt:
-            self._printer.flush()
-            self._printer.writeln('Interrupted, exiting ...')
-            raise
-        except Exception, e:
-            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
-            raise
-        finally:
-            self.stop_servers_with_lock()
-
-        return run_results
-
-    def _worker_factory(self, worker_connection):
-        results_directory = self._results_directory
-        if self._retrying:
-            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
-            results_directory = self._filesystem.join(self._results_directory, 'retries')
-        return Worker(worker_connection, results_directory, self._options)
-
-    def _mark_interrupted_tests_as_skipped(self, run_results):
-        for test_input in self._test_inputs:
-            if test_input.test_name not in run_results.results_by_name:
-                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
-                # FIXME: We probably need to loop here if there are multiple iterations.
-                # FIXME: Also, these results are really neither expected nor unexpected. We probably
-                # need a third type of result.
-                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))
-
-    def _interrupt_if_at_failure_limits(self, run_results):
-        # Note: The messages in this method are constructed to match old-run-webkit-tests
-        # so that existing buildbot grep rules work.
-        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
-            if limit and failure_count >= limit:
-                message += " %d tests run." % (run_results.expected + run_results.unexpected)
-                self._mark_interrupted_tests_as_skipped(run_results)
-                raise TestRunInterruptedException(message)
-
-        interrupt_if_at_failure_limit(
-            self._options.exit_after_n_failures,
-            run_results.unexpected_failures,
-            run_results,
-            "Exiting early after %d failures." % run_results.unexpected_failures)
-        interrupt_if_at_failure_limit(
-            self._options.exit_after_n_crashes_or_timeouts,
-            run_results.unexpected_crashes + run_results.unexpected_timeouts,
-            run_results,
-            # This differs from ORWT because it does not include WebProcess crashes.
-            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))
-
-    def _update_summary_with_result(self, run_results, result):
-        if result.type == test_expectations.SKIP:
-            exp_str = got_str = 'SKIP'
-            expected = True
-        else:
-            expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type)
-            exp_str = self._expectations.get_expectations_string(result.test_name)
-            got_str = self._expectations.expectation_to_string(result.type)
-
-        run_results.add(result, expected, self._test_is_slow(result.test_name))
-
-        self._printer.print_finished_test(result, expected, exp_str, got_str)
-
-        self._interrupt_if_at_failure_limits(run_results)
-
-    def start_servers_with_lock(self, number_of_servers):
-        self._printer.write_update('Acquiring http lock ...')
-        self._port.acquire_http_lock()
-        if self._needs_http:
-            self._printer.write_update('Starting HTTP server ...')
-            self._port.start_http_server(number_of_servers=number_of_servers)
-        if self._needs_websockets:
-            self._printer.write_update('Starting WebSocket server ...')
-            self._port.start_websocket_server()
-        self._has_http_lock = True
-
-    def stop_servers_with_lock(self):
-        if self._has_http_lock:
-            if self._needs_http:
-                self._printer.write_update('Stopping HTTP server ...')
-                self._port.stop_http_server()
-            if self._needs_websockets:
-                self._printer.write_update('Stopping WebSocket server ...')
-                self._port.stop_websocket_server()
-            self._printer.write_update('Releasing server lock ...')
-            self._port.release_http_lock()
-            self._has_http_lock = False
-
-    def handle(self, name, source, *args):
-        method = getattr(self, '_handle_' + name)
-        if method:
-            return method(source, *args)
-        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))
-
-    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
-        self._printer.print_started_test(test_input.test_name)
-
-    def _handle_finished_test_list(self, worker_name, list_name):
-        def find(name, test_lists):
-            for i in range(len(test_lists)):
-                if test_lists[i].name == name:
-                    return i
-            return -1
-
-        index = find(list_name, self._remaining_locked_shards)
-        if index >= 0:
-            self._remaining_locked_shards.pop(index)
-            if not self._remaining_locked_shards:
-                self.stop_servers_with_lock()
-
-    def _handle_finished_test(self, worker_name, result, log_messages=[]):
-        self._update_summary_with_result(self._current_run_results, result)
-
-
-class Worker(object):
-    def __init__(self, caller, results_directory, options):
-        self._caller = caller
-        self._worker_number = caller.worker_number
-        self._name = caller.name
-        self._results_directory = results_directory
-        self._options = options
-
-        # The remaining fields are initialized in start()
-        self._host = None
-        self._port = None
-        self._batch_size = None
-        self._batch_count = None
-        self._filesystem = None
-        self._driver = None
-        self._num_tests = 0
-
-    def __del__(self):
-        self.stop()
-
-    def start(self):
-        """This method is called when the object is starting to be used and it is safe
-        for the object to create state that does not need to be pickled (usually this means
-        it is called in a child process)."""
-        self._host = self._caller.host
-        self._filesystem = self._host.filesystem
-        self._port = self._host.port_factory.get(self._options.platform, self._options)
-
-        self._batch_count = 0
-        self._batch_size = self._options.batch_size or 0
-
-    def handle(self, name, source, test_list_name, test_inputs):
-        assert name == 'test_list'
-        for test_input in test_inputs:
-            self._run_test(test_input, test_list_name)
-        self._caller.post('finished_test_list', test_list_name)
-
-    def _update_test_input(self, test_input):
-        if test_input.reference_files is None:
-            # Lazy initialization.
-            test_input.reference_files = self._port.reference_files(test_input.test_name)
-        if test_input.reference_files:
-            test_input.should_run_pixel_test = True
-        else:
-            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)
-
-    def _run_test(self, test_input, shard_name):
-        self._batch_count += 1
-
-        stop_when_done = False
-        if self._batch_size > 0 and self._batch_count >= self._batch_size:
-            self._batch_count = 0
-            stop_when_done = True
-
-        self._update_test_input(test_input)
-        test_timeout_sec = self._timeout(test_input)
-        start = time.time()
-        self._caller.post('started_test', test_input, test_timeout_sec)
-
-        result = self._run_test_with_timeout(test_input, test_timeout_sec, stop_when_done)
-        result.shard_name = shard_name
-        result.worker_name = self._name
-        result.total_run_time = time.time() - start
-        result.test_number = self._num_tests
-        self._num_tests += 1
-
-        self._caller.post('finished_test', result)
-
-        self._clean_up_after_test(test_input, result)
-
-    def stop(self):
-        _log.debug("%s cleaning up" % self._name)
-        self._kill_driver()
-
-    def _timeout(self, test_input):
-        """Compute the appropriate timeout value for a test."""
-        # The DumpRenderTree watchdog uses 2.5x the timeout; we want to be
-        # larger than that. We also add a little more padding if we're
-        # running tests in a separate thread.
-        #
-        # Note that we need to convert the test timeout from a
-        # string value in milliseconds to a float for Python.
-        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
-        if not self._options.run_singly:
-            return driver_timeout_sec
-
-        thread_padding_sec = 1.0
-        thread_timeout_sec = driver_timeout_sec + thread_padding_sec
-        return thread_timeout_sec
-
-    def _kill_driver(self):
-        # Be careful about how and when we kill the driver; if driver.stop()
-        # raises an exception, this routine may get re-entered via __del__.
-        driver = self._driver
-        self._driver = None
-        if driver:
-            _log.debug("%s killing driver" % self._name)
-            driver.stop()
-
-    def _run_test_with_timeout(self, test_input, timeout, stop_when_done):
-        if self._options.run_singly:
-            return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
-        return self._run_test_in_this_thread(test_input, stop_when_done)
-
-    def _clean_up_after_test(self, test_input, result):
-        test_name = test_input.test_name
-
-        if result.failures:
-            # Check and kill DumpRenderTree if we need to.
-            if any([f.driver_needs_restart() for f in result.failures]):
-                self._kill_driver()
-                # Reset the batch count since the shell just bounced.
-                self._batch_count = 0
-
-            # Print the error message(s).
-            _log.debug("%s %s failed:" % (self._name, test_name))
-            for f in result.failures:
-                _log.debug("%s %s" % (self._name, f.message()))
-        elif result.type == test_expectations.SKIP:
-            _log.debug("%s %s skipped" % (self._name, test_name))
-        else:
-            _log.debug("%s %s passed" % (self._name, test_name))
-
-    def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
-        """Run a test in a separate thread, enforcing a hard time limit.
-
-        Since we can only detect the termination of a thread, not any internal
-        state or progress, we can only run per-test timeouts when running test
-        files singly.
-
-        Args:
-          test_input: Object containing the test filename and timeout
-          thread_timeout_sec: time to wait before killing the driver process.
-        Returns:
-          A TestResult
-        """
-        worker = self
-
-        driver = self._port.create_driver(self._worker_number)
-
-        class SingleTestThread(threading.Thread):
-            def __init__(self):
-                threading.Thread.__init__(self)
-                self.result = None
-
-            def run(self):
-                self.result = worker._run_single_test(driver, test_input, stop_when_done)
-
-        thread = SingleTestThread()
-        thread.start()
-        thread.join(thread_timeout_sec)
-        result = thread.result
-        failures = []
-        if thread.isAlive():
-            # If join() returned with the thread still running, the
-            # DumpRenderTree is completely hung and there's nothing
-            # more we can do with it. We have to kill all the
-            # DumpRenderTrees to free it up. If we're running more than
-            # one DumpRenderTree thread, we'll end up killing the other
-            # DumpRenderTrees too, introducing spurious crashes. We accept
-            # that tradeoff in order to avoid losing the rest of this
-            # thread's results.
-            _log.error('Test thread hung: killing all DumpRenderTrees')
-            failures = [test_failures.FailureTimeout()]
-
-        driver.stop()
-
-        if not result:
-            result = test_results.TestResult(test_input.test_name, failures=failures, test_run_time=0)
-        return result
-
-    def _run_test_in_this_thread(self, test_input, stop_when_done):
-        """Run a single test file using a shared DumpRenderTree process.
-
-        Args:
-          test_input: Object containing the test filename, uri and timeout
-
-        Returns: a TestResult object.
-        """
-        if self._driver and self._driver.has_crashed():
-            self._kill_driver()
-        if not self._driver:
-            self._driver = self._port.create_driver(self._worker_number)
-        return self._run_single_test(self._driver, test_input, stop_when_done)
-
-    def _run_single_test(self, driver, test_input, stop_when_done):
-        return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
-            self._name, driver, test_input, stop_when_done)
-
-
-class TestShard(object):
-    """A test shard is a named list of TestInputs."""
-
-    def __init__(self, name, test_inputs):
-        self.name = name
-        self.test_inputs = test_inputs
-        self.requires_lock = test_inputs[0].requires_lock
-
-    def __repr__(self):
-        return "TestShard(name='%s', test_inputs=%s, requires_lock=%s'" % (self.name, self.test_inputs, self.requires_lock)
-
-    def __eq__(self, other):
-        return self.name == other.name and self.test_inputs == other.test_inputs
-
-
-class Sharder(object):
-    def __init__(self, test_split_fn, max_locked_shards):
-        self._split = test_split_fn
-        self._max_locked_shards = max_locked_shards
-
-    def shard_tests(self, test_inputs, num_workers, fully_parallel):
-        """Groups tests into batches.
-        This helps ensure that tests that depend on each other (aka bad tests!)
-        continue to run together as most cross-tests dependencies tend to
-        occur within the same directory.
-        Return:
-            Two list of TestShards. The first contains tests that must only be
-            run under the server lock, the second can be run whenever.
-        """
-
-        # FIXME: Move all of the sharding logic out of manager into its
-        # own class or module. Consider grouping it with the chunking logic
-        # in prepare_lists as well.
-        if num_workers == 1:
-            return self._shard_in_two(test_inputs)
-        elif fully_parallel:
-            return self._shard_every_file(test_inputs)
-        return self._shard_by_directory(test_inputs, num_workers)
-
-    def _shard_in_two(self, test_inputs):
-        """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.
-
-        This is used when there's only one worker, to minimize the per-shard overhead."""
-        locked_inputs = []
-        unlocked_inputs = []
-        for test_input in test_inputs:
-            if test_input.requires_lock:
-                locked_inputs.append(test_input)
-            else:
-                unlocked_inputs.append(test_input)
-
-        locked_shards = []
-        unlocked_shards = []
-        if locked_inputs:
-            locked_shards = [TestShard('locked_tests', locked_inputs)]
-        if unlocked_inputs:
-            unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]
-
-        return locked_shards, unlocked_shards
-
-    def _shard_every_file(self, test_inputs):
-        """Returns two lists of shards, each shard containing a single test file.
-
-        This mode gets maximal parallelism at the cost of much higher flakiness."""
-        locked_shards = []
-        unlocked_shards = []
-        for test_input in test_inputs:
-            # Note that we use a '.' for the shard name; the name doesn't really
-            # matter, and the only other meaningful value would be the filename,
-            # which would be really redundant.
-            if test_input.requires_lock:
-                locked_shards.append(TestShard('.', [test_input]))
-            else:
-                unlocked_shards.append(TestShard('.', [test_input]))
-
-        return locked_shards, unlocked_shards
-
-    def _shard_by_directory(self, test_inputs, num_workers):
-        """Returns two lists of shards, each shard containing all the files in a directory.
-
-        This is the default mode, and gets as much parallelism as we can while
-        minimizing flakiness caused by inter-test dependencies."""
-        locked_shards = []
-        unlocked_shards = []
-        tests_by_dir = {}
-        # FIXME: Given that the tests are already sorted by directory,
-        # we can probably rewrite this to be clearer and faster.
-        for test_input in test_inputs:
-            directory = self._split(test_input.test_name)[0]
-            tests_by_dir.setdefault(directory, [])
-            tests_by_dir[directory].append(test_input)
-
-        for directory, test_inputs in tests_by_dir.iteritems():
-            shard = TestShard(directory, test_inputs)
-            if test_inputs[0].requires_lock:
-                locked_shards.append(shard)
-            else:
-                unlocked_shards.append(shard)
-
-        # Sort the shards by directory name.
-        locked_shards.sort(key=lambda shard: shard.name)
-        unlocked_shards.sort(key=lambda shard: shard.name)
-
-        # Put a ceiling on the number of locked shards, so that we
-        # don't hammer the servers too badly.
-
-        # FIXME: For now, limit to one shard or set it
-        # with the --max-locked-shards. After testing to make sure we
-        # can handle multiple shards, we should probably do something like
-        # limit this to no more than a quarter of all workers, e.g.:
-        # return max(math.ceil(num_workers / 4.0), 1)
-        return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
-                unlocked_shards)

-    def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
-        """Takes a list of shards and redistributes the tests into no more
-        than |max_new_shards| new shards."""
-
-        # This implementation assumes that each input shard only contains tests from a
-        # single directory, and that tests in each shard must remain together; as a
-        # result, a given input shard is never split between output shards.
-        #
-        # Each output shard contains the tests from one or more input shards and
-        # hence may contain tests from multiple directories.
-
-        def divide_and_round_up(numerator, divisor):
-            return int(math.ceil(float(numerator) / divisor))
-
-        def extract_and_flatten(shards):
-            test_inputs = []
-            for shard in shards:
-                test_inputs.extend(shard.test_inputs)
-            return test_inputs
-
-        def split_at(seq, index):
-            return (seq[:index], seq[index:])
-
-        num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
-        new_shards = []
-        remaining_shards = old_shards
-        while remaining_shards:
-            some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
-            new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
-        return new_shards
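
For reference, the timeout rule in the deleted Worker._timeout() can be tried in isolation. This is a minimal sketch, not part of the diff: the 3.0 multiplier and 1.0-second thread padding are taken straight from the code above, while compute_driver_timeout_sec and timeout_ms are hypothetical names standing in for the method and for test_input.timeout (a string value in milliseconds).

# Standalone sketch of the deleted Worker._timeout() logic.
# 'timeout_ms' is a hypothetical stand-in for test_input.timeout.
def compute_driver_timeout_sec(timeout_ms, run_singly=False):
    # The DumpRenderTree watchdog fires at 2.5x the test timeout, so the
    # harness waits 3x to let the watchdog fire first.
    driver_timeout_sec = 3.0 * float(timeout_ms) / 1000.0
    if not run_singly:
        return driver_timeout_sec
    # --run-singly runs each test on its own thread; add scheduling slack.
    return driver_timeout_sec + 1.0

assert compute_driver_timeout_sec("35000") == 105.0
assert compute_driver_timeout_sec("35000", run_singly=True) == 106.0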
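The default sharding mode groups tests by directory so that order-dependent tests keep running together. A rough self-contained sketch of that grouping follows, assuming a split_fn that behaves like port.split_test (returning a (directory, basename) pair); shard_by_directory and the test names are invented for illustration.

# Sketch of the tests_by_dir grouping in Sharder._shard_by_directory() above.
def shard_by_directory(test_names, split_fn):
    tests_by_dir = {}
    for name in test_names:
        tests_by_dir.setdefault(split_fn(name)[0], []).append(name)
    # Sort shards by directory name, as the deleted code does.
    return sorted(tests_by_dir.items())

names = ['fast/css/a.html', 'fast/css/b.html', 'fast/dom/c.html']
print(shard_by_directory(names, lambda n: tuple(n.rsplit('/', 1))))
# [('fast/css', ['fast/css/a.html', 'fast/css/b.html']),
#  ('fast/dom', ['fast/dom/c.html'])]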
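Finally, Sharder._resize_shards() folds runs of consecutive input shards into at most max_new_shards output shards, never splitting an input shard. A minimal sketch of the same arithmetic, using plain (name, tests) tuples instead of TestShard objects; resize_shards and the example shard contents are assumptions made for the demo.

import math

# Sketch of the deleted _resize_shards(): fold ceil(len/max) consecutive
# old shards into each new shard; input shards are never split.
def resize_shards(old_shards, max_new_shards, prefix):
    per_new = int(math.ceil(float(len(old_shards)) / max_new_shards))
    new_shards = []
    remaining = old_shards
    while remaining:
        some, remaining = remaining[:per_new], remaining[per_new:]
        tests = [t for _, shard_tests in some for t in shard_tests]
        new_shards.append(('%s_%d' % (prefix, len(new_shards) + 1), tests))
    return new_shards

# Five single-directory shards squeezed into at most two locked shards.
old = [('http/tests/dir%d' % i, ['test%d' % i]) for i in range(5)]
print(resize_shards(old, 2, 'locked_shard'))
# [('locked_shard_1', ['test0', 'test1', 'test2']),
#  ('locked_shard_2', ['test3', 'test4'])]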