diff options
Diffstat (limited to 'sources/pyside6/PySide6/QtAsyncio/futures.py')
-rw-r--r-- | sources/pyside6/PySide6/QtAsyncio/futures.py | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/sources/pyside6/PySide6/QtAsyncio/futures.py b/sources/pyside6/PySide6/QtAsyncio/futures.py new file mode 100644 index 000000000..cbb005fc9 --- /dev/null +++ b/sources/pyside6/PySide6/QtAsyncio/futures.py @@ -0,0 +1,117 @@ +# Copyright (C) 2023 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only + +from . import events + +import asyncio +import contextvars +import enum +import typing + + +class QAsyncioFuture(): + """ https://docs.python.org/3/library/asyncio-future.html """ + + # Declare that this class implements the Future protocol. The field must + # exist and be boolean - True indicates 'await' or 'yield from', False + # indicates 'yield'. + _asyncio_future_blocking = False + + class FutureState(enum.Enum): + PENDING = enum.auto() + CANCELLED = enum.auto() + DONE_WITH_RESULT = enum.auto() + DONE_WITH_EXCEPTION = enum.auto() + + def __init__(self, *, loop: typing.Optional["events.QAsyncioEventLoop"] = None, + context: typing.Optional[contextvars.Context] = None) -> None: + self._loop: "events.QAsyncioEventLoop" + if loop is None: + self._loop = asyncio.events.get_event_loop() # type: ignore[assignment] + else: + self._loop = loop + self._context = context + + self._state = QAsyncioFuture.FutureState.PENDING + self._result: typing.Any = None + self._exception: typing.Optional[BaseException] = None + + self._cancel_message: typing.Optional[str] = None + + # List of callbacks that are called when the future is done. + self._callbacks: typing.List[typing.Callable] = list() + + def __await__(self): + if not self.done(): + self._asyncio_future_blocking = True + yield self + if not self.done(): + raise RuntimeError("await was not used with a Future or Future-like object") + return self.result() + + __iter__ = __await__ + + def _schedule_callbacks(self, context: typing.Optional[contextvars.Context] = None): + """ A future can optionally have callbacks that are called when the future is done. """ + for cb in self._callbacks: + self._loop.call_soon( + cb, self, context=context if context else self._context) + + def result(self) -> typing.Union[typing.Any, Exception]: + if self._state == QAsyncioFuture.FutureState.DONE_WITH_RESULT: + return self._result + if self._state == QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION and self._exception: + raise self._exception + if self._state == QAsyncioFuture.FutureState.CANCELLED: + if self._cancel_message: + raise asyncio.CancelledError(self._cancel_message) + else: + raise asyncio.CancelledError + raise asyncio.InvalidStateError + + def set_result(self, result: typing.Any) -> None: + self._result = result + self._state = QAsyncioFuture.FutureState.DONE_WITH_RESULT + self._schedule_callbacks() + + def set_exception(self, exception: Exception) -> None: + self._exception = exception + self._state = QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION + self._schedule_callbacks() + + def done(self) -> bool: + return self._state != QAsyncioFuture.FutureState.PENDING + + def cancelled(self) -> bool: + return self._state == QAsyncioFuture.FutureState.CANCELLED + + def add_done_callback(self, cb: typing.Callable, *, + context: typing.Optional[contextvars.Context] = None) -> None: + if self.done(): + self._loop.call_soon( + cb, self, context=context if context else self._context) + else: + self._callbacks.append(cb) + + def remove_done_callback(self, cb: typing.Callable) -> int: + original_len = len(self._callbacks) + self._callbacks = [_cb for _cb in self._callbacks if _cb != cb] + return original_len - len(self._callbacks) + + def cancel(self, msg: typing.Optional[str] = None) -> bool: + if self.done(): + return False + self._state = QAsyncioFuture.FutureState.CANCELLED + self._cancel_message = msg + self._schedule_callbacks() + return True + + def exception(self) -> typing.Optional[BaseException]: + if self._state == QAsyncioFuture.FutureState.CANCELLED: + raise asyncio.CancelledError + if self.done(): + return self._exception + raise asyncio.InvalidStateError + + def get_loop(self) -> asyncio.AbstractEventLoop: + return self._loop |