aboutsummaryrefslogtreecommitdiffstats
path: root/sources/pyside6/tests/QtAsyncio
diff options
context:
space:
mode:
Diffstat (limited to 'sources/pyside6/tests/QtAsyncio')
-rw-r--r--sources/pyside6/tests/QtAsyncio/CMakeLists.txt2
-rw-r--r--sources/pyside6/tests/QtAsyncio/QtAsyncio.pyproject3
-rw-r--r--sources/pyside6/tests/QtAsyncio/qasyncio_test.py51
-rw-r--r--sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_task.py46
-rw-r--r--sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_taskgroup.py57
-rw-r--r--sources/pyside6/tests/QtAsyncio/qasyncio_test_chain.py57
-rw-r--r--sources/pyside6/tests/QtAsyncio/qasyncio_test_executor.py47
-rw-r--r--sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py70
-rw-r--r--sources/pyside6/tests/QtAsyncio/qasyncio_test_threadsafe.py58
-rw-r--r--sources/pyside6/tests/QtAsyncio/qasyncio_test_time.py49
10 files changed, 440 insertions, 0 deletions
diff --git a/sources/pyside6/tests/QtAsyncio/CMakeLists.txt b/sources/pyside6/tests/QtAsyncio/CMakeLists.txt
new file mode 100644
index 000000000..935e0d90a
--- /dev/null
+++ b/sources/pyside6/tests/QtAsyncio/CMakeLists.txt
@@ -0,0 +1,2 @@
+PYSIDE_TEST(qasyncio_test.py)
+PYSIDE_TEST(qasyncio_test_chain.py)
diff --git a/sources/pyside6/tests/QtAsyncio/QtAsyncio.pyproject b/sources/pyside6/tests/QtAsyncio/QtAsyncio.pyproject
new file mode 100644
index 000000000..a36dcd5ad
--- /dev/null
+++ b/sources/pyside6/tests/QtAsyncio/QtAsyncio.pyproject
@@ -0,0 +1,3 @@
+{
+ "files": ["qt_asyncio_test.py", "qt_asyncio_test_chain.py", "qt_asyncio_test_time.py"]
+}
diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test.py
new file mode 100644
index 000000000..f3c971285
--- /dev/null
+++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test.py
@@ -0,0 +1,51 @@
+# Copyright (C) 2023 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
+
+'''Test cases for QtAsyncio'''
+
+import unittest
+import asyncio
+
+from PySide6.QtAsyncio import QAsyncioEventLoopPolicy
+
+
+class QAsyncioTestCase(unittest.TestCase):
+ async def sleep(self, output):
+ output += "Hello"
+ await asyncio.sleep(0.2)
+ output += "World"
+
+ async def gather(self, output):
+ await asyncio.gather(self.sleep(output), self.sleep(output), self.sleep(output))
+
+ def test_sleep(self):
+ outputs_expected = []
+ outputs_real = []
+
+ # Run the code without QAsyncioEventLoopPolicy
+ asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
+ asyncio.run(self.sleep(outputs_expected))
+
+ # Run the code with QAsyncioEventLoopPolicy and QtEventLoop
+ asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy())
+ asyncio.run(self.sleep(outputs_real))
+
+ self.assertEqual(outputs_expected, outputs_real)
+
+ def test_gather(self):
+ outputs_expected = []
+ outputs_real = []
+
+ # Run the code without QAsyncioEventLoopPolicy
+ asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
+ asyncio.run(self.gather(outputs_expected))
+
+ # Run the code with QAsyncioEventLoopPolicy and QtEventLoop
+ asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy())
+ asyncio.run(self.gather(outputs_real))
+
+ self.assertEqual(outputs_expected, outputs_real)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_task.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_task.py
new file mode 100644
index 000000000..7ef2bb90d
--- /dev/null
+++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_task.py
@@ -0,0 +1,46 @@
+# Copyright (C) 2024 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
+
+'''Test cases for QtAsyncio'''
+
+import asyncio
+import unittest
+
+import PySide6.QtAsyncio as QtAsyncio
+
+
+class QAsyncioTestCaseCancelTask(unittest.TestCase):
+ # Taken from https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel
+
+ async def cancel_me(self, output):
+ output += "(1) cancel_me(): before sleep"
+
+ try:
+ await asyncio.sleep(10)
+ except asyncio.CancelledError:
+ output += "(2) cancel_me(): cancel sleep"
+ raise
+ finally:
+ output += "(3) cancel_me(): after sleep"
+
+ async def main(self, output):
+ task = asyncio.create_task(self.cancel_me(output))
+ await asyncio.sleep(0.1)
+ task.cancel()
+ try:
+ await task
+ except asyncio.CancelledError:
+ output += "(4) main(): cancel_me is cancelled now"
+
+ def test_await_tasks(self):
+ output_expected = []
+ output_real = []
+
+ asyncio.run(self.main(output_expected))
+ QtAsyncio.run(self.main(output_real), keep_running=False)
+
+ self.assertEqual(output_real, output_expected)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_taskgroup.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_taskgroup.py
new file mode 100644
index 000000000..aa8ce4718
--- /dev/null
+++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_cancel_taskgroup.py
@@ -0,0 +1,57 @@
+# Copyright (C) 2024 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
+
+'''Test cases for QtAsyncio'''
+
+import asyncio
+import unittest
+
+import PySide6.QtAsyncio as QtAsyncio
+
+
+class QAsyncioTestCaseCancelTaskGroup(unittest.TestCase):
+ def setUp(self) -> None:
+ super().setUp()
+ # We only reach the end of the loop if the task is not cancelled.
+ self.loop_end_reached = False
+
+ async def raise_error(self):
+ raise RuntimeError
+
+ async def loop_short(self):
+ self._loop_end_reached = False
+ for _ in range(1000):
+ await asyncio.sleep(1e-3)
+ self._loop_end_reached = True
+
+ async def loop_shorter(self):
+ self._loop_end_reached = False
+ for _ in range(1000):
+ await asyncio.sleep(1e-4)
+ self._loop_end_reached = True
+
+ async def loop_the_shortest(self):
+ self._loop_end_reached = False
+ for _ in range(1000):
+ await asyncio.to_thread(lambda: None)
+ self._loop_end_reached = True
+
+ async def main(self, coro):
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(coro())
+ tg.create_task(self.raise_error())
+
+ def test_cancel_taskgroup(self):
+ coros = [self.loop_short, self.loop_shorter, self.loop_the_shortest]
+
+ for coro in coros:
+ try:
+ QtAsyncio.run(self.main(coro), keep_running=False)
+ except ExceptionGroup as e:
+ self.assertEqual(len(e.exceptions), 1)
+ self.assertIsInstance(e.exceptions[0], RuntimeError)
+ self.assertFalse(self._loop_end_reached)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_chain.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_chain.py
new file mode 100644
index 000000000..a0a949720
--- /dev/null
+++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_chain.py
@@ -0,0 +1,57 @@
+# Copyright (C) 2023 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
+
+'''Test cases for QtAsyncio'''
+
+import unittest
+import asyncio
+import random
+import time
+
+from PySide6.QtAsyncio import QAsyncioEventLoopPolicy
+
+
+class QAsyncioTestCaseChain(unittest.TestCase):
+
+ async def link(self, output, n, i):
+ t = random.randint(0, 5)
+ output += f"link {i}({n}): {t}s "
+ await asyncio.sleep(i)
+ result = f"result {n}-{i}"
+ output += f"link {i}({n}) finished with {result} "
+ return result
+
+ async def chain(self, output, n):
+ link1 = await self.link(output, n, 0.2)
+ link2 = await self.link(output, n, 0.5)
+ output += f"chain {n}: {link1} -> {link2} "
+
+ async def gather(self, output, *args):
+ await asyncio.gather(*(self.chain(output, n) for n in args))
+
+ def test_chain(self):
+ args = [1, 2, 3]
+
+ outputs_expected = []
+ outputs_real = []
+
+ # Run the code without QAsyncioEventLoopPolicy
+ random.seed(17)
+ asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
+ start = time.perf_counter()
+ asyncio.run(self.gather(outputs_expected, *args))
+ end_expected = time.perf_counter() - start
+
+ # Run the code with QAsyncioEventLoopPolicy and QtEventLoop
+ random.seed(17)
+ asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy())
+ start = time.perf_counter()
+ asyncio.run(self.gather(outputs_real, *args))
+ end_real = time.perf_counter() - start
+
+ self.assertEqual(outputs_expected, outputs_real)
+ self.assertAlmostEqual(end_expected, end_real, delta=0.5)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_executor.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_executor.py
new file mode 100644
index 000000000..25e680b39
--- /dev/null
+++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_executor.py
@@ -0,0 +1,47 @@
+# Copyright (C) 2023 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
+
+'''Test cases for QtAsyncio'''
+
+import unittest
+import asyncio
+
+from concurrent.futures import ThreadPoolExecutor
+
+from PySide6.QtCore import QThread
+from PySide6.QtAsyncio import QAsyncioEventLoopPolicy
+
+
+class QAsyncioTestCaseExecutor(unittest.TestCase):
+ def setUp(self) -> None:
+ super().setUp()
+ self.executor_thread = None
+
+ def tearDown(self) -> None:
+ super().tearDown()
+
+ def blocking_function(self):
+ self.executor_thread = QThread.currentThread()
+ return 42
+
+ async def run_asyncio_executor(self):
+ main_thread = QThread.currentThread()
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = await asyncio.get_running_loop().run_in_executor(
+ executor, self.blocking_function)
+
+ # Assert that we are back to the main thread.
+ self.assertEqual(QThread.currentThread(), main_thread)
+
+ # Assert that the blocking function was executed in a different thread.
+ self.assertNotEqual(self.executor_thread, main_thread)
+
+ self.assertEqual(result, 42)
+
+ def test_qasyncio_executor(self):
+ asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy())
+ asyncio.run(self.run_asyncio_executor())
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py
new file mode 100644
index 000000000..0bd98c361
--- /dev/null
+++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py
@@ -0,0 +1,70 @@
+# Copyright (C) 2023 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
+
+'''Test cases for QtAsyncio'''
+
+import unittest
+import asyncio
+import random
+import time
+
+from PySide6.QtAsyncio import QAsyncioEventLoopPolicy
+
+
+class QAsyncioTestCaseQueues(unittest.TestCase):
+
+ async def produce(self, output, queue):
+ for _ in range(random.randint(0, 2)):
+ await asyncio.sleep(random.random())
+ await queue.put(self.i)
+ output += f"{self.i} added to queue\n"
+ self.i += 1
+
+ async def consume(self, output, queue):
+ while True:
+ await asyncio.sleep(random.random())
+ i = await queue.get()
+ output += f"{i} pulled from queue\n"
+ queue.task_done()
+
+ async def main(self, output1, output2, num_producers, num_consumers):
+ self.i = 0
+ queue = asyncio.Queue()
+ producers = [
+ asyncio.create_task(self.produce(output1, queue)) for _ in range(num_producers)]
+ consumers = [
+ asyncio.create_task(self.consume(output2, queue)) for _ in range(num_consumers)]
+ await asyncio.gather(*producers)
+ await queue.join()
+ for consumer in consumers:
+ consumer.cancel()
+
+ def test_queues(self):
+ args = [(2, 3), (2, 1)]
+ for arg in args:
+ outputs_expected1 = []
+ outputs_expected2 = []
+ outputs_real1 = []
+ outputs_real2 = []
+
+ # Run the code without QAsyncioEventLoopPolicy
+ random.seed(17)
+ start = time.perf_counter()
+ asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
+ asyncio.run(self.main(outputs_expected1, outputs_expected2, *arg))
+ end_expected = time.perf_counter() - start
+
+ # Run the code with QAsyncioEventLoopPolicy and QtEventLoop
+ random.seed(17)
+ start = time.perf_counter()
+ asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy())
+ asyncio.run(self.main(outputs_real1, outputs_real2, *arg))
+ end_real = time.perf_counter() - start
+
+ self.assertEqual(outputs_expected1, outputs_real1)
+ self.assertEqual(outputs_expected2, outputs_real2)
+ self.assertAlmostEqual(end_expected, end_real, delta=1)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_threadsafe.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_threadsafe.py
new file mode 100644
index 000000000..5b52db239
--- /dev/null
+++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_threadsafe.py
@@ -0,0 +1,58 @@
+# Copyright (C) 2023 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
+
+'''Test cases for QtAsyncio'''
+
+import unittest
+import asyncio
+import threading
+import time
+
+from PySide6.QtAsyncio import QAsyncioEventLoopPolicy
+
+
+class QAsyncioTestCaseThreadsafe(unittest.TestCase):
+
+ def setUp(self) -> None:
+ super().setUp()
+ asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy())
+ self.loop_event = asyncio.Event()
+
+ def thread_target(self, is_threadsafe):
+ time.sleep(1)
+ if is_threadsafe:
+ # call_soon_threadsafe() wakes the loop that is in another thread, so the
+ # loop checks the event and will not hang.
+ asyncio.get_event_loop().call_soon_threadsafe(self.loop_event.set)
+ else:
+ # call_soon() does not wake the loop that is in another thread, and so the
+ # loop keeps waiting without checking the event and will hang.
+ asyncio.get_event_loop().call_soon(self.loop_event.set)
+
+ async def coro(self, is_threadsafe):
+ thread = threading.Thread(target=self.thread_target, args=(is_threadsafe,))
+ thread.start()
+
+ task = asyncio.create_task(self.loop_event.wait())
+
+ # The timeout is necessary because the loop will hang for the non-threadsafe case.
+ done, pending = await asyncio.wait([task], timeout=2)
+
+ thread.join()
+
+ if is_threadsafe:
+ self.assertEqual(len(done), 1)
+ self.assertEqual(len(pending), 0)
+ else:
+ self.assertEqual(len(done), 0)
+ self.assertEqual(len(pending), 1)
+
+ def test_not_threadsafe(self):
+ asyncio.run(self.coro(False))
+
+ def test_threadsafe(self):
+ asyncio.run(self.coro(True))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_time.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_time.py
new file mode 100644
index 000000000..07a126644
--- /dev/null
+++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_time.py
@@ -0,0 +1,49 @@
+# Copyright (C) 2023 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
+
+'''Test cases for QtAsyncio'''
+
+import unittest
+import asyncio
+import datetime
+
+from PySide6.QtAsyncio import QAsyncioEventLoopPolicy
+
+
+class QAsyncioTestCaseTime(unittest.TestCase):
+
+ def setUp(self):
+ self.previous_time = None
+ self.exception = None
+
+ def display_date(self, end_time, loop):
+ if self.previous_time is not None:
+ try:
+ self.assertAlmostEqual(
+ (datetime.datetime.now() - self.previous_time).total_seconds(), 1, delta=0.1)
+ except AssertionError as e:
+ self.exception = e
+ self.previous_time = datetime.datetime.now()
+ if (loop.time() + 1.0) < end_time:
+ loop.call_later(1, self.display_date, end_time, loop)
+ else:
+ loop.stop()
+
+ def test_time(self):
+ asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy())
+ loop = asyncio.new_event_loop()
+
+ end_time = loop.time() + 3.0
+ loop.call_soon(self.display_date, end_time, loop)
+
+ try:
+ loop.run_forever()
+ finally:
+ loop.close()
+
+ if self.exception is not None:
+ raise self.exception
+
+
+if __name__ == '__main__':
+ unittest.main()