diff options
Diffstat (limited to 'sources/pyside6/tests/QtAsyncio')
10 files changed, 440 insertions, 0 deletions
diff --git a/sources/pyside6/tests/QtAsyncio/CMakeLists.txt b/sources/pyside6/tests/QtAsyncio/CMakeLists.txt new file mode 100644 index 000000000..935e0d90a --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/CMakeLists.txt @@ -0,0 +1,2 @@ +PYSIDE_TEST(qasyncio_test.py) +PYSIDE_TEST(qasyncio_test_chain.py) diff --git a/sources/pyside6/tests/QtAsyncio/QtAsyncio.pyproject b/sources/pyside6/tests/QtAsyncio/QtAsyncio.pyproject new file mode 100644 index 000000000..a36dcd5ad --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/QtAsyncio.pyproject @@ -0,0 +1,3 @@ +{ + "files": ["qt_asyncio_test.py", "qt_asyncio_test_chain.py", "qt_asyncio_test_time.py"] +} diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test.py new file mode 100644 index 000000000..f3c971285 --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test.py @@ -0,0 +1,51 @@ +# Copyright (C) 2023 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +'''Test cases for QtAsyncio''' + +import unittest +import asyncio + +from PySide6.QtAsyncio import QAsyncioEventLoopPolicy + + +class QAsyncioTestCase(unittest.TestCase): + async def sleep(self, output): + output += "Hello" + await asyncio.sleep(0.2) + output += "World" + + async def gather(self, output): + await asyncio.gather(self.sleep(output), self.sleep(output), self.sleep(output)) + + def test_sleep(self): + outputs_expected = [] + outputs_real = [] + + # Run the code without QAsyncioEventLoopPolicy + asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) + asyncio.run(self.sleep(outputs_expected)) + + # Run the code with QAsyncioEventLoopPolicy and QtEventLoop + asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy()) + asyncio.run(self.sleep(outputs_real)) + + self.assertEqual(outputs_expected, outputs_real) + + def test_gather(self): + outputs_expected = [] + outputs_real = [] + + # Run the code without QAsyncioEventLoopPolicy + asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) + asyncio.run(self.gather(outputs_expected)) + + # Run the code with QAsyncioEventLoopPolicy and QtEventLoop + asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy()) + asyncio.run(self.gather(outputs_real)) + + self.assertEqual(outputs_expected, outputs_real) + + +if __name__ == '__main__': + unittest.main() diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_task.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_task.py new file mode 100644 index 000000000..7ef2bb90d --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_task.py @@ -0,0 +1,46 @@ +# Copyright (C) 2024 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +'''Test cases for QtAsyncio''' + +import asyncio +import unittest + +import PySide6.QtAsyncio as QtAsyncio + + +class QAsyncioTestCaseCancelTask(unittest.TestCase): + # Taken from https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel + + async def cancel_me(self, output): + output += "(1) cancel_me(): before sleep" + + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + output += "(2) cancel_me(): cancel sleep" + raise + finally: + output += "(3) cancel_me(): after sleep" + + async def main(self, output): + task = asyncio.create_task(self.cancel_me(output)) + await asyncio.sleep(0.1) + task.cancel() + try: + await task + except asyncio.CancelledError: + output += "(4) main(): cancel_me is cancelled now" + + def test_await_tasks(self): + output_expected = [] + output_real = [] + + asyncio.run(self.main(output_expected)) + QtAsyncio.run(self.main(output_real), keep_running=False) + + self.assertEqual(output_real, output_expected) + + +if __name__ == '__main__': + unittest.main() diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_taskgroup.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_taskgroup.py new file mode 100644 index 000000000..aa8ce4718 --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_taskgroup.py @@ -0,0 +1,57 @@ +# Copyright (C) 2024 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +'''Test cases for QtAsyncio''' + +import asyncio +import unittest + +import PySide6.QtAsyncio as QtAsyncio + + +class QAsyncioTestCaseCancelTaskGroup(unittest.TestCase): + def setUp(self) -> None: + super().setUp() + # We only reach the end of the loop if the task is not cancelled. + self.loop_end_reached = False + + async def raise_error(self): + raise RuntimeError + + async def loop_short(self): + self._loop_end_reached = False + for _ in range(1000): + await asyncio.sleep(1e-3) + self._loop_end_reached = True + + async def loop_shorter(self): + self._loop_end_reached = False + for _ in range(1000): + await asyncio.sleep(1e-4) + self._loop_end_reached = True + + async def loop_the_shortest(self): + self._loop_end_reached = False + for _ in range(1000): + await asyncio.to_thread(lambda: None) + self._loop_end_reached = True + + async def main(self, coro): + async with asyncio.TaskGroup() as tg: + tg.create_task(coro()) + tg.create_task(self.raise_error()) + + def test_cancel_taskgroup(self): + coros = [self.loop_short, self.loop_shorter, self.loop_the_shortest] + + for coro in coros: + try: + QtAsyncio.run(self.main(coro), keep_running=False) + except ExceptionGroup as e: + self.assertEqual(len(e.exceptions), 1) + self.assertIsInstance(e.exceptions[0], RuntimeError) + self.assertFalse(self._loop_end_reached) + + +if __name__ == '__main__': + unittest.main() diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_chain.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_chain.py new file mode 100644 index 000000000..a0a949720 --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_chain.py @@ -0,0 +1,57 @@ +# Copyright (C) 2023 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +'''Test cases for QtAsyncio''' + +import unittest +import asyncio +import random +import time + +from PySide6.QtAsyncio import QAsyncioEventLoopPolicy + + +class QAsyncioTestCaseChain(unittest.TestCase): + + async def link(self, output, n, i): + t = random.randint(0, 5) + output += f"link {i}({n}): {t}s " + await asyncio.sleep(i) + result = f"result {n}-{i}" + output += f"link {i}({n}) finished with {result} " + return result + + async def chain(self, output, n): + link1 = await self.link(output, n, 0.2) + link2 = await self.link(output, n, 0.5) + output += f"chain {n}: {link1} -> {link2} " + + async def gather(self, output, *args): + await asyncio.gather(*(self.chain(output, n) for n in args)) + + def test_chain(self): + args = [1, 2, 3] + + outputs_expected = [] + outputs_real = [] + + # Run the code without QAsyncioEventLoopPolicy + random.seed(17) + asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) + start = time.perf_counter() + asyncio.run(self.gather(outputs_expected, *args)) + end_expected = time.perf_counter() - start + + # Run the code with QAsyncioEventLoopPolicy and QtEventLoop + random.seed(17) + asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy()) + start = time.perf_counter() + asyncio.run(self.gather(outputs_real, *args)) + end_real = time.perf_counter() - start + + self.assertEqual(outputs_expected, outputs_real) + self.assertAlmostEqual(end_expected, end_real, delta=0.5) + + +if __name__ == '__main__': + unittest.main() diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_executor.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_executor.py new file mode 100644 index 000000000..25e680b39 --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_executor.py @@ -0,0 +1,47 @@ +# Copyright (C) 2023 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +'''Test cases for QtAsyncio''' + +import unittest +import asyncio + +from concurrent.futures import ThreadPoolExecutor + +from PySide6.QtCore import QThread +from PySide6.QtAsyncio import QAsyncioEventLoopPolicy + + +class QAsyncioTestCaseExecutor(unittest.TestCase): + def setUp(self) -> None: + super().setUp() + self.executor_thread = None + + def tearDown(self) -> None: + super().tearDown() + + def blocking_function(self): + self.executor_thread = QThread.currentThread() + return 42 + + async def run_asyncio_executor(self): + main_thread = QThread.currentThread() + with ThreadPoolExecutor(max_workers=2) as executor: + result = await asyncio.get_running_loop().run_in_executor( + executor, self.blocking_function) + + # Assert that we are back to the main thread. + self.assertEqual(QThread.currentThread(), main_thread) + + # Assert that the blocking function was executed in a different thread. + self.assertNotEqual(self.executor_thread, main_thread) + + self.assertEqual(result, 42) + + def test_qasyncio_executor(self): + asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy()) + asyncio.run(self.run_asyncio_executor()) + + +if __name__ == '__main__': + unittest.main() diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py new file mode 100644 index 000000000..0bd98c361 --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py @@ -0,0 +1,70 @@ +# Copyright (C) 2023 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +'''Test cases for QtAsyncio''' + +import unittest +import asyncio +import random +import time + +from PySide6.QtAsyncio import QAsyncioEventLoopPolicy + + +class QAsyncioTestCaseQueues(unittest.TestCase): + + async def produce(self, output, queue): + for _ in range(random.randint(0, 2)): + await asyncio.sleep(random.random()) + await queue.put(self.i) + output += f"{self.i} added to queue\n" + self.i += 1 + + async def consume(self, output, queue): + while True: + await asyncio.sleep(random.random()) + i = await queue.get() + output += f"{i} pulled from queue\n" + queue.task_done() + + async def main(self, output1, output2, num_producers, num_consumers): + self.i = 0 + queue = asyncio.Queue() + producers = [ + asyncio.create_task(self.produce(output1, queue)) for _ in range(num_producers)] + consumers = [ + asyncio.create_task(self.consume(output2, queue)) for _ in range(num_consumers)] + await asyncio.gather(*producers) + await queue.join() + for consumer in consumers: + consumer.cancel() + + def test_queues(self): + args = [(2, 3), (2, 1)] + for arg in args: + outputs_expected1 = [] + outputs_expected2 = [] + outputs_real1 = [] + outputs_real2 = [] + + # Run the code without QAsyncioEventLoopPolicy + random.seed(17) + start = time.perf_counter() + asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) + asyncio.run(self.main(outputs_expected1, outputs_expected2, *arg)) + end_expected = time.perf_counter() - start + + # Run the code with QAsyncioEventLoopPolicy and QtEventLoop + random.seed(17) + start = time.perf_counter() + asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy()) + asyncio.run(self.main(outputs_real1, outputs_real2, *arg)) + end_real = time.perf_counter() - start + + self.assertEqual(outputs_expected1, outputs_real1) + self.assertEqual(outputs_expected2, outputs_real2) + self.assertAlmostEqual(end_expected, end_real, delta=1) + + +if __name__ == "__main__": + unittest.main() diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_threadsafe.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_threadsafe.py new file mode 100644 index 000000000..5b52db239 --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_threadsafe.py @@ -0,0 +1,58 @@ +# Copyright (C) 2023 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +'''Test cases for QtAsyncio''' + +import unittest +import asyncio +import threading +import time + +from PySide6.QtAsyncio import QAsyncioEventLoopPolicy + + +class QAsyncioTestCaseThreadsafe(unittest.TestCase): + + def setUp(self) -> None: + super().setUp() + asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy()) + self.loop_event = asyncio.Event() + + def thread_target(self, is_threadsafe): + time.sleep(1) + if is_threadsafe: + # call_soon_threadsafe() wakes the loop that is in another thread, so the + # loop checks the event and will not hang. + asyncio.get_event_loop().call_soon_threadsafe(self.loop_event.set) + else: + # call_soon() does not wake the loop that is in another thread, and so the + # loop keeps waiting without checking the event and will hang. + asyncio.get_event_loop().call_soon(self.loop_event.set) + + async def coro(self, is_threadsafe): + thread = threading.Thread(target=self.thread_target, args=(is_threadsafe,)) + thread.start() + + task = asyncio.create_task(self.loop_event.wait()) + + # The timeout is necessary because the loop will hang for the non-threadsafe case. + done, pending = await asyncio.wait([task], timeout=2) + + thread.join() + + if is_threadsafe: + self.assertEqual(len(done), 1) + self.assertEqual(len(pending), 0) + else: + self.assertEqual(len(done), 0) + self.assertEqual(len(pending), 1) + + def test_not_threadsafe(self): + asyncio.run(self.coro(False)) + + def test_threadsafe(self): + asyncio.run(self.coro(True)) + + +if __name__ == '__main__': + unittest.main() diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_time.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_time.py new file mode 100644 index 000000000..07a126644 --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_time.py @@ -0,0 +1,49 @@ +# Copyright (C) 2023 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +'''Test cases for QtAsyncio''' + +import unittest +import asyncio +import datetime + +from PySide6.QtAsyncio import QAsyncioEventLoopPolicy + + +class QAsyncioTestCaseTime(unittest.TestCase): + + def setUp(self): + self.previous_time = None + self.exception = None + + def display_date(self, end_time, loop): + if self.previous_time is not None: + try: + self.assertAlmostEqual( + (datetime.datetime.now() - self.previous_time).total_seconds(), 1, delta=0.1) + except AssertionError as e: + self.exception = e + self.previous_time = datetime.datetime.now() + if (loop.time() + 1.0) < end_time: + loop.call_later(1, self.display_date, end_time, loop) + else: + loop.stop() + + def test_time(self): + asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy()) + loop = asyncio.new_event_loop() + + end_time = loop.time() + 3.0 + loop.call_soon(self.display_date, end_time, loop) + + try: + loop.run_forever() + finally: + loop.close() + + if self.exception is not None: + raise self.exception + + +if __name__ == '__main__': + unittest.main() |