1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
# Copyright (C) 2023 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
from . import events
import asyncio
import contextvars
import enum
import typing
class QAsyncioFuture():
    """Future object used together with the Qt-based asyncio event loop.

    Follows the Future API described at
    https://docs.python.org/3/library/asyncio-future.html

    A future starts out pending and becomes done exactly once: through
    set_result(), set_exception() or cancel(). Done callbacks are handed to
    the loop's call_soon() at that point; they are never invoked directly.
    """

    # Declare that this class implements the Future protocol. The field must
    # exist and be boolean - True indicates 'await' or 'yield from', False
    # indicates 'yield'.
    _asyncio_future_blocking = False

    class FutureState(enum.Enum):
        """Lifecycle states of a future; every state except PENDING counts as done."""
        PENDING = enum.auto()
        CANCELLED = enum.auto()
        DONE_WITH_RESULT = enum.auto()
        DONE_WITH_EXCEPTION = enum.auto()

    def __init__(self, *, loop: typing.Optional["events.QAsyncioEventLoop"] = None,
                 context: typing.Optional[contextvars.Context] = None) -> None:
        """Create a pending future.

        :param loop: Event loop used to schedule done callbacks. When omitted,
                     the loop returned by asyncio.events.get_event_loop() is used.
        :param context: Default context passed to the loop when a callback was
                        registered without a context of its own.
        """
        self._loop: "events.QAsyncioEventLoop"
        if loop is None:
            self._loop = asyncio.events.get_event_loop()  # type: ignore[assignment]
        else:
            self._loop = loop
        self._context = context

        self._state = QAsyncioFuture.FutureState.PENDING
        self._result: typing.Any = None
        self._exception: typing.Optional[BaseException] = None
        self._cancel_message: typing.Optional[str] = None

        # Callbacks that are scheduled when the future is done. Each entry
        # pairs the callback with the context it was registered with (or None).
        self._callbacks: typing.List[
            typing.Tuple[typing.Callable, typing.Optional[contextvars.Context]]] = []

    def __await__(self):
        """Yield this future until it is done, then return (or raise) its outcome.

        :raises RuntimeError: if execution resumes while the future is still pending.
        """
        if not self.done():
            # Flag the future as being awaited (see the Future protocol note
            # on the class attribute) before handing it to whoever drives us.
            self._asyncio_future_blocking = True
            yield self
            if not self.done():
                raise RuntimeError("await was not used with a Future or Future-like object")
        return self.result()

    __iter__ = __await__

    def _make_cancelled_error(self) -> asyncio.CancelledError:
        """Build the CancelledError for this future, carrying the cancel message if one was set."""
        if self._cancel_message:
            return asyncio.CancelledError(self._cancel_message)
        return asyncio.CancelledError()

    def _schedule_callbacks(self, context: typing.Optional[contextvars.Context] = None):
        """Hand all registered done callbacks to the event loop.

        A future can optionally have callbacks that are called when the future is done.

        The context used for a callback is, in order of preference: the context
        it was registered with, the ``context`` argument, the future's own context.

        :param context: Fallback context for callbacks registered without one.
        """
        # Detach the list first so that every callback is scheduled at most
        # once and the references are released once the future is done.
        pending, self._callbacks = self._callbacks, []
        # Compare against None explicitly: an empty contextvars.Context is falsy.
        fallback = context if context is not None else self._context
        for cb, cb_context in pending:
            self._loop.call_soon(
                cb, self, context=cb_context if cb_context is not None else fallback)

    def result(self) -> typing.Union[typing.Any, Exception]:
        """Return the result of the future.

        :raises BaseException: the stored exception, if the future finished with one.
        :raises asyncio.CancelledError: if the future was cancelled.
        :raises asyncio.InvalidStateError: if no outcome is available yet.
        """
        if self._state == QAsyncioFuture.FutureState.DONE_WITH_RESULT:
            return self._result
        if (self._state == QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION
                and self._exception is not None):
            raise self._exception
        if self._state == QAsyncioFuture.FutureState.CANCELLED:
            raise self._make_cancelled_error()
        raise asyncio.InvalidStateError

    def set_result(self, result: typing.Any) -> None:
        """Mark the future as done with ``result`` and schedule the done callbacks.

        :raises asyncio.InvalidStateError: if the future is already done or cancelled.
        """
        if self.done():
            # Completing twice would overwrite the outcome and re-run callbacks.
            raise asyncio.InvalidStateError("Future is already done")
        self._result = result
        self._state = QAsyncioFuture.FutureState.DONE_WITH_RESULT
        self._schedule_callbacks()

    def set_exception(self, exception: Exception) -> None:
        """Mark the future as done with ``exception`` and schedule the done callbacks.

        :raises asyncio.InvalidStateError: if the future is already done or cancelled.
        """
        if self.done():
            raise asyncio.InvalidStateError("Future is already done")
        self._exception = exception
        self._state = QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION
        self._schedule_callbacks()

    def done(self) -> bool:
        """Return True once the future has a result, an exception, or was cancelled."""
        return self._state != QAsyncioFuture.FutureState.PENDING

    def cancelled(self) -> bool:
        """Return True if the future was cancelled."""
        return self._state == QAsyncioFuture.FutureState.CANCELLED

    def add_done_callback(self, cb: typing.Callable, *,
                          context: typing.Optional[contextvars.Context] = None) -> None:
        """Register ``cb`` to be called with this future as its only argument when done.

        If the future is already done, the callback is scheduled on the loop
        right away; otherwise it is stored together with ``context``.

        :param cb: The callback to schedule.
        :param context: Context to run the callback in; falls back to the
                        future's own context when None.
        """
        if self.done():
            self._loop.call_soon(
                cb, self, context=context if context is not None else self._context)
        else:
            self._callbacks.append((cb, context))

    def remove_done_callback(self, cb: typing.Callable) -> int:
        """Remove every registered instance of ``cb`` and return how many were removed."""
        original_len = len(self._callbacks)
        self._callbacks = [entry for entry in self._callbacks if entry[0] != cb]
        return original_len - len(self._callbacks)

    def cancel(self, msg: typing.Optional[str] = None) -> bool:
        """Cancel the future and schedule the done callbacks.

        :param msg: Optional message attached to the CancelledError raised later.
        :returns: False if the future was already done, True otherwise.
        """
        if self.done():
            return False
        self._state = QAsyncioFuture.FutureState.CANCELLED
        self._cancel_message = msg
        self._schedule_callbacks()
        return True

    def exception(self) -> typing.Optional[BaseException]:
        """Return the exception set on the future, or None if it finished with a result.

        :raises asyncio.CancelledError: if the future was cancelled.
        :raises asyncio.InvalidStateError: if the future is not done yet.
        """
        if self._state == QAsyncioFuture.FutureState.CANCELLED:
            # Same error (including the cancel message) that result() raises.
            raise self._make_cancelled_error()
        if self.done():
            return self._exception
        raise asyncio.InvalidStateError

    def get_loop(self) -> asyncio.AbstractEventLoop:
        """Return the event loop this future is bound to."""
        return self._loop
|