#!/usr/bin/python
'''Producer-Consumer test/example with QThread'''
import unittest
from random import random
import logging
logging.basicConfig(level=logging.WARNING)
from PySide.QtCore import QThread, QCoreApplication, QObject, SIGNAL
class Bucket(QObject):
    '''Dummy FIFO container that holds the produced values.

    Attributes:
        data     -- list of pending items, oldest first.
        max_size -- number of items the producer and consumer threads
                    process before they stop; push() does not enforce
                    it as a capacity.
    '''

    def __init__(self, max_size=10, *args):
        '''Create an empty bucket.

        max_size -- max number of produced items (defaults to 10).
        args     -- extra positional arguments forwarded to QObject.
        '''
        super(Bucket, self).__init__(*args)
        self.data = []
        # Honour the caller's limit: this used to be hard-coded to 10,
        # which silently ignored the max_size argument.
        self.max_size = max_size

    def pop(self):
        '''Remove and return the oldest item.

        Raises IndexError when the bucket is empty.
        '''
        return self.data.pop(0)

    def push(self, data):
        '''Append an item at the tail of the bucket.'''
        self.data.append(data)
class Producer(QThread):
    '''Thread that feeds random single-digit integers into a bucket.'''

    def __init__(self, bucket, *args):
        '''Store the bucket to fill.

        bucket -- object providing push() and max_size.
        args   -- extra positional arguments forwarded to QThread.
        '''
        super(Producer, self).__init__(*args)
        self.bucket = bucket
        # Every value pushed, in order, so callers can inspect it later.
        self.production_list = []
        # Number of items produced so far.
        self.runs = 0

    def run(self):
        '''Push values until bucket.max_size items have been produced.'''
        while self.runs < self.bucket.max_size:
            # random() is in [0, 1), so this yields an integer in 0..9.
            digit = int(random() * 10) % 10
            self.bucket.push(digit)
            self.production_list.append(digit)
            message = 'PRODUCER - pushed %d' % digit
            logging.debug(message)
            self.runs = self.runs + 1
            # Short pause between items.
            self.msleep(5)
class Consumer(QThread):
    '''Thread that drains values from a bucket.'''

    def __init__(self, bucket, *args):
        '''Store the bucket to read from.

        bucket -- object providing pop() and max_size.
        args   -- extra positional arguments forwarded to QThread.
        '''
        super(Consumer, self).__init__(*args)
        self.bucket = bucket
        # Every value popped, in order, so callers can inspect it later.
        self.consumption_list = []
        # Number of items consumed so far.
        self.runs = 0

    def run(self):
        '''Pop values until bucket.max_size items have been consumed.'''
        while self.runs < self.bucket.max_size:
            try:
                item = self.bucket.pop()
            except IndexError:
                # Bucket is empty right now; retry after the pause below.
                logging.debug('CONSUMER - empty bucket')
            else:
                self.consumption_list.append(item)
                message = 'CONSUMER - got %d' % item
                logging.debug(message)
                self.runs = self.runs + 1
            # Short pause between attempts, successful or not.
            self.msleep(5)
class ProducerConsumer(unittest.TestCase):
    '''Basic test case for producer-consumer QThread'''

    def setUp(self):
        '''Create fixtures: the application whose event loop the test runs.'''
        self.app = QCoreApplication([])

    def tearDown(self):
        '''Destroy fixtures.'''
        del self.app

    def finishCb(self):
        '''Quit the application event loop with exit code 0.'''
        self.app.exit(0)

    def testProdCon(self):
        '''QThread producer-consumer example.

        Runs one Producer and one Consumer over a shared Bucket and checks
        that the consumer received exactly what the producer pushed, in
        the same order.
        '''
        bucket = Bucket()
        prod = Producer(bucket)
        cons = Consumer(bucket)

        # Connect before starting the threads: a thread that finishes
        # before its finished() signal is connected would never trigger
        # finishCb, and exec_() below could then block forever.
        QObject.connect(prod, SIGNAL('finished()'), self.finishCb)
        QObject.connect(cons, SIGNAL('finished()'), self.finishCb)

        prod.start()
        cons.start()

        # Returns once finishCb has been called for the first thread that
        # finishes.
        self.app.exec_()

        # The event loop may stop while a thread is still running, so
        # join both before comparing results.
        prod.wait()
        cons.wait()

        self.assertEqual(prod.production_list, cons.consumption_list)
# Script entry point: run the test cases in this module when the file is
# executed directly; importing the module does not start a test run.
if __name__ == '__main__':
    unittest.main()
|