# path: root/sources/pyside6/PySide6/QtAsyncio/futures.py
# blob: 611bd5634a7b7401884e962a11da123a408364cb
# Copyright (C) 2023 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only

from . import events

import asyncio
import contextvars
import enum
import typing


class QAsyncioFuture():
    """
    Future-like object implementing the asyncio Future protocol.

    https://docs.python.org/3/library/asyncio-future.html

    The future starts out pending and becomes done exactly once, either by
    receiving a result, receiving an exception, or being cancelled. Done
    callbacks are handed to the loop's ``call_soon()`` when that happens.
    """

    # Declare that this class implements the Future protocol. The field must
    # exist and be boolean - True indicates 'await' or 'yield from', False
    # indicates 'yield'.
    _asyncio_future_blocking = False

    class FutureState(enum.Enum):
        """ The mutually exclusive states a future can be in. """
        PENDING = enum.auto()
        CANCELLED = enum.auto()
        DONE_WITH_RESULT = enum.auto()
        DONE_WITH_EXCEPTION = enum.auto()

    def __init__(self, *, loop: typing.Optional["events.QAsyncioEventLoop"] = None,
                 context: typing.Optional[contextvars.Context] = None) -> None:
        """
        Create a pending future.

        :param loop: Loop used to schedule done callbacks. When omitted, the
            loop returned by ``asyncio.events.get_event_loop()`` is used.
        :param context: Default context passed to ``call_soon()`` for
            callbacks that were registered without their own context.
        """
        self._loop: "events.QAsyncioEventLoop"
        if loop is None:
            self._loop = asyncio.events.get_event_loop()  # type: ignore[assignment]
        else:
            self._loop = loop
        self._context = context

        self._state = QAsyncioFuture.FutureState.PENDING
        self._result: typing.Any = None
        self._exception: typing.Optional[BaseException] = None

        # Each entry pairs a callback with the context it was registered with
        # (None if the caller did not supply one).
        self._callbacks: typing.List[
            typing.Tuple[typing.Callable, typing.Optional[contextvars.Context]]] = []

        self._cancel_message: typing.Optional[str] = None

    def __await__(self):
        """
        Yield this future once if it is pending, then return its result.

        :raises RuntimeError: If execution resumes while the future is still
            pending.
        """
        if not self.done():
            self._asyncio_future_blocking = True
            yield self
            if not self.done():
                raise RuntimeError("await was not used with a Future or Future-like object")
        return self.result()

    __iter__ = __await__

    def _make_cancelled_error(self) -> asyncio.CancelledError:
        """ Build the CancelledError for this future, with its message if any. """
        if self._cancel_message is None:
            return asyncio.CancelledError()
        return asyncio.CancelledError(self._cancel_message)

    def _schedule_callbacks(self, context: typing.Optional[contextvars.Context] = None):
        """
        Hand all registered done callbacks to the loop and forget them.

        A callback registered with its own context runs in that context.
        Otherwise *context* is used if given, else the future's default
        context.
        """
        callbacks = self._callbacks
        if not callbacks:
            return
        # Drop the references first so that a repeated call cannot schedule
        # the same callbacks twice.
        self._callbacks = []
        # Compare against None: an empty Context is a valid, explicit choice.
        fallback = context if context is not None else self._context
        for cb, cb_context in callbacks:
            self._loop.call_soon(
                cb, self, context=cb_context if cb_context is not None else fallback)

    def result(self) -> typing.Union[typing.Any, Exception]:
        """
        Return the result of the future.

        :raises asyncio.CancelledError: If the future was cancelled.
        :raises asyncio.InvalidStateError: If no result is available.
        :raises BaseException: The exception stored on the future, if any.
        """
        state = self._state
        if state == QAsyncioFuture.FutureState.DONE_WITH_RESULT:
            return self._result
        if (state == QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION
                and self._exception is not None):
            raise self._exception
        if state == QAsyncioFuture.FutureState.CANCELLED:
            raise self._make_cancelled_error()
        raise asyncio.InvalidStateError("Result is not ready.")

    def set_result(self, result: typing.Any) -> None:
        """
        Mark the future as done with *result* and schedule the callbacks.

        :raises asyncio.InvalidStateError: If the future is already done.
        """
        if self.done():
            raise asyncio.InvalidStateError("Future is already done.")
        self._result = result
        self._state = QAsyncioFuture.FutureState.DONE_WITH_RESULT
        self._schedule_callbacks()

    def set_exception(self, exception: Exception) -> None:
        """
        Mark the future as done with *exception* and schedule the callbacks.

        :raises asyncio.InvalidStateError: If the future is already done.
        """
        if self.done():
            raise asyncio.InvalidStateError("Future is already done.")
        self._exception = exception
        self._state = QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION
        self._schedule_callbacks()

    def done(self) -> bool:
        """ Return True unless the future is still pending. """
        return self._state != QAsyncioFuture.FutureState.PENDING

    def cancelled(self) -> bool:
        """ Return True if the future was cancelled. """
        return self._state == QAsyncioFuture.FutureState.CANCELLED

    def add_done_callback(self, cb: typing.Callable, *,
                          context: typing.Optional[contextvars.Context] = None) -> None:
        """
        Register *cb* to be called with this future once it is done.

        If the future is already done, the callback is scheduled right away.
        *context*, when given, is the context passed to ``call_soon()`` for
        this callback; otherwise the future's default context is used.
        """
        if self.done():
            self._loop.call_soon(
                cb, self, context=context if context is not None else self._context)
        else:
            # Keep the context with the callback so it is honoured later.
            self._callbacks.append((cb, context))

    def remove_done_callback(self, cb: typing.Callable) -> int:
        """ Remove every registration of *cb* and return how many were removed. """
        remaining = [entry for entry in self._callbacks if entry[0] != cb]
        removed = len(self._callbacks) - len(remaining)
        if removed:
            self._callbacks = remaining
        return removed

    def cancel(self, msg: typing.Optional[str] = None) -> bool:
        """
        Cancel the future and schedule the callbacks.

        :param msg: Optional message carried by the resulting CancelledError.
        :return: False if the future was already done, True otherwise.
        """
        if self.done():
            return False
        self._state = QAsyncioFuture.FutureState.CANCELLED
        self._cancel_message = msg
        self._schedule_callbacks()
        return True

    def exception(self) -> typing.Optional[BaseException]:
        """
        Return the exception stored on the future, or None if there is none.

        :raises asyncio.CancelledError: If the future was cancelled.
        :raises asyncio.InvalidStateError: If the future is not done yet.
        """
        if self._state == QAsyncioFuture.FutureState.CANCELLED:
            raise self._make_cancelled_error()
        if self.done():
            return self._exception
        raise asyncio.InvalidStateError("Exception is not set.")

    def get_loop(self) -> asyncio.AbstractEventLoop:
        """ Return the loop this future is bound to. """
        return self._loop