diff options
Diffstat (limited to 'sources/pyside6/PySide6/QtAsyncio/events.py')
-rw-r--r-- | sources/pyside6/PySide6/QtAsyncio/events.py | 609 |
1 files changed, 609 insertions, 0 deletions
diff --git a/sources/pyside6/PySide6/QtAsyncio/events.py b/sources/pyside6/PySide6/QtAsyncio/events.py new file mode 100644 index 000000000..a29e480b7 --- /dev/null +++ b/sources/pyside6/PySide6/QtAsyncio/events.py @@ -0,0 +1,609 @@ +# Copyright (C) 2023 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only + +from PySide6.QtCore import (QCoreApplication, QDateTime, QDeadlineTimer, + QEventLoop, QObject, QTimer, QThread, Slot) + +from . import futures +from . import tasks + +import asyncio +import collections.abc +import concurrent.futures +import contextvars +import enum +import os +import signal +import socket +import subprocess +import typing +import warnings + +__all__ = [ + "QAsyncioEventLoopPolicy", "QAsyncioEventLoop", + "QAsyncioHandle", "QAsyncioTimerHandle", +] + + +class QAsyncioExecutorWrapper(QObject): + + def __init__(self, func: typing.Callable, *args: typing.Tuple) -> None: + super().__init__() + self._loop: QEventLoop + self._func = func + self._args = args + self._result = None + self._exception = None + + def _cb(self): + try: + self._result = self._func(*self._args) + except BaseException as e: + self._exception = e + self._loop.exit() + + def do(self): + # This creates a new event loop and dispatcher for the thread, if not already created. + self._loop = QEventLoop() + asyncio.events._set_running_loop(self._loop) + QTimer.singleShot(0, self._loop, lambda: self._cb()) + self._loop.exec() + if self._exception is not None: + raise self._exception + return self._result + + def exit(self): + self._loop.exit() + + +class QAsyncioEventLoopPolicy(asyncio.AbstractEventLoopPolicy): + def __init__(self, + application: typing.Optional[QCoreApplication] = None, + quit_qapp: bool = True, + handle_sigint: bool = False) -> None: + super().__init__() + if application is None: + if QCoreApplication.instance() is None: + application = QCoreApplication() + else: + application = QCoreApplication.instance() + self._application: QCoreApplication = application # type: ignore[assignment] + self._quit_qapp = quit_qapp + self._event_loop: typing.Optional[asyncio.AbstractEventLoop] = None + + if handle_sigint: + signal.signal(signal.SIGINT, signal.SIG_DFL) + + def get_event_loop(self) -> asyncio.AbstractEventLoop: + if self._event_loop is None: + self._event_loop = QAsyncioEventLoop(self._application, quit_qapp=self._quit_qapp) + return self._event_loop + + def set_event_loop(self, loop: typing.Optional[asyncio.AbstractEventLoop]) -> None: + self._event_loop = loop + + def new_event_loop(self) -> asyncio.AbstractEventLoop: + return QAsyncioEventLoop(self._application, quit_qapp=self._quit_qapp) + + def get_child_watcher(self) -> "asyncio.AbstractChildWatcher": + raise DeprecationWarning("Child watchers are deprecated since Python 3.12") + + def set_child_watcher(self, watcher: "asyncio.AbstractChildWatcher") -> None: + raise DeprecationWarning("Child watchers are deprecated since Python 3.12") + + +class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): + """ + Implements the asyncio API: + https://docs.python.org/3/library/asyncio-eventloop.html + """ + + class ShutDownThread(QThread): + def __init__(self, future: futures.QAsyncioFuture, loop: "QAsyncioEventLoop") -> None: + super().__init__() + self._future = future + self._loop = loop + self.started.connect(self.shutdown) + + def run(self) -> None: + pass + + def shutdown(self) -> None: + try: + self._loop._default_executor.shutdown(wait=True) + if not self._loop.is_closed(): + self._loop.call_soon_threadsafe(self._future.set_result, None) + except Exception as e: + if not self._loop.is_closed(): + self._loop.call_soon_threadsafe(self._future.set_exception, e) + + def __init__(self, + application: QCoreApplication, quit_qapp: bool = True) -> None: + asyncio.BaseEventLoop.__init__(self) + QObject.__init__(self) + + self._application: QCoreApplication = application + self._quit_qapp = quit_qapp + self._thread = QThread.currentThread() + + self._closed = False + + self._quit_from_inside = False + self._quit_from_outside = False + + self._asyncgens: typing.Set[collections.abc.AsyncGenerator] = set() + + # Starting with Python 3.11, this must be an instance of + # ThreadPoolExecutor. + self._default_executor = concurrent.futures.ThreadPoolExecutor() + + self._exception_handler: typing.Optional[typing.Callable] = self.default_exception_handler + self._task_factory: typing.Optional[typing.Callable] = None + self._future_to_complete: typing.Optional[futures.QAsyncioFuture] = None + + self._debug = bool(os.getenv("PYTHONASYNCIODEBUG", False)) + + self._application.aboutToQuit.connect(self._about_to_quit_cb) + + # Running and stopping the loop + + def _run_until_complete_cb(self, future: futures.QAsyncioFuture) -> None: + if not future.cancelled(): + if isinstance(future.exception(), (SystemExit, KeyboardInterrupt)): + return + future.get_loop().stop() + + def run_until_complete(self, + future: futures.QAsyncioFuture) -> typing.Any: # type: ignore[override] + if self.is_closed(): + raise RuntimeError("Event loop is closed") + if self.is_running(): + raise RuntimeError("Event loop is already running") + + arg_was_coro = not asyncio.futures.isfuture(future) + future = asyncio.tasks.ensure_future(future, loop=self) # type: ignore[assignment] + future.add_done_callback(self._run_until_complete_cb) + self._future_to_complete = future + + try: + self.run_forever() + except Exception as e: + if arg_was_coro and future.done() and not future.cancelled(): + future.exception() + raise e + finally: + future.remove_done_callback(self._run_until_complete_cb) + if not future.done(): + raise RuntimeError("Event loop stopped before Future completed") + + return future.result() + + def run_forever(self) -> None: + if self.is_closed(): + raise RuntimeError("Event loop is closed") + if self.is_running(): + raise RuntimeError("Event loop is already running") + asyncio.events._set_running_loop(self) + self._application.exec() + asyncio.events._set_running_loop(None) + + def _about_to_quit_cb(self): + if not self._quit_from_inside: + self._quit_from_outside = True + self.close() + + def stop(self) -> None: + if self._future_to_complete is not None: + if self._future_to_complete.done(): + self._future_to_complete = None + else: + return + self._quit_from_inside = True + if self._quit_qapp: + self._application.quit() + + def is_running(self) -> bool: + return self._thread.loopLevel() > 0 + + def is_closed(self) -> bool: + return self._closed + + def close(self) -> None: + if self.is_running() and not self._quit_from_outside: + raise RuntimeError("Cannot close a running event loop") + if self.is_closed(): + return + if self._default_executor is not None: + self._default_executor.shutdown(wait=False) + self._closed = True + + async def shutdown_asyncgens(self) -> None: + if not len(self._asyncgens): + return + + results = await asyncio.tasks.gather( + *[asyncgen.aclose() for asyncgen in self._asyncgens], + return_exceptions=True) + + for result, asyncgen in zip(results, self._asyncgens): + if isinstance(result, Exception): + self.call_exception_handler({ + "message": f"Closing asynchronous generator {asyncgen}" + f"raised an exception", + "exception": result, + "asyncgen": asyncgen}) + + self._asyncgens.clear() + + async def shutdown_default_executor(self, # type: ignore[override] + timeout: typing.Union[int, float, None] = None) -> None: + shutdown_successful = False + if timeout is not None: + deadline_timer = QDeadlineTimer(int(timeout * 1000)) + else: + deadline_timer = QDeadlineTimer(QDeadlineTimer.Forever) + + if self._default_executor is None: + return + future = self.create_future() + thread = QAsyncioEventLoop.ShutDownThread(future, self) + thread.start() + try: + await future + finally: + shutdown_successful = thread.wait(deadline_timer) + + if timeout is not None and not shutdown_successful: + warnings.warn( + f"Could not shutdown the default executor within {timeout} seconds", + RuntimeWarning, stacklevel=2) + self._default_executor.shutdown(wait=False) + + # Scheduling callbacks + + def _call_soon_impl(self, callback: typing.Callable, *args: typing.Any, + context: typing.Optional[contextvars.Context] = None, + is_threadsafe: typing.Optional[bool] = False) -> asyncio.Handle: + return self._call_later_impl(0, callback, *args, context=context, + is_threadsafe=is_threadsafe) + + def call_soon(self, callback: typing.Callable, *args: typing.Any, + context: typing.Optional[contextvars.Context] = None) -> asyncio.Handle: + return self._call_soon_impl(callback, *args, context=context, is_threadsafe=False) + + def call_soon_threadsafe(self, callback: typing.Callable, *args: typing.Any, + context: + typing.Optional[contextvars.Context] = None) -> asyncio.Handle: + if self.is_closed(): + raise RuntimeError("Event loop is closed") + if context is None: + context = contextvars.copy_context() + return self._call_soon_impl(callback, *args, context=context, is_threadsafe=True) + + def _call_later_impl(self, delay: typing.Union[int, float], + callback: typing.Callable, *args: typing.Any, + context: typing.Optional[contextvars.Context] = None, + is_threadsafe: typing.Optional[bool] = False) -> asyncio.TimerHandle: + if not isinstance(delay, (int, float)): + raise TypeError("delay must be an int or float") + return self._call_at_impl(self.time() + delay, callback, *args, context=context, + is_threadsafe=is_threadsafe) + + def call_later(self, delay: typing.Union[int, float], + callback: typing.Callable, *args: typing.Any, + context: typing.Optional[contextvars.Context] = None) -> asyncio.TimerHandle: + return self._call_later_impl(delay, callback, *args, context=context, is_threadsafe=False) + + def _call_at_impl(self, when: typing.Union[int, float], + callback: typing.Callable, *args: typing.Any, + context: typing.Optional[contextvars.Context] = None, + is_threadsafe: typing.Optional[bool] = False) -> asyncio.TimerHandle: + if not isinstance(when, (int, float)): + raise TypeError("when must be an int or float") + return QAsyncioTimerHandle(when, callback, args, self, context, is_threadsafe=is_threadsafe) + + def call_at(self, when: typing.Union[int, float], + callback: typing.Callable, *args: typing.Any, + context: typing.Optional[contextvars.Context] = None) -> asyncio.TimerHandle: + return self._call_at_impl(when, callback, *args, context=context, is_threadsafe=False) + + def time(self) -> float: + return QDateTime.currentMSecsSinceEpoch() / 1000 + + # Creating Futures and Tasks + + def create_future(self) -> futures.QAsyncioFuture: # type: ignore[override] + return futures.QAsyncioFuture(loop=self) + + def create_task(self, # type: ignore[override] + coro: typing.Union[collections.abc.Generator, collections.abc.Coroutine], + *, name: typing.Optional[str] = None, + context: typing.Optional[contextvars.Context] = None) -> tasks.QAsyncioTask: + if self._task_factory is None: + task = tasks.QAsyncioTask(coro, loop=self, name=name, context=context) + else: + task = self._task_factory(self, coro, context=context) + task.set_name(name) + + return task + + def set_task_factory(self, factory: typing.Optional[typing.Callable]) -> None: + if factory is not None and not callable(factory): + raise TypeError("The task factory must be a callable or None") + self._task_factory = factory + + def get_task_factory(self) -> typing.Optional[typing.Callable]: + return self._task_factory + + # Opening network connections + + async def create_connection( + self, protocol_factory, host=None, port=None, + *, ssl=None, family=0, proto=0, + flags=0, sock=None, local_addr=None, + server_hostname=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None, + happy_eyeballs_delay=None, interleave=None): + raise NotImplementedError + + async def create_datagram_endpoint(self, protocol_factory, + local_addr=None, remote_addr=None, *, + family=0, proto=0, flags=0, + reuse_address=None, reuse_port=None, + allow_broadcast=None, sock=None): + raise NotImplementedError + + async def create_unix_connection( + self, protocol_factory, path=None, *, + ssl=None, sock=None, + server_hostname=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None): + raise NotImplementedError + + # Creating network servers + + async def create_server( + self, protocol_factory, host=None, port=None, + *, family=socket.AF_UNSPEC, + flags=socket.AI_PASSIVE, sock=None, backlog=100, + ssl=None, reuse_address=None, reuse_port=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None, + start_serving=True): + raise NotImplementedError + + async def create_unix_server( + self, protocol_factory, path=None, *, + sock=None, backlog=100, ssl=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None, + start_serving=True): + raise NotImplementedError + + async def connect_accepted_socket( + self, protocol_factory, sock, + *, ssl=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None): + raise NotImplementedError + + # Transferring files + + async def sendfile(self, transport, file, offset=0, count=None, + *, fallback=True): + raise NotImplementedError + + # TLS Upgrade + + async def start_tls(self, transport, protocol, sslcontext, *, + server_side=False, + server_hostname=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None): + raise NotImplementedError + + # Watching file descriptors + + def add_reader(self, fd, callback, *args): + raise NotImplementedError + + def remove_reader(self, fd): + raise NotImplementedError + + def add_writer(self, fd, callback, *args): + raise NotImplementedError + + def remove_writer(self, fd): + raise NotImplementedError + + # Working with socket objects directly + + async def sock_recv(self, sock, nbytes): + raise NotImplementedError + + async def sock_recv_into(self, sock, buf): + raise NotImplementedError + + async def sock_recvfrom(self, sock, bufsize): + raise NotImplementedError + + async def sock_recvfrom_into(self, sock, buf, nbytes=0): + raise NotImplementedError + + async def sock_sendall(self, sock, data): + raise NotImplementedError + + async def sock_sendto(self, sock, data, address): + raise NotImplementedError + + async def sock_connect(self, sock, address): + raise NotImplementedError + + async def sock_accept(self, sock): + raise NotImplementedError + + async def sock_sendfile(self, sock, file, offset=0, count=None, *, + fallback=None): + raise NotImplementedError + + # DNS + + async def getaddrinfo(self, host, port, *, + family=0, type=0, proto=0, flags=0): + raise NotImplementedError + + async def getnameinfo(self, sockaddr, flags=0): + raise NotImplementedError + + # Working with pipes + + async def connect_read_pipe(self, protocol_factory, pipe): + raise NotImplementedError + + async def connect_write_pipe(self, protocol_factory, pipe): + raise NotImplementedError + + # Unix signals + + def add_signal_handler(self, sig, callback, *args): + raise NotImplementedError + + def remove_signal_handler(self, sig): + raise NotImplementedError + + # Executing code in thread or process pools + + def run_in_executor(self, + executor: typing.Optional[concurrent.futures.ThreadPoolExecutor], + func: typing.Callable, *args: typing.Tuple) -> asyncio.futures.Future: + if self.is_closed(): + raise RuntimeError("Event loop is closed") + if executor is None: + executor = self._default_executor + wrapper = QAsyncioExecutorWrapper(func, *args) + return asyncio.futures.wrap_future( + executor.submit(wrapper.do), loop=self + ) + + def set_default_executor(self, + executor: typing.Optional[ + concurrent.futures.ThreadPoolExecutor]) -> None: + if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): + raise TypeError("The executor must be a ThreadPoolExecutor") + self._default_executor = executor + + # Error Handling API + + def set_exception_handler(self, handler: typing.Optional[typing.Callable]) -> None: + if handler is not None and not callable(handler): + raise TypeError("The handler must be a callable or None") + self._exception_handler = handler + + def get_exception_handler(self) -> typing.Optional[typing.Callable]: + return self._exception_handler + + def default_exception_handler(self, context: typing.Dict[str, typing.Any]) -> None: + # TODO + if context["message"]: + print(context["message"]) + + def call_exception_handler(self, context: typing.Dict[str, typing.Any]) -> None: + if self._exception_handler is not None: + self._exception_handler(context) + + # Enabling debug mode + + def get_debug(self) -> bool: + # TODO: Part of the asyncio API but currently unused. More details: + # https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode + return self._debug + + def set_debug(self, enabled: bool) -> None: + self._debug = enabled + + # Running subprocesses + + async def subprocess_exec(self, protocol_factory, *args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **kwargs): + raise NotImplementedError + + async def subprocess_shell(self, protocol_factory, cmd, *, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **kwargs): + raise NotImplementedError + + +class QAsyncioHandle(): + class HandleState(enum.Enum): + PENDING = enum.auto() + CANCELLED = enum.auto() + DONE = enum.auto() + + def __init__(self, callback: typing.Callable, args: typing.Tuple, + loop: QAsyncioEventLoop, context: typing.Optional[contextvars.Context], + is_threadsafe: typing.Optional[bool] = False) -> None: + self._callback = callback + self._args = args + self._loop = loop + self._context = context + self._is_threadsafe = is_threadsafe + + self._timeout = 0 + + self._state = QAsyncioHandle.HandleState.PENDING + self._start() + + def _schedule_event(self, timeout: int, func: typing.Callable) -> None: + if not self._loop.is_closed() and not self._loop._quit_from_outside: + if self._is_threadsafe: + QTimer.singleShot(timeout, self._loop, func) + else: + QTimer.singleShot(timeout, func) + + def _start(self) -> None: + self._schedule_event(self._timeout, lambda: self._cb()) + + @Slot() + def _cb(self) -> None: + if self._state == QAsyncioHandle.HandleState.PENDING: + if self._context is not None: + self._context.run(self._callback, *self._args) + else: + self._callback(*self._args) + self._state = QAsyncioHandle.HandleState.DONE + + def cancel(self) -> None: + if self._state == QAsyncioHandle.HandleState.PENDING: + # The old timer that was created in _start will still trigger but _cb won't do anything. + self._state = QAsyncioHandle.HandleState.CANCELLED + + def cancelled(self) -> bool: + return self._state == QAsyncioHandle.HandleState.CANCELLED + + +class QAsyncioTimerHandle(QAsyncioHandle, asyncio.TimerHandle): + def __init__(self, when: float, callback: typing.Callable, args: typing.Tuple, + loop: QAsyncioEventLoop, context: typing.Optional[contextvars.Context], + is_threadsafe: typing.Optional[bool] = False) -> None: + QAsyncioHandle.__init__(self, callback, args, loop, context, is_threadsafe) + + self._when = when + time = self._loop.time() + self._timeout = round(max(self._when - time, 0) * 1000) + + QAsyncioHandle._start(self) + + # Override this so that timer.start() is only called once at the end + # of the constructor for both QtHandle and QtTimerHandle. + def _start(self) -> None: + pass + + def when(self) -> float: + return self._when |