aboutsummaryrefslogtreecommitdiffstats
path: root/sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py
blob: 0bd98c361f285811e7ec2e12816b1fedc4aad579 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# Copyright (C) 2023 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0

'''Test cases for QtAsyncio'''

import unittest
import asyncio
import random
import time

from PySide6.QtAsyncio import QAsyncioEventLoopPolicy


class QAsyncioTestCaseQueues(unittest.TestCase):

    async def produce(self, output, queue):
        for _ in range(random.randint(0, 2)):
            await asyncio.sleep(random.random())
            await queue.put(self.i)
            output += f"{self.i} added to queue\n"
            self.i += 1

    async def consume(self, output, queue):
        while True:
            await asyncio.sleep(random.random())
            i = await queue.get()
            output += f"{i} pulled from queue\n"
            queue.task_done()

    async def main(self, output1, output2, num_producers, num_consumers):
        self.i = 0
        queue = asyncio.Queue()
        producers = [
            asyncio.create_task(self.produce(output1, queue)) for _ in range(num_producers)]
        consumers = [
            asyncio.create_task(self.consume(output2, queue)) for _ in range(num_consumers)]
        await asyncio.gather(*producers)
        await queue.join()
        for consumer in consumers:
            consumer.cancel()

    def test_queues(self):
        args = [(2, 3), (2, 1)]
        for arg in args:
            outputs_expected1 = []
            outputs_expected2 = []
            outputs_real1 = []
            outputs_real2 = []

            # Run the code without QAsyncioEventLoopPolicy
            random.seed(17)
            start = time.perf_counter()
            asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
            asyncio.run(self.main(outputs_expected1, outputs_expected2, *arg))
            end_expected = time.perf_counter() - start

            # Run the code with QAsyncioEventLoopPolicy and QtEventLoop
            random.seed(17)
            start = time.perf_counter()
            asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy())
            asyncio.run(self.main(outputs_real1, outputs_real2, *arg))
            end_real = time.perf_counter() - start

            self.assertEqual(outputs_expected1, outputs_real1)
            self.assertEqual(outputs_expected2, outputs_real2)
            self.assertAlmostEqual(end_expected, end_real, delta=1)


if __name__ == "__main__":
    unittest.main()