1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# Copyright (C) 2023 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
import asyncio
import datetime
import logging
from typing import Any, Dict, List, Optional
import socketio # type: ignore
class Info:
"""
Information about the work coordinator.
It monitors integrations and sends work to the runner.
"""
def __init__(self, url: str, secret: str) -> None:
self.url = url
self.secret = secret
class WorkItem:
"""
Item of work, containing a Git revision.
The runner should check out the revision, run benchmarks, and upload results.
"""
timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"
def __init__(
self,
integration_id: int,
integration_url: Optional[str],
integration_timestamp: datetime.datetime,
integration_data: List[Dict[str, Any]],
branch: str,
revision: str,
) -> None:
self.integration_id = integration_id
self.integration_url = integration_url
self.integration_timestamp = integration_timestamp
self.integration_data = integration_data
self.branch = branch
self.revision = revision
def to_dictionary(self) -> Dict[str, Any]:
timestamp = self.integration_timestamp.strftime(WorkItem.timestamp_format)
return {
"integrationId": self.integration_id,
"integrationURL": self.integration_url,
"integrationTimestamp": timestamp,
"integrationData": self.integration_data,
"branch": self.branch,
"sha": self.revision,
}
@staticmethod
def from_dictionary(dictionary: Dict[str, Any]) -> "WorkItem":
timestamp = datetime.datetime.strptime(
dictionary["integrationTimestamp"], WorkItem.timestamp_format
)
return WorkItem(
integration_id=dictionary["integrationId"],
integration_url=dictionary["integrationURL"],
integration_timestamp=timestamp,
integration_data=dictionary["integrationData"],
branch=dictionary["branch"],
revision=dictionary["sha"],
)
class Connection:
"""
Connection to the work coordinator. The runner uses it to fetch work and send status updates.
"""
client_type = "agent"
fetch_delay = 30
def __init__(
self, coordinator_info: Info, hostname: str, logger: Optional[logging.Logger]
) -> None:
self.coordinator_info = coordinator_info
self.hostname = hostname
# Used to send events to and receive events from the coordinator.
self.client = socketio.AsyncClient(
handle_sigint=False,
logger=False if logger is None else logger.getChild("socketio"),
engineio_logger=False if logger is None else logger.getChild("engineio"),
)
self.client.on("sendWork")(self._handle_send_work_event)
self.client.on("connect")(self._handle_connect_event)
# Used to pass work items from the "sendWork" callback threads to the main thread.
self.work_item: Optional[Dict[str, Any]] = None
self.work_event_received = asyncio.Condition()
# Used to send status updates after establishing a connection.
self.status: Dict[str, Any] = {"status": "idle"}
self.status_lock = asyncio.Lock()
async def __aenter__(self) -> "Connection":
auth = {
"clientType": Connection.client_type,
"hostname": self.hostname,
"secret": self.coordinator_info.secret,
}
await self.client.connect(url=self.coordinator_info.url, auth=auth)
return self
async def __aexit__(self, exception_type: Any, exception_value: Any, traceback: Any) -> bool:
await self.client.disconnect()
return False
async def send_status(
self, status: str, message: str, work_item: WorkItem, logger: Optional[logging.Logger]
) -> None:
"""
Inform the coordinator about the progress of a work item.
"""
dictionary = {
"status": status,
"detailMessage": message,
"updateTimestamp": datetime.datetime.now().strftime(WorkItem.timestamp_format),
}
dictionary.update(work_item.to_dictionary())
async with self.status_lock:
try:
await self.client.emit(event="statusUpdate", data=dictionary)
except socketio.exceptions.BadNamespaceError as error:
if logger is not None:
logger.warning(f"Could not send status: {error}") # Will be sent on reconnect.
finally:
self.status = dictionary
async def fetch_work(self, use_query_event: bool, logger: Optional[logging.Logger]) -> WorkItem:
async with self.work_event_received:
self.work_item = None
while self.work_item is None:
# Send an event.
try:
await self._send_fetch_work_event(use_query_event)
except socketio.exceptions.BadNamespaceError as error:
if logger is not None:
logger.warning(f"Could not fetch work: {error}")
await asyncio.sleep(Connection.fetch_delay)
continue
# Wait for the response.
try:
await asyncio.wait_for(self.work_event_received.wait(), Connection.fetch_delay)
except asyncio.TimeoutError:
if logger is not None:
logger.debug("Waiting for work")
if logger is not None:
logger.debug(
"\n\t".join(
["Received a work object with these values:"]
+ [f"{key}: {value}" for key, value in self.work_item.items()]
)
)
return WorkItem.from_dictionary(self.work_item)
async def _send_fetch_work_event(self, use_query_event: bool) -> None:
await self.client.emit("queryWork" if use_query_event else "fetchWork")
async def _handle_connect_event(self) -> None:
async with self.status_lock:
await self.client.emit(event="statusUpdate", data=self.status)
async def _handle_send_work_event(self, data: Optional[Dict[str, Any]]) -> None:
async with self.work_event_received:
self.work_item = data
self.work_event_received.notify()
|