diff options
Diffstat (limited to 'sources/pyside6/PySide6/QtAsyncio/tasks.py')
-rw-r--r-- | sources/pyside6/PySide6/QtAsyncio/tasks.py | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/sources/pyside6/PySide6/QtAsyncio/tasks.py b/sources/pyside6/PySide6/QtAsyncio/tasks.py new file mode 100644 index 000000000..7edc15093 --- /dev/null +++ b/sources/pyside6/PySide6/QtAsyncio/tasks.py @@ -0,0 +1,190 @@ +# Copyright (C) 2023 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only + +from . import events +from . import futures + +import asyncio +import collections.abc +import concurrent.futures +import contextvars +import typing + + +class QAsyncioTask(futures.QAsyncioFuture): + """ https://docs.python.org/3/library/asyncio-task.html """ + + def __init__(self, coro: typing.Union[collections.abc.Generator, collections.abc.Coroutine], *, + loop: typing.Optional["events.QAsyncioEventLoop"] = None, + name: typing.Optional[str] = None, + context: typing.Optional[contextvars.Context] = None) -> None: + super().__init__(loop=loop, context=context) + + self._coro = coro # The coroutine for which this task was created. + self._name = name if name else "QtTask" + + # The task creates a handle for its coroutine. The handle enqueues the + # task's step function as its callback in the event loop. + self._handle = self._loop.call_soon(self._step, context=self._context) + + # The task step function executes the coroutine until it finishes, + # raises an exception or returns a future. If a future was returned, + # the task will await its completion (or exception). + self._future_to_await: typing.Optional[asyncio.Future] = None + + self._cancelled = False + self._cancel_message: typing.Optional[str] = None + + # https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support + asyncio._register_task(self) # type: ignore[arg-type] + + def __repr__(self) -> str: + if self._state == futures.QAsyncioFuture.FutureState.PENDING: + state = "Pending" + elif self._state == futures.QAsyncioFuture.FutureState.DONE_WITH_RESULT: + state = "Done" + elif self._state == futures.QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION: + state = f"Done with exception ({repr(self._exception)})" + elif self._state == futures.QAsyncioFuture.FutureState.CANCELLED: + state = "Cancelled" + + return f"Task '{self.get_name()}' with state: {state}" + + class QtTaskApiMisuseError(Exception): + pass + + def set_result(self, result: typing.Any) -> None: # type: ignore[override] + # This function is not inherited from the Future APIs. + raise QAsyncioTask.QtTaskApiMisuseError("Tasks cannot set results") + + def set_exception(self, exception: typing.Any) -> None: # type: ignore[override] + # This function is not inherited from the Future APIs. + raise QAsyncioTask.QtTaskApiMisuseError("Tasks cannot set exceptions") + + def _step(self, + exception_or_future: typing.Union[ + BaseException, futures.QAsyncioFuture, None] = None) -> None: + """ + The step function is the heart of a task. It is scheduled in the event + loop repeatedly, executing the coroutine "step" by "step" (i.e., + iterating through the asynchronous generator) until it finishes with an + exception or successfully. Each step can optionally receive an + exception or a future as a result from a previous step to handle. + """ + + if self.done(): + return + result = None + self._future_to_await = None + + if asyncio.futures.isfuture(exception_or_future): + try: + exception_or_future.result() + except BaseException as e: + exception_or_future = e + + try: + asyncio._enter_task(self._loop, self) # type: ignore[arg-type] + + # It is at this point that the coroutine is resumed for the current + # step (i.e. asynchronous generator iteration). It will now be + # executed until it yields (and potentially returns a future), + # raises an exception, is cancelled, or finishes successfully. + + if isinstance(exception_or_future, BaseException): + # If the coroutine doesn't handle this exception, it propagates + # to the caller. + result = self._coro.throw(exception_or_future) + else: + result = self._coro.send(None) + except StopIteration as e: + self._state = futures.QAsyncioFuture.FutureState.DONE_WITH_RESULT + self._result = e.value + except (concurrent.futures.CancelledError, asyncio.exceptions.CancelledError) as e: + self._state = futures.QAsyncioFuture.FutureState.CANCELLED + self._exception = e + except BaseException as e: + self._state = futures.QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION + self._exception = e + else: + if asyncio.futures.isfuture(result): + # If the coroutine yields a future, the task will await its + # completion, and at that point the step function will be + # called again. + result.add_done_callback( + self._step, context=self._context) # type: ignore[arg-type] + self._future_to_await = result + if self._cancelled: + # If the task was cancelled, then a new future should be + # cancelled as well. Otherwise, in some scenarios like + # a loop inside the task and with bad timing, if the new + # future is not cancelled, the task would continue running + # in this loop despite having been cancelled. This bad + # timing can occur especially if the first future finishes + # very quickly. + self._future_to_await.cancel(self._cancel_message) + elif result is None: + # If no future was yielded, we schedule the step function again + # without any arguments. + self._loop.call_soon(self._step, context=self._context) + else: + # This is not supposed to happen. + exception = RuntimeError(f"Bad task result: {result}") + self._loop.call_soon(self._step, exception, context=self._context) + finally: + asyncio._leave_task(self._loop, self) # type: ignore[arg-type] + + if self._exception: + self._loop.call_exception_handler({ + "message": (str(self._exception) if self._exception + else "An exception occurred during task " + "execution"), + "exception": self._exception, + "task": self, + "future": (exception_or_future + if asyncio.futures.isfuture(exception_or_future) + else None) + }) + + if self.done(): + self._schedule_callbacks() + + # https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support + asyncio._unregister_task(self) # type: ignore[arg-type] + + def get_stack(self, *, limit=None) -> typing.List[typing.Any]: + # TODO + raise NotImplementedError("QtTask.get_stack is not implemented") + + def print_stack(self, *, limit=None, file=None) -> None: + # TODO + raise NotImplementedError("QtTask.print_stack is not implemented") + + def get_coro(self) -> typing.Union[collections.abc.Generator, collections.abc.Coroutine]: + return self._coro + + def get_name(self) -> str: + return self._name + + def set_name(self, value) -> None: + self._name = str(value) + + def cancel(self, msg: typing.Optional[str] = None) -> bool: + if self.done(): + return False + self._cancel_message = msg + self._handle.cancel() + if self._future_to_await is not None: + # A task that is awaiting a future must also cancel this future in + # order for the cancellation to be successful. + self._future_to_await.cancel(msg) + self._cancelled = True + return True + + def uncancel(self) -> None: + # TODO + raise NotImplementedError("QtTask.uncancel is not implemented") + + def cancelling(self) -> bool: + # TODO + raise NotImplementedError("QtTask.cancelling is not implemented") |