1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
# Copyright (C) 2023 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
import asyncio
import datetime
import urllib.parse
from typing import Any, Dict, Optional, Tuple

import aiohttp
import aiohttp.web
import socketio  # type: ignore

import coordinator
class Coordinator:
    """Dummy implementation of the coordinator for testing.

    Runs a Socket.IO server attached to an aiohttp application and records
    what the client sends (authentication data, status updates, disconnects).
    Each recorded step is paired with an ``asyncio.Event`` so that tests can
    await it through the ``wait_for_*`` coroutines.
    """

    def __init__(
        self,
        info: coordinator.Info,
        # Default values from documentation.
        ping_interval: int = 25,
        ping_timeout: int = 5,
    ) -> None:
        """Create the server objects and register the event handlers.

        Nothing is bound to a network port until :meth:`start` is awaited.

        Args:
            info: Coordinator settings; ``info.url`` determines the address
                and port that :meth:`start` listens on.
            ping_interval: Forwarded to ``socketio.AsyncServer``.
            ping_timeout: Forwarded to ``socketio.AsyncServer``.
        """
        self.info = info
        # Fixed work item that is sent in reply to every work request.
        self.work_item = coordinator.WorkItem(
            integration_id=1234,
            integration_url=None,
            integration_timestamp=datetime.datetime.fromisoformat("2000-01-01"),
            integration_data=[],
            branch="dev",
            revision="816ca43b88893e06ea866c3edadd0ca26f64b533",
        )
        self.server = socketio.AsyncServer(ping_interval=ping_interval, ping_timeout=ping_timeout)
        self.app = aiohttp.web.Application()
        self.server.attach(self.app)
        self.runner = aiohttp.web.AppRunner(self.app)

        # Event setup.
        self.server.on("connect")(self.handle_connect)
        self.socket_id: Optional[str] = None
        self.auth: Optional[Dict[str, str]] = None
        self.connected = asyncio.Event()

        self.server.on("statusUpdate")(self.handle_status_update)
        self.status: Optional[Dict[str, Any]] = None
        self.status_updated = asyncio.Event()

        # Both work-related events are answered with the same work item.
        self.server.on("fetchWork")(self.handle_send_work)
        self.server.on("queryWork")(self.handle_send_work)

        self.server.on("disconnect")(self.handle_disconnect)
        self.disconnected = asyncio.Event()

    async def handle_connect(
        self, socket_id: str, environ: Dict[str, Any], auth: Dict[str, Any]
    ) -> None:
        """Record the connecting client's id and auth data, then signal ``connected``.

        Args:
            socket_id: Session id of the connecting client.
            environ: Request environment passed by the server; unused.
            auth: Authentication data passed by the client.
        """
        self.socket_id = socket_id
        self.auth = auth
        self.connected.set()

    async def handle_status_update(self, socket_id: str, status: Dict[str, Any]) -> None:
        """Record the payload of a ``statusUpdate`` event and signal ``status_updated``."""
        self.status = status
        self.status_updated.set()

    async def handle_send_work(self, socket_id: str) -> None:
        """Answer a ``fetchWork`` or ``queryWork`` event with the fixed work item."""
        # NOTE(review): no recipient is given, which per the python-socketio docs
        # addresses all connected clients rather than only ``socket_id`` - confirm
        # that this is intended if a test ever connects more than one client.
        await self.server.emit("sendWork", self.work_item.to_dictionary())

    async def handle_disconnect(self, socket_id: str) -> None:
        """Signal ``disconnected`` when a client disconnects."""
        self.disconnected.set()

    async def wait_for_connect(self) -> Dict[str, str]:
        """Wait until a client has connected and return its auth data.

        The stored auth data and the ``connected`` event are reset afterwards,
        so the method can be awaited again for the next connection.
        """
        await self.connected.wait()
        assert isinstance(self.auth, dict)
        auth = self.auth
        self.auth = None
        self.connected.clear()
        return auth

    async def wait_for_status(self) -> Dict[str, Any]:
        """Wait until a status update has arrived and return its payload.

        The stored status and the ``status_updated`` event are reset afterwards,
        so the method can be awaited again for the next update.
        """
        await self.status_updated.wait()
        assert isinstance(self.status, dict)
        status = self.status
        self.status = None
        self.status_updated.clear()
        return status

    async def wait_for_disconnect(self) -> None:
        """Wait until a client has disconnected, then reset the ``disconnected`` event."""
        await self.disconnected.wait()
        self.disconnected.clear()

    @staticmethod
    def _listen_address(url: str) -> Tuple[str, int]:
        """Extract the ``(host, port)`` pair to listen on from a coordinator URL.

        Accepts a full URL such as ``http://localhost:5000`` (a trailing path is
        ignored) as well as a bare ``host:port`` string.

        Raises:
            ValueError: If the URL has no host, no port, or an invalid port.
        """
        # Without "//" a bare "host:port" would be parsed as "scheme:path".
        parts = urllib.parse.urlsplit(url if "://" in url else f"//{url}")
        host, port = parts.hostname, parts.port
        if host is None or port is None:
            raise ValueError(f"Coordinator URL must contain a host and a port: {url!r}")
        return host, port

    async def start(self) -> None:
        """Start serving on the address and port given by ``self.info.url``.

        Raises:
            ValueError: If ``self.info.url`` does not contain a host and a port.
        """
        # Validate the URL first so that a bad URL does not leave a set-up runner behind.
        host, port = self._listen_address(self.info.url)
        await self.runner.setup()
        await aiohttp.web.TCPSite(self.runner, host, port).start()

    async def stop(self) -> None:
        """Shut the server down by cleaning up the aiohttp runner."""
        await self.runner.cleanup()
async def main() -> int:
    """Serve the dummy coordinator on ``http://localhost:5000`` for ten hours.

    Returns:
        ``0`` once the serving period has elapsed.
    """
    info = coordinator.Info(url="http://localhost:5000", secret="1234")
    dummy_coordinator = Coordinator(info)
    await dummy_coordinator.start()
    try:
        await asyncio.sleep(36000)
    finally:
        # Shut the server down even if the sleep is cancelled (e.g. on Ctrl-C).
        await dummy_coordinator.stop()
    return 0
if __name__ == "__main__":
    # Use the integer returned by main() as the process exit status.
    raise SystemExit(asyncio.run(main()))
|