diff options
author | Patrik Teivonen <patrik.teivonen@qt.io> | 2022-06-06 08:46:13 +0300 |
---|---|---|
committer | Patrik Teivonen <patrik.teivonen@qt.io> | 2022-09-22 08:44:13 +0000 |
commit | 5cc5a8d307708f7ed3051d2bcd78e1e51cefa9f6 (patch) | |
tree | a006b72b16663888c8ef50c2486453e0dcff89e3 /packaging-tools/threadedwork.py | |
parent | bd4fe3adf592fc2d84a95200873d8f62a4164f7c (diff) |
Add missing type hints, enable strict mypy checking
Enable --strict parameter in mypy.
Add missing type hints and resolve remaining type errors.
Change-Id: Id9547cc3351b88729930c577d4ed628780c1447b
Reviewed-by: Iikka Eklund <iikka.eklund@qt.io>
Diffstat (limited to 'packaging-tools/threadedwork.py')
-rw-r--r-- | packaging-tools/threadedwork.py | 61 |
1 files changed, 31 insertions, 30 deletions
diff --git a/packaging-tools/threadedwork.py b/packaging-tools/threadedwork.py index 87c7ec258..5f89bda2e 100644 --- a/packaging-tools/threadedwork.py +++ b/packaging-tools/threadedwork.py @@ -37,16 +37,17 @@ from multiprocessing import cpu_count from queue import Queue from time import sleep from traceback import format_exc +from typing import Any, List, Optional # we are using RLock, because threaded_print is using the same lock output_lock = threading.RLock() # pylint: disable=invalid-name -output_states = None # pylint: disable=invalid-name +output_states = [] # pylint: disable=invalid-name output_format_string = '' # pylint: disable=invalid-name # prepare our std output hooks class StdOutHook: - def write(self, text): + def write(self, text: str) -> None: # general print method sends line break just ignore that stripped_text = text.strip() if stripped_text == "": @@ -70,21 +71,21 @@ class StdOutHook: sys.__stdout__.write(new_output + cleaner_string) - def flush(self): + def flush(self) -> None: sys.__stdout__.flush() class StdErrHook: - def write(self, text): + def write(self, text: str) -> None: with output_lock: sys.__stderr__.write(text) - def flush(self): + def flush(self) -> None: sys.__stderr__.flush() # builtin print() isn't threadsafe, lets make it threadsafe -def threaded_print(*a, **b): +def threaded_print(*a: Any, **b: Any) -> None: with output_lock: org_print(*a, **b) @@ -95,7 +96,7 @@ org_stdout = sys.stdout org_sterr = sys.stderr -def enable_threaded_print(enable=True, thread_count=cpu_count()): +def enable_threaded_print(enable: bool = True, thread_count: int = cpu_count()) -> None: if enable: global output_states # pylint: disable=W0603,C0103 global output_format_string # pylint: disable=W0603,C0103 @@ -103,8 +104,8 @@ def enable_threaded_print(enable=True, thread_count=cpu_count()): output_format_string = "" for xthread in range(thread_count): output_format_string = output_format_string + "{" + str(xthread) + ":10}" - sys.stdout = StdOutHook() - sys.stderr = StdErrHook() + sys.stdout = StdOutHook() # type: ignore + sys.stderr = StdErrHook() # type: ignore builtins.print = threaded_print else: sys.stdout = org_stdout @@ -115,23 +116,23 @@ def enable_threaded_print(enable=True, thread_count=cpu_count()): thread_data = threading.local() -def next_progress_indicator(): +def next_progress_indicator() -> Any: return next(thread_data.progress_indicator) -class TaskFunction(): +class TaskFunction: - def __init__(self, function, *arguments): + def __init__(self, function: Any, *arguments) -> None: # type: ignore self.function = function self.arguments = arguments - def __str__(self): + def __str__(self) -> str: return str(self.__dict__) -class Task(): +class Task: - def __init__(self, description, function, *arguments): + def __init__(self, description: str, function: Any, *arguments) -> None: # type: ignore self.task_number = 0 # will be set from outside self.description = description self.list_of_functions = [] @@ -142,11 +143,11 @@ class Task(): self.exit_function = os._exit self.exit_function_arguments = [-1] - def add_function(self, function, *arguments): + def add_function(self, function: Any, *arguments) -> None: # type: ignore a_function = TaskFunction(function, *arguments) self.list_of_functions.append(a_function) - def do_task(self): + def do_task(self) -> None: try: for task_function in self.list_of_functions: task_function.function(*(task_function.arguments)) @@ -164,24 +165,24 @@ class Task(): print("Done") -class ThreadedWork(): +class ThreadedWork: - def __init__(self, description): + def __init__(self, description: str) -> None: self.description = os.linesep + f"##### {description} #####" - self.queue = Queue() - self.legend = [] + self.queue = Queue() # type: ignore + self.legend: List[str] = [] self.task_number = 0 - self.exit_function = None - self.exit_function_arguments = None + self.exit_function: Any = None + self.exit_function_arguments: Any = None - def set_exit_fail_function(self, function, *arguments): + def set_exit_fail_function(self, function: Any, *arguments) -> None: # type: ignore self.exit_function = function self.exit_function_arguments = arguments - def add_task(self, description, function, *arguments): + def add_task(self, description: str, function: Any, *arguments) -> None: # type: ignore self.add_task_object(Task(description, function, *arguments)) - def add_task_object(self, task): + def add_task_object(self, task: Any) -> None: task.task_number = self.task_number if self.exit_function: task.exit_function = self.exit_function @@ -190,8 +191,8 @@ class ThreadedWork(): self.queue.put(task) self.task_number = self.task_number + 1 - def run(self, max_threads=None): - if not max_threads: + def run(self, max_threads: Optional[int] = None) -> None: + if max_threads is None: max_threads = min(cpu_count(), self.task_number) print(self.description) print(os.linesep.join(self.legend)) @@ -224,12 +225,12 @@ class ThreadedWork(): class Consumer(threading.Thread): - def __init__(self, queue, worker_thread_id): + def __init__(self, queue: Any, worker_thread_id: Any) -> None: self.queue = queue self.worker_thread_id = worker_thread_id threading.Thread.__init__(self) - def run(self, stable_run_indicator=True): # pylint: disable=W0221 + def run(self, stable_run_indicator: bool = True) -> None: # pylint: disable=W0221 if stable_run_indicator: thread_data.progress_indicator = itertools.cycle(['..']) else: |