aboutsummaryrefslogtreecommitdiffstats
path: root/sources/pyside6/tests/QtCore/qthread_prod_cons_test.py
blob: a179c936f4bce02825e03bbbc951800c6da11fb4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/python
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0

'''Producer-Consumer test/example with QThread'''

import gc
import logging
import os
from random import random
import sys
import unittest

from pathlib import Path
sys.path.append(os.fspath(Path(__file__).resolve().parents[1]))
from init_paths import init_test_paths
init_test_paths(False)

logging.basicConfig(level=logging.WARNING)

from PySide6.QtCore import QThread, QCoreApplication, QObject, SIGNAL


class Bucket(QObject):
    '''Dummy class to hold the produced values'''
    def __init__(self, max_size=10, *args):
        # Constructor which receives the max number of produced items
        super().__init__(*args)
        self.data = []
        self.max_size = 10

    def pop(self):
        # Retrieves an item
        return self.data.pop(0)

    def push(self, data):
        # Pushes an item
        self.data.append(data)


class Producer(QThread):
    '''Producer thread'''

    def __init__(self, bucket, *args):
        # Constructor. Receives the bucket
        super().__init__(*args)
        self.runs = 0
        self.bucket = bucket
        self.production_list = []

    def run(self):
        # Produces at most bucket.max_size items
        while self.runs < self.bucket.max_size:
            value = int(random() * 10) % 10
            self.bucket.push(value)
            self.production_list.append(value)
            logging.debug(f'PRODUCER - pushed {value}')
            self.runs += 1
            self.msleep(5)


class Consumer(QThread):
    '''Consumer thread'''
    def __init__(self, bucket, *args):
        # Constructor. Receives the bucket
        super().__init__(*args)
        self.runs = 0
        self.bucket = bucket
        self.consumption_list = []

    def run(self):
        # Consumes at most bucket.max_size items
        while self.runs < self.bucket.max_size:
            try:
                value = self.bucket.pop()
                self.consumption_list.append(value)
                logging.debug(f'CONSUMER - got {value}')
                self.runs += 1
            except IndexError:
                logging.debug('CONSUMER - empty bucket')
            self.msleep(5)


class ProducerConsumer(unittest.TestCase):
    '''Basic test case for producer-consumer QThread'''

    def setUp(self):
        # Create fixtures
        self.app = QCoreApplication([])

    def tearDown(self):
        # Destroy fixtures
        del self.app
        # PYSIDE-535: Need to collect garbage in PyPy to trigger deletion
        gc.collect()

    def testProdCon(self):
        # QThread producer-consumer example
        bucket = Bucket()
        prod = Producer(bucket)
        cons = Consumer(bucket)

        cons.finished.connect(QCoreApplication.quit)
        prod.start()
        cons.start()

        self.app.exec()

        self.assertTrue(prod.wait(1000))
        self.assertTrue(cons.wait(1000))

        self.assertEqual(prod.production_list, cons.consumption_list)


if __name__ == '__main__':
    unittest.main()