aboutsummaryrefslogtreecommitdiffstats
path: root/packaging-tools/threadedwork.py
blob: 7d098ea8e3b48fc9c048aeccfeb85bda13c4de1b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#############################################################################
#
# Copyright (C) 2022 The Qt Company Ltd.
# Contact: https://www.qt.io/licensing/
#
# This file is part of the release tools of the Qt Toolkit.
#
# $QT_BEGIN_LICENSE:GPL-EXCEPT$
# Commercial License Usage
# Licensees holding valid commercial Qt licenses may use this file in
# accordance with the commercial license agreement provided with the
# Software or, alternatively, in accordance with the terms contained in
# a written agreement between you and The Qt Company. For licensing terms
# and conditions see https://www.qt.io/terms-conditions. For further
# information use the contact form at https://www.qt.io/contact-us.
#
# GNU General Public License Usage
# Alternatively, this file may be used under the terms of the GNU
# General Public License version 3 as published by the Free Software
# Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT
# included in the packaging of this file. Please review the following
# information to ensure the GNU General Public License requirements will
# be met: https://www.gnu.org/licenses/gpl-3.0.html.
#
# $QT_END_LICENSE$
#
#############################################################################
import builtins
import itertools
import os
import sys
import threading
from multiprocessing import cpu_count
from queue import Queue
from time import sleep
from traceback import format_exc
from typing import Any, List, Optional

# we are using RLock, because threaded_print is using the same lock
output_lock = threading.RLock()  # pylint: disable=invalid-name
output_states = []  # pylint: disable=invalid-name
output_format_string = ''  # pylint: disable=invalid-name


# prepare our std output hooks
class StdOutHook:
    def write(self, text: str) -> None:
        # general print method sends line break just ignore that
        stripped_text = text.strip()
        if stripped_text == "":
            return
        local_progress_indicator = None
        if len(stripped_text) > 6:
            local_progress_indicator = next_progress_indicator()
        else:
            local_progress_indicator = stripped_text

        new_value = f"{thread_data.task_number}: {local_progress_indicator}"
        with output_lock:
            if new_value != output_states[thread_data.worker_thread_id]:
                old_output = "\r" + output_format_string.format(*output_states).strip()
                output_states[thread_data.worker_thread_id] = new_value
                new_output = "\r" + output_format_string.format(*output_states).strip()
                # cleanup old output if the new line is shorter
                cleaner_string = ""
                if len(old_output) > len(new_output):
                    cleaner_string = " " * (len(old_output) - len(new_output))

                sys.__stdout__.write(new_output + cleaner_string)

    def flush(self) -> None:
        sys.__stdout__.flush()


class StdErrHook:
    def write(self, text: str) -> None:
        with output_lock:
            sys.__stderr__.write(text)

    def flush(self) -> None:
        sys.__stderr__.flush()


# builtin print() isn't threadsafe, lets make it threadsafe
def threaded_print(*a: Any, **b: Any) -> None:
    with output_lock:
        org_print(*a, **b)


# this is really a HACK or better only useful in this complicate situation
org_print = builtins.print
org_stdout = sys.stdout
org_sterr = sys.stderr


def enable_threaded_print(enable: bool = True, thread_count: int = cpu_count()) -> None:
    if enable:
        global output_states  # pylint: disable=W0603,C0103
        global output_format_string  # pylint: disable=W0603,C0103
        output_states = [""] * (thread_count)
        output_format_string = ""
        for xthread in range(thread_count):
            output_format_string = output_format_string + "{" + str(xthread) + ":10}"
        sys.stdout = StdOutHook()  # type: ignore
        sys.stderr = StdErrHook()  # type: ignore
        builtins.print = threaded_print
    else:
        sys.stdout = org_stdout
        sys.stderr = org_sterr
        builtins.print = org_print


thread_data = threading.local()


def next_progress_indicator() -> Any:
    return next(thread_data.progress_indicator)


class TaskFunction:

    def __init__(self, function: Any, *arguments) -> None:  # type: ignore
        self.function = function
        self.arguments = arguments

    def __str__(self) -> str:
        return str(self.__dict__)


