aboutsummaryrefslogtreecommitdiffstats
path: root/sources/pyside6/PySide6/QtAsyncio/futures.py
diff options
context:
space:
mode:
Diffstat (limited to 'sources/pyside6/PySide6/QtAsyncio/futures.py')
-rw-r--r--sources/pyside6/PySide6/QtAsyncio/futures.py117
1 files changed, 117 insertions, 0 deletions
diff --git a/sources/pyside6/PySide6/QtAsyncio/futures.py b/sources/pyside6/PySide6/QtAsyncio/futures.py
new file mode 100644
index 000000000..cbb005fc9
--- /dev/null
+++ b/sources/pyside6/PySide6/QtAsyncio/futures.py
@@ -0,0 +1,117 @@
+# Copyright (C) 2023 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
+
+from . import events
+
+import asyncio
+import contextvars
+import enum
+import typing
+
+
+class QAsyncioFuture():
+ """ https://docs.python.org/3/library/asyncio-future.html """
+
+ # Declare that this class implements the Future protocol. The field must
+ # exist and be boolean - True indicates 'await' or 'yield from', False
+ # indicates 'yield'.
+ _asyncio_future_blocking = False
+
+ class FutureState(enum.Enum):
+ PENDING = enum.auto()
+ CANCELLED = enum.auto()
+ DONE_WITH_RESULT = enum.auto()
+ DONE_WITH_EXCEPTION = enum.auto()
+
+ def __init__(self, *, loop: typing.Optional["events.QAsyncioEventLoop"] = None,
+ context: typing.Optional[contextvars.Context] = None) -> None:
+ self._loop: "events.QAsyncioEventLoop"
+ if loop is None:
+ self._loop = asyncio.events.get_event_loop() # type: ignore[assignment]
+ else:
+ self._loop = loop
+ self._context = context
+
+ self._state = QAsyncioFuture.FutureState.PENDING
+ self._result: typing.Any = None
+ self._exception: typing.Optional[BaseException] = None
+
+ self._cancel_message: typing.Optional[str] = None
+
+ # List of callbacks that are called when the future is done.
+ self._callbacks: typing.List[typing.Callable] = list()
+
+ def __await__(self):
+ if not self.done():
+ self._asyncio_future_blocking = True
+ yield self
+ if not self.done():
+ raise RuntimeError("await was not used with a Future or Future-like object")
+ return self.result()
+
+ __iter__ = __await__
+
+ def _schedule_callbacks(self, context: typing.Optional[contextvars.Context] = None):
+ """ A future can optionally have callbacks that are called when the future is done. """
+ for cb in self._callbacks:
+ self._loop.call_soon(
+ cb, self, context=context if context else self._context)
+
+ def result(self) -> typing.Union[typing.Any, Exception]:
+ if self._state == QAsyncioFuture.FutureState.DONE_WITH_RESULT:
+ return self._result
+ if self._state == QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION and self._exception:
+ raise self._exception
+ if self._state == QAsyncioFuture.FutureState.CANCELLED:
+ if self._cancel_message:
+ raise asyncio.CancelledError(self._cancel_message)
+ else:
+ raise asyncio.CancelledError
+ raise asyncio.InvalidStateError
+
+ def set_result(self, result: typing.Any) -> None:
+ self._result = result
+ self._state = QAsyncioFuture.FutureState.DONE_WITH_RESULT
+ self._schedule_callbacks()
+
+ def set_exception(self, exception: Exception) -> None:
+ self._exception = exception
+ self._state = QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION
+ self._schedule_callbacks()
+
+ def done(self) -> bool:
+ return self._state != QAsyncioFuture.FutureState.PENDING
+
+ def cancelled(self) -> bool:
+ return self._state == QAsyncioFuture.FutureState.CANCELLED
+
+ def add_done_callback(self, cb: typing.Callable, *,
+ context: typing.Optional[contextvars.Context] = None) -> None:
+ if self.done():
+ self._loop.call_soon(
+ cb, self, context=context if context else self._context)
+ else:
+ self._callbacks.append(cb)
+
+ def remove_done_callback(self, cb: typing.Callable) -> int:
+ original_len = len(self._callbacks)
+ self._callbacks = [_cb for _cb in self._callbacks if _cb != cb]
+ return original_len - len(self._callbacks)
+
+ def cancel(self, msg: typing.Optional[str] = None) -> bool:
+ if self.done():
+ return False
+ self._state = QAsyncioFuture.FutureState.CANCELLED
+ self._cancel_message = msg
+ self._schedule_callbacks()
+ return True
+
+ def exception(self) -> typing.Optional[BaseException]:
+ if self._state == QAsyncioFuture.FutureState.CANCELLED:
+ raise asyncio.CancelledError
+ if self.done():
+ return self._exception
+ raise asyncio.InvalidStateError
+
+ def get_loop(self) -> asyncio.AbstractEventLoop:
+ return self._loop