# Source path: scripts/corebenchrunner/tests/dummy.py (blob 15b53efa0ca280bf8f4f7473d6fc593fcf5918fe)
# Copyright (C) 2023 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
import asyncio
import datetime
import urllib.parse
from typing import Any, Dict, Optional

import aiohttp
import socketio  # type: ignore

import coordinator


class Coordinator:
    """Dummy implementation of the coordinator for testing.

    Runs a Socket.IO server attached to an aiohttp application and records
    what a client sends, so that a test can await each step:

    * ``connect`` stores the socket id and auth payload and sets ``connected``.
    * ``statusUpdate`` stores the payload and sets ``status_updated``.
    * ``fetchWork`` and ``queryWork`` are both answered with a ``sendWork``
      event carrying the fixed ``work_item``.
    * ``disconnect`` sets ``disconnected``.

    Only the most recent auth and status payloads are kept; a payload that
    arrives before the previous one was consumed replaces it.
    """

    def __init__(
        self,
        info: coordinator.Info,
        # Default values from documentation.
        ping_interval: int = 25,
        ping_timeout: int = 5,
    ) -> None:
        """Create the server and register the event handlers.

        Nothing listens on the network until ``start`` is awaited.

        Args:
            info: Connection details; ``info.url`` names the address ``start`` binds to.
            ping_interval: Forwarded to ``socketio.AsyncServer`` as ``ping_interval``.
            ping_timeout: Forwarded to ``socketio.AsyncServer`` as ``ping_timeout``.
        """
        self.info = info
        # Fixed work item that is handed out in reply to every work request.
        self.work_item = coordinator.WorkItem(
            integration_id=1234,
            integration_url=None,
            integration_timestamp=datetime.datetime.fromisoformat("2000-01-01"),
            integration_data=[],
            branch="dev",
            revision="816ca43b88893e06ea866c3edadd0ca26f64b533",
        )
        self.server = socketio.AsyncServer(ping_interval=ping_interval, ping_timeout=ping_timeout)
        self.app = aiohttp.web.Application()
        self.server.attach(self.app)
        self.runner = aiohttp.web.AppRunner(self.app)

        # Event setup. Each handler is registered next to the state it updates.
        self.server.on("connect")(self.handle_connect)
        self.socket_id: Optional[str] = None
        self.auth: Optional[Dict[str, str]] = None
        self.connected = asyncio.Event()
        self.server.on("statusUpdate")(self.handle_status_update)
        self.status: Optional[Dict[str, Any]] = None
        self.status_updated = asyncio.Event()
        self.server.on("fetchWork")(self.handle_send_work)
        self.server.on("queryWork")(self.handle_send_work)
        self.server.on("disconnect")(self.handle_disconnect)
        self.disconnected = asyncio.Event()

    async def handle_connect(
        self, socket_id: str, environ: Dict[str, Any], auth: Dict[str, Any]
    ) -> None:
        """Record the socket id and auth payload of a connecting client.

        ``environ`` is accepted because the handler is called with it, but it is not used.
        """
        self.socket_id = socket_id
        self.auth = auth
        self.connected.set()

    async def handle_status_update(self, socket_id: str, status: Dict[str, Any]) -> None:
        """Record the payload of a ``statusUpdate`` event."""
        self.status = status
        self.status_updated.set()

    async def handle_send_work(self, socket_id: str) -> None:
        """Answer a ``fetchWork`` or ``queryWork`` request with the fixed work item.

        The reply is addressed to the requesting socket only, so other connected
        clients do not receive work they never asked for.
        """
        await self.server.emit("sendWork", self.work_item.to_dictionary(), to=socket_id)

    async def handle_disconnect(self, socket_id: str) -> None:
        """Signal that a client has disconnected."""
        self.disconnected.set()

    async def wait_for_connect(self) -> Dict[str, str]:
        """Wait for a client to connect and return its auth payload.

        The stored payload and the event are reset, so the next call waits for
        the next connection.

        Raises:
            AssertionError: If the recorded auth payload is not a dictionary.
        """
        await self.connected.wait()
        assert isinstance(self.auth, dict)
        auth = self.auth
        self.auth = None
        self.connected.clear()
        return auth

    async def wait_for_status(self) -> Dict[str, Any]:
        """Wait for a status update and return its payload.

        The stored payload and the event are reset, so the next call waits for
        the next update.

        Raises:
            AssertionError: If the recorded status payload is not a dictionary.
        """
        await self.status_updated.wait()
        assert isinstance(self.status, dict)
        status = self.status
        self.status = None
        self.status_updated.clear()
        return status

    async def wait_for_disconnect(self) -> None:
        """Wait for a client to disconnect, then reset the event."""
        await self.disconnected.wait()
        self.disconnected.clear()

    @staticmethod
    def _listen_address(url: str) -> tuple[str, int]:
        """Return the ``(host, port)`` pair named by ``url``.

        Accepts a bare ``host:port`` as well as a full URL; scheme, path and
        trailing slash are ignored.

        Raises:
            ValueError: If ``url`` does not contain both a host and a valid port.
        """
        # Without a scheme separator urlsplit would read "host:port" as
        # "scheme:path", so mark the string as a network location first.
        target = url if "://" in url else f"//{url}"
        parts = urllib.parse.urlsplit(target)
        if parts.hostname is None or parts.port is None:
            raise ValueError(f"Coordinator URL must contain a host and a port: {url!r}")
        return parts.hostname, parts.port

    async def start(self) -> None:
        """Start listening on the host and port given by ``self.info.url``.

        The server always speaks plain HTTP, whatever scheme the URL carries.

        Raises:
            ValueError: If the URL does not contain both a host and a port.
        """
        # Parse first so that a malformed URL fails before the runner is set up.
        address, port = self._listen_address(self.info.url)
        await self.runner.setup()
        await aiohttp.web.TCPSite(self.runner, address, port).start()

    async def stop(self) -> None:
        """Shut the server down by cleaning up the aiohttp runner."""
        await self.runner.cleanup()


async def main() -> int:
    """Serve a dummy coordinator on http://localhost:5000 for ten hours.

    Returns:
        Always 0.
    """
    info = coordinator.Info(url="http://localhost:5000", secret="1234")
    dummy_coordinator = Coordinator(info)
    try:
        await dummy_coordinator.start()
        await asyncio.sleep(36000)
    finally:
        # Also reached when start-up fails or the sleep is cancelled, so the
        # listening socket is released rather than left to interpreter teardown.
        await dummy_coordinator.stop()
    return 0


if __name__ == "__main__":
    # Hand main()'s return value to the shell as the process exit status.
    raise SystemExit(asyncio.run(main()))