class Task:

    def __init__(self, description: str, function: Any, *arguments) -> None:  # type: ignore
        self.task_number = 0  # will be set from outside
        self.description = description
        self.list_of_functions = []
        if function:
            first_function = TaskFunction(function, *arguments)
            self.list_of_functions.append(first_function)
        # exit the complete program with code -1, sys.exit would just close the thread
        self.exit_function = os._exit
        self.exit_function_arguments = [-1]

    def add_function(self, function: Any, *arguments) -> None:  # type: ignore
        a_function = TaskFunction(function, *arguments)
        self.list_of_functions.append(a_function)

    def do_task(self) -> None:
        try:
            for task_function in self.list_of_functions:
                task_function.function(*(task_function.arguments))
        except Exception:
            print("FAIL")
            with output_lock:
                # there is no clean exit so we adding linesep here
                sys.__stdout__.write(os.linesep)
                sys.__stdout__.flush()
                sys.__stderr__.write(format(task_function))
                sys.__stderr__.write(os.linesep)
                sys.__stderr__.write(format_exc())
                sys.__stderr__.flush()
                self.exit_function(*(self.exit_function_arguments))
        print("Done")


class ThreadedWork:

    def __init__(self, description: str) -> None:
        self.description = os.linesep + f"##### {description} #####"
        self.queue = Queue()  # type: ignore
        self.legend: List[str] = []
        self.task_number = 0
        self.exit_function: Any = None
        self.exit_function_arguments: Any = None

    def set_exit_fail_function(self, function: Any, *arguments) -> None:  # type: ignore
        self.exit_function = function
        self.exit_function_arguments = arguments

    def add_task(self, description: str, function: Any, *arguments) -> None:  # type: ignore
        self.add_task_object(Task(description, function, *arguments))

    def add_task_object(self, task: Any) -> None:
        task.task_number = self.task_number
        if self.exit_function:
            task.exit_function = self.exit_function
            task.exit_function_arguments = self.exit_function_arguments
        self.legend.append(("{:d}: " + os.linesep + "\t{}" + os.linesep).format(task.task_number, task.description))
        self.queue.put(task)
        self.task_number = self.task_number + 1

    def run(self, max_threads: Optional[int] = None) -> None:
        if max_threads is None:
            max_threads = min(cpu_count(), self.task_number)
        print(self.description)
        print(os.linesep.join(self.legend))

        if max_threads > 1:
            enable_threaded_print(True, max_threads)
        list_of_consumers = []
        for i in range(max_threads):
            # every Consumer needs a stop/none item
            self.queue.put(None)
            new_consumer = Consumer(self.queue, i)
            list_of_consumers.append(new_consumer)
            new_consumer.daemon = True
            new_consumer.start()

        # block until everything is done
        for consumer in list_of_consumers:
            while consumer.is_alive():
                try:
                    # wait 1 second, then go back and ask if thread is still alive
                    sleep(1)
                except KeyboardInterrupt:  # if ctrl-C is pressed within that second,
                    # catch the KeyboardInterrupt exception
                    raise SystemExit(0) from KeyboardInterrupt
        # self.queue.join() <- this ignoring the KeyboardInterrupt
        if max_threads > 1:
            enable_threaded_print(False)
        print(f"\n{self.description} ... done")


class Consumer(threading.Thread):

    def __init__(self, queue: Any, worker_thread_id: Any) -> None:
        self.queue = queue
        self.worker_thread_id = worker_thread_id
        threading.Thread.__init__(self)

    def run(self, stable_run_indicator: bool = True) -> None:  # pylint: disable=W0221
        if stable_run_indicator:
            thread_data.progress_indicator = itertools.cycle(['..'])
        else:
            thread_data.progress_indicator = itertools.cycle(['|', '/', '-', '\\'])
        thread_data.worker_thread_id = self.worker_thread_id
        # run as long we have something in that queue
        while True:
            task = self.queue.get()
            if task is None:
                self.queue.task_done()
                break
            # we like to know which task get the progress -> see std handling
            thread_data.task_number = task.task_number
            task.do_task()
            self.queue.task_done()