summaryrefslogtreecommitdiffstats
path: root/util/wasm/wasmtestrunner/qt-wasmtestrunner.py
diff options
context:
space:
mode:
Diffstat (limited to 'util/wasm/wasmtestrunner/qt-wasmtestrunner.py')
-rwxr-xr-xutil/wasm/wasmtestrunner/qt-wasmtestrunner.py331
1 files changed, 331 insertions, 0 deletions
diff --git a/util/wasm/wasmtestrunner/qt-wasmtestrunner.py b/util/wasm/wasmtestrunner/qt-wasmtestrunner.py
new file mode 100755
index 0000000000..7eb840f1cb
--- /dev/null
+++ b/util/wasm/wasmtestrunner/qt-wasmtestrunner.py
@@ -0,0 +1,331 @@
+#!/usr/bin/env python3
+# Copyright (C) 2021 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
+
+import argparse
+from selenium.webdriver.chrome.service import Service
+from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
+from selenium.webdriver.support import expected_conditions
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium import webdriver
+from pathlib import Path
+import typing
+import http.server
+import subprocess
+import threading
+import psutil
+import re
+import os
+from signal import SIGINT
+
+import sys
+
+
+class StdoutOutputSink(object):
+ def __init__(self):
+ pass
+
+ def write(self, data: str):
+ print(data)
+
+ def __enter__(self):
+ pass
+
+ def __exit__(self, _, __, ___):
+ pass
+
+
+class FileOutputSink(object):
+ def __init__(self, filename: str):
+ self.__filename = filename
+ self.__file = None
+
+ def write(self, data: str):
+ self.__file.write(data)
+
+ def __enter__(self):
+ self.__file = open(self.__filename, 'w')
+
+ def __exit__(self, _, __, ___):
+ self.__file.close()
+
+
+class OutputMulticast(object):
+ def __init__(self, destinations: typing.List[str]):
+ self.__sinks: typing.List[typing.Union[StdoutOutputSink, FileOutputSink]] = [
+ ]
+ self.__destinations = [
+ 'stdout'] if destinations is None else destinations
+ number_of_stdout_sinks = sum(
+ [1 if destination == 'stdout' else 0 for destination in self.__destinations])
+ if number_of_stdout_sinks > 1:
+ raise Exception('Maximum allowed number of stdout sinks is 1')
+
+ def write(self, data: str):
+ for sink in self.__sinks:
+ sink.write(data)
+
+ def _makeSink(self, destination: str):
+ return StdoutOutputSink() if 'stdout' == destination else FileOutputSink(destination)
+
+ def __enter__(self):
+ for destination in self.__destinations:
+ sink = self._makeSink(destination)
+ sink.__enter__()
+ self.__sinks.append(sink)
+ return self
+
+ def __exit__(self, _, __, ___):
+ for sink in reversed(self.__sinks):
+ sink.__exit__(_, __, ___)
+
+
+class WasmTestRunner:
+ def __init__(self, args: dict):
+ self.server_process = None
+ self.browser_process = None
+ self.python_path = Path(sys.executable)
+ self.script_dir = Path(os.path.dirname(os.path.realpath(__file__)))
+ self.host = 'localhost'
+ self.webserver = None
+ self.webthread = None
+
+ paths = ['html_path', 'browser_path', 'chromedriver_path', 'tmp_dir']
+
+ for key, value in args.items():
+ if value is None:
+ continue
+ if key in paths:
+ value = Path(value)
+ value.resolve()
+ setattr(self, key, value)
+
+ if not self.html_path.exists():
+ raise FileNotFoundError(self.html_path)
+
+ self.webroot = self.html_path.parent
+
+ if hasattr(self, 'browser_path') and not self.browser_path.exists():
+ raise FileNotFoundError(self.browser_path)
+
+ def run(self):
+ self.run_threaded_webserver()
+
+ with OutputMulticast(
+ self.output if hasattr(self, 'output') else ['stdout']) as output_multicast:
+ try:
+ if self.use_browser:
+ return self.run_wasm_browser()
+ else:
+ return self.run_wasm_webdriver(output_multicast)
+ finally:
+ self.cleanup()
+
+ def run_webserver(self):
+ webroot = self.html_path.parent.resolve()
+ self.server_process =\
+ subprocess.Popen([
+ str(self.python_path),
+ '-m', 'http.server',
+ '--directory', str(webroot),
+ self.port
+ ])
+
+ def run_threaded_webserver(self):
+ self.webserver = http.server.ThreadingHTTPServer(
+ (self.host, int(self.port)), self.get_http_handler_class())
+
+ self.webthread = threading.Thread(target=self.webserver.serve_forever)
+ self.webthread.start()
+
+ def shutdown_threaded_webserver(self):
+ if self.webserver is not None:
+ self.webserver.shutdown()
+ if self.webthread is not None:
+ self.webthread.join()
+
+ def run_wasm_webdriver(self, output_multicast: OutputMulticast):
+ url = f'http://localhost:{self.port}/{self.html_path.name}'
+ if (self.batched_test is not None):
+ url = f'{url}?qtestname={self.batched_test}&qtestoutputformat={self.format}'
+
+ d = DesiredCapabilities.CHROME
+ d['goog:loggingPrefs'] = {'browser': 'ALL'}
+ ser = Service(executable_path=self.chromedriver_path)
+ driver = webdriver.Chrome(desired_capabilities=d, service=ser)
+ driver.get(url)
+ driver.execute_script(
+ """ const status = qtTestRunner.status;
+ const onFinished = status => {
+ if (status === 'Completed' || status === 'Error')
+ document.title = 'qtFinished';
+ };
+ onFinished(status);
+ qtTestRunner.onStatusChanged.addEventListener(onFinished);
+ """)
+
+ WebDriverWait(driver, self.timeout).until(
+ expected_conditions.title_is('qtFinished'))
+
+ runner_status = driver.execute_script(f"return qtTestRunner.status")
+ if runner_status == 'Error':
+ output_multicast.write(driver.execute_script(
+ "return qtTestRunner.errorDetails"))
+ return -1
+ else:
+ assert runner_status == 'Completed'
+ output_multicast.write(driver.execute_script(
+ f"return qtTestRunner.results.get('{self.batched_test}').textOutput"))
+ return driver.execute_script(
+ f"return qtTestRunner.results.get('{self.batched_test}').exitCode")
+
+ def run_wasm_browser(self):
+ if not hasattr(self, 'browser_path'):
+ print('Error: browser path must be set to run with browser')
+ return
+
+ if not hasattr(self, 'tmp_dir'):
+ print('Error: tmp_dir must be set to run with browser')
+ return
+
+ self.create_tmp_dir()
+ self.browser_process =\
+ subprocess.Popen([
+ str(self.browser_path),
+ '--user-data-dir=' + str(self.tmp_dir),
+ '--enable-logging=stderr',
+ f'http://localhost:{self.port}/{self.html_path.name}'
+ ],
+ stderr=subprocess.PIPE
+ )
+
+ # Only capture the console content
+ regex = re.compile(r'[^"]*CONSOLE[^"]*"(.*)"[.\w]*')
+
+ for line in self.browser_process.stderr:
+ str_line = line.decode('utf-8')
+
+ match = regex.match(str_line)
+
+ # Error condition, this should have matched
+ if 'CONSOLE' in str_line and match is None:
+ print('Error: did not match console line:')
+ print(str_line)
+
+ if match is not None:
+ console_line = match.group(1)
+ print(console_line)
+
+ if 'Finished testing' in str_line:
+ self.browser_process.kill()
+ break
+
+ @staticmethod
+ def get_loader_variable(driver, varname: str):
+ return driver.execute_script('return qtLoader.' + varname)
+
+ def create_tmp_dir(self):
+ if not self.tmp_dir.exists():
+ self.tmp_dir.mkdir()
+
+ if not self.tmp_dir.is_dir():
+ raise NotADirectoryError(self.tmp_dir)
+
+ # Needed to bypass the "Welcome to Chrome" prompt
+ first_run = Path(self.tmp_dir, 'First Run')
+ first_run.touch()
+
+ def get_http_handler_class(self):
+ wtr = self
+
+ class OriginIsolationHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
+ def __init__(self, request, client_address, server):
+ super().__init__(request, client_address, server, directory=wtr.webroot)
+
+ # Headers required to enable SharedArrayBuffer
+ # See https://web.dev/cross-origin-isolation-guide/
+ def end_headers(self):
+ self.send_header("Cross-Origin-Opener-Policy", "same-origin")
+ self.send_header(
+ "Cross-Origin-Embedder-Policy", "require-corp")
+ self.send_header(
+ "Cross-Origin-Resource-Policy", "cross-origin")
+ http.server.SimpleHTTPRequestHandler.end_headers(self)
+
+ # We usually don't care that much about what the webserver is logging
+ def log_message(self, format_, *args):
+ return
+
+ return OriginIsolationHTTPRequestHandler
+
+ def cleanup(self):
+ if self.browser_process is not None:
+ self.browser_process.kill()
+ if self.server_process is not None:
+ self.server_process.kill()
+ self.shutdown_threaded_webserver()
+
+
+class BackendProcess:
+ def __init__(self) -> None:
+ self.__process = subprocess.Popen(
+ [sys.executable, *sys.argv, '--backend'], shell=False, stdout=subprocess.PIPE)
+
+ def abort(self):
+ current_process = psutil.Process(self.__process.pid)
+ children = current_process.children(recursive=True)
+ for child in [*children, current_process]:
+ os.kill(child.pid, SIGINT)
+
+ def communicate(self, timeout):
+ return self.__process.communicate(timeout)[0].decode('utf-8')
+
+ def returncode(self):
+ return self.__process.returncode
+
+
+def main():
+ parser = argparse.ArgumentParser(description='WASM testrunner')
+ parser.add_argument('html_path', help='Path to the HTML file to request')
+ parser.add_argument(
+ '--batched_test', help='Specifies a batched test to run')
+ parser.add_argument('--timeout', help='Test timeout',
+ type=int, default=120)
+ parser.add_argument(
+ '--port', help='Port to run the webserver on', default='8000')
+ parser.add_argument('--use_browser', action='store_true')
+ parser.add_argument('--browser_path', help='Path to the browser to use')
+ parser.add_argument('--chromedriver_path', help='Absolute path to chromedriver',
+ default='chromedriver')
+ parser.add_argument('--tmp_dir', help='Path to the tmpdir to use when using a browser',
+ default='/tmp/wasm-testrunner')
+ parser.add_argument(
+ '-o', help='filename. Filename may be "stdout" to write to stdout.',
+ action='append', dest='output')
+ parser.add_argument(
+ '--format', help='Output format', choices=['txt', 'xml', 'lightxml', 'junitxml', 'tap'],
+ default='txt')
+ parser.add_argument(
+ '--backend', help='Run as a backend process. There are two types of test runner processes - '
+ 'the main monitoring process and the backend processes launched by it. The tests are '
+ 'run on the backend to avoid any undesired behavior, like deadlocks in browser main process, '
+ 'spilling over across test cases.',
+ action='store_true')
+
+ args = parser.parse_args()
+ if not args.backend:
+ backend_process = BackendProcess()
+ try:
+ stdout = backend_process.communicate(args.timeout)
+ print(stdout)
+ return backend_process.returncode()
+ except Exception as e:
+ print(f"Exception while executing test {e}")
+ backend_process.abort()
+ return -1
+
+ return WasmTestRunner(vars(args)).run()
+
+
+if __name__ == '__main__':
+ sys.exit(main())