aboutsummaryrefslogtreecommitdiffstats
path: root/packaging-tools/threadedwork.py
diff options
context:
space:
mode:
authorPatrik Teivonen <patrik.teivonen@qt.io>2022-06-06 08:46:13 +0300
committerPatrik Teivonen <patrik.teivonen@qt.io>2022-09-22 08:44:13 +0000
commit5cc5a8d307708f7ed3051d2bcd78e1e51cefa9f6 (patch)
treea006b72b16663888c8ef50c2486453e0dcff89e3 /packaging-tools/threadedwork.py
parentbd4fe3adf592fc2d84a95200873d8f62a4164f7c (diff)
Add missing type hints, enable strict mypy checking
Enable --strict parameter in mypy. Add missing type hints and resolve remaining type errors. Change-Id: Id9547cc3351b88729930c577d4ed628780c1447b Reviewed-by: Iikka Eklund <iikka.eklund@qt.io>
Diffstat (limited to 'packaging-tools/threadedwork.py')
-rw-r--r--packaging-tools/threadedwork.py61
1 files changed, 31 insertions, 30 deletions
diff --git a/packaging-tools/threadedwork.py b/packaging-tools/threadedwork.py
index 87c7ec258..5f89bda2e 100644
--- a/packaging-tools/threadedwork.py
+++ b/packaging-tools/threadedwork.py
@@ -37,16 +37,17 @@ from multiprocessing import cpu_count
from queue import Queue
from time import sleep
from traceback import format_exc
+from typing import Any, List, Optional
# we are using RLock, because threaded_print is using the same lock
output_lock = threading.RLock() # pylint: disable=invalid-name
-output_states = None # pylint: disable=invalid-name
+output_states = [] # pylint: disable=invalid-name
output_format_string = '' # pylint: disable=invalid-name
# prepare our std output hooks
class StdOutHook:
- def write(self, text):
+ def write(self, text: str) -> None:
# general print method sends line break just ignore that
stripped_text = text.strip()
if stripped_text == "":
@@ -70,21 +71,21 @@ class StdOutHook:
sys.__stdout__.write(new_output + cleaner_string)
- def flush(self):
+ def flush(self) -> None:
sys.__stdout__.flush()
class StdErrHook:
- def write(self, text):
+ def write(self, text: str) -> None:
with output_lock:
sys.__stderr__.write(text)
- def flush(self):
+ def flush(self) -> None:
sys.__stderr__.flush()
# builtin print() isn't threadsafe, lets make it threadsafe
-def threaded_print(*a, **b):
+def threaded_print(*a: Any, **b: Any) -> None:
with output_lock:
org_print(*a, **b)
@@ -95,7 +96,7 @@ org_stdout = sys.stdout
org_sterr = sys.stderr
-def enable_threaded_print(enable=True, thread_count=cpu_count()):
+def enable_threaded_print(enable: bool = True, thread_count: int = cpu_count()) -> None:
if enable:
global output_states # pylint: disable=W0603,C0103
global output_format_string # pylint: disable=W0603,C0103
@@ -103,8 +104,8 @@ def enable_threaded_print(enable=True, thread_count=cpu_count()):
output_format_string = ""
for xthread in range(thread_count):
output_format_string = output_format_string + "{" + str(xthread) + ":10}"
- sys.stdout = StdOutHook()
- sys.stderr = StdErrHook()
+ sys.stdout = StdOutHook() # type: ignore
+ sys.stderr = StdErrHook() # type: ignore
builtins.print = threaded_print
else:
sys.stdout = org_stdout
@@ -115,23 +116,23 @@ def enable_threaded_print(enable=True, thread_count=cpu_count()):
thread_data = threading.local()
-def next_progress_indicator():
+def next_progress_indicator() -> Any:
return next(thread_data.progress_indicator)
-class TaskFunction():
+class TaskFunction:
- def __init__(self, function, *arguments):
+ def __init__(self, function: Any, *arguments) -> None: # type: ignore
self.function = function
self.arguments = arguments
- def __str__(self):
+ def __str__(self) -> str:
return str(self.__dict__)
-class Task():
+class Task:
- def __init__(self, description, function, *arguments):
+ def __init__(self, description: str, function: Any, *arguments) -> None: # type: ignore
self.task_number = 0 # will be set from outside
self.description = description
self.list_of_functions = []
@@ -142,11 +143,11 @@ class Task():
self.exit_function = os._exit
self.exit_function_arguments = [-1]
- def add_function(self, function, *arguments):
+ def add_function(self, function: Any, *arguments) -> None: # type: ignore
a_function = TaskFunction(function, *arguments)
self.list_of_functions.append(a_function)
- def do_task(self):
+ def do_task(self) -> None:
try:
for task_function in self.list_of_functions:
task_function.function(*(task_function.arguments))
@@ -164,24 +165,24 @@ class Task():
print("Done")
-class ThreadedWork():
+class ThreadedWork:
- def __init__(self, description):
+ def __init__(self, description: str) -> None:
self.description = os.linesep + f"##### {description} #####"
- self.queue = Queue()
- self.legend = []
+ self.queue = Queue() # type: ignore
+ self.legend: List[str] = []
self.task_number = 0
- self.exit_function = None
- self.exit_function_arguments = None
+ self.exit_function: Any = None
+ self.exit_function_arguments: Any = None
- def set_exit_fail_function(self, function, *arguments):
+ def set_exit_fail_function(self, function: Any, *arguments) -> None: # type: ignore
self.exit_function = function
self.exit_function_arguments = arguments
- def add_task(self, description, function, *arguments):
+ def add_task(self, description: str, function: Any, *arguments) -> None: # type: ignore
self.add_task_object(Task(description, function, *arguments))
- def add_task_object(self, task):
+ def add_task_object(self, task: Any) -> None:
task.task_number = self.task_number
if self.exit_function:
task.exit_function = self.exit_function
@@ -190,8 +191,8 @@ class ThreadedWork():
self.queue.put(task)
self.task_number = self.task_number + 1
- def run(self, max_threads=None):
- if not max_threads:
+ def run(self, max_threads: Optional[int] = None) -> None:
+ if max_threads is None:
max_threads = min(cpu_count(), self.task_number)
print(self.description)
print(os.linesep.join(self.legend))
@@ -224,12 +225,12 @@ class ThreadedWork():
class Consumer(threading.Thread):
- def __init__(self, queue, worker_thread_id):
+ def __init__(self, queue: Any, worker_thread_id: Any) -> None:
self.queue = queue
self.worker_thread_id = worker_thread_id
threading.Thread.__init__(self)
- def run(self, stable_run_indicator=True): # pylint: disable=W0221
+ def run(self, stable_run_indicator: bool = True) -> None: # pylint: disable=W0221
if stable_run_indicator:
thread_data.progress_indicator = itertools.cycle(['..'])
else: