aboutsummaryrefslogtreecommitdiffstats
path: root/sources/shiboken6/tests/samplebinding/python_thread_test.py
blob: 65398b5c676455dfc7690ff8fe0aa21abdf480b9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0

#!/usr/bin/env python

'''Tests for using Shiboken-based bindings with python threads'''

import logging
import os
from random import random
import sys
import threading
import time
import unittest

from pathlib import Path
sys.path.append(os.fspath(Path(__file__).resolve().parents[1]))
from shiboken_paths import init_paths
init_paths()

import sample

#logging.basicConfig(level=logging.DEBUG)


class Producer(threading.Thread):
    '''Producer thread'''

    def __init__(self, bucket, max_runs, *args):
        #Constructor. Receives the bucket
        super(Producer, self).__init__(*args)
        self.runs = 0
        self.bucket = bucket
        self.max_runs = max_runs
        self.production_list = []

    def run(self):
        while self.runs < self.max_runs:
            value = int(random() * 10) % 10
            self.bucket.push(value)
            self.production_list.append(value)
            logging.debug(f'PRODUCER - pushed {value}')
            self.runs += 1
            #self.msleep(5)
            time.sleep(0.01)


class Consumer(threading.Thread):
    '''Consumer thread'''
    def __init__(self, bucket, max_runs, *args):
        #Constructor. Receives the bucket
        super(Consumer, self).__init__(*args)
        self.runs = 0
        self.bucket = bucket
        self.max_runs = max_runs
        self.consumption_list = []

    def run(self):
        while self.runs < self.max_runs:
            if not self.bucket.empty():
                value = self.bucket.pop()
                self.consumption_list.append(value)
                logging.debug(f'CONSUMER - got {value}')
                self.runs += 1
            else:
                logging.debug('CONSUMER - empty bucket')
            time.sleep(0.01)


class ProducerConsumer(unittest.TestCase):
    '''Basic test case for producer-consumer QThread'''

    def finishCb(self):
        #Quits the application
        self.app.exit(0)

    def testProdCon(self):
        #QThread producer-consumer example
        bucket = sample.Bucket()
        prod = Producer(bucket, 10)
        cons = Consumer(bucket, 10)

        prod.start()
        cons.start()

        #QObject.connect(prod, SIGNAL('finished()'), self.finishCb)
        #QObject.connect(cons, SIGNAL('finished()'), self.finishCb)

        prod.join()
        cons.join()

        self.assertEqual(prod.production_list, cons.consumption_list)


if __name__ == '__main__':
    unittest.main()