summaryrefslogtreecommitdiffstats
path: root/scripts/jira/jira-bug-closer/gerrit/streamevents.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/jira/jira-bug-closer/gerrit/streamevents.py')
-rw-r--r--scripts/jira/jira-bug-closer/gerrit/streamevents.py156
1 files changed, 156 insertions, 0 deletions
diff --git a/scripts/jira/jira-bug-closer/gerrit/streamevents.py b/scripts/jira/jira-bug-closer/gerrit/streamevents.py
new file mode 100644
index 00000000..174cac80
--- /dev/null
+++ b/scripts/jira/jira-bug-closer/gerrit/streamevents.py
@@ -0,0 +1,156 @@
+#!/usr/bin/env python3
+#############################################################################
+##
+## Copyright (C) 2019 The Qt Company Ltd.
+## Contact: https://www.qt.io/licensing/
+##
+## This file is part of the Quality Assurance module of the Qt Toolkit.
+##
+## $QT_BEGIN_LICENSE:GPL-EXCEPT$
+## Commercial License Usage
+## Licensees holding valid commercial Qt licenses may use this file in
+## accordance with the commercial license agreement provided with the
+## Software or, alternatively, in accordance with the terms contained in
+## a written agreement between you and The Qt Company. For licensing terms
+## and conditions see https://www.qt.io/terms-conditions. For further
+## information use the contact form at https://www.qt.io/contact-us.
+##
+## GNU General Public License Usage
+## Alternatively, this file may be used under the terms of the GNU
+## General Public License version 3 as published by the Free Software
+## Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT
+## included in the packaging of this file. Please review the following
+## information to ensure the GNU General Public License requirements will
+## be met: https://www.gnu.org/licenses/gpl-3.0.html.
+##
+## $QT_END_LICENSE$
+##
+#############################################################################
+
+import asyncio
+import asyncssh
+import os
+from typing import Any, Callable, Coroutine, List
+from logger import logger
+log = logger("gerrit_stream")
+
+codereview = "codereview.qt-project.org"
+port = 29418
+user = 'qt_ci_bot'
+file_path = os.path.dirname(os.path.abspath(__file__))
+cibot_key_file = os.path.abspath(os.path.join(file_path, '../jira_gerrit_bot_id_rsa'))
+loop = asyncio.get_event_loop()
+
+
+class GerritSshClientSession(asyncssh.SSHClientSession): # type: ignore
+ def __init__(self) -> None:
+ self._buffer: str = ""
+ self._callback: Callable[[str], Coroutine[Any, Any, None]]
+
+ def setDataCallback(self, callback: Callable[[str], Coroutine[Any, Any, None]]) -> None:
+ self._callback = callback
+
+ def data_received(self, data: str, datatype: Any) -> None:
+ # Very long messages can be split in two, so we read into a buffer until we get
+ # a newline '\n' (which is the common case).
+ log.debug("Data received: '%s'", data)
+ self._buffer += data
+ if self._buffer.endswith('\n'):
+ if self._callback:
+ loop.create_task(self._callback(self._buffer))
+ self._buffer = ""
+
+ def eof_received(self) -> None:
+ log.warning('EOF received.')
+
+ def connection_lost(self, exc: Exception) -> None:
+ if exc:
+ log.error('SSH session error: "%s"', str(exc))
+ else:
+ log.warning('Connection lost (no exception).')
+
+
+class GerritSshClient(asyncssh.SSHClient): # type: ignore
+ def connection_made(self, conn: asyncssh.SSHClientConnection) -> None:
+ log.info('Connection made to %s.' % conn.get_extra_info('peername')[0])
+
+ def auth_completed(self) -> None:
+ log.info('Authentication successful.')
+
+ def connection_lost(self, exc: Exception) -> None:
+ log.warning('connection_lost %s', str(exc))
+
+
+class GerritStreamEvents():
+ def __init__(self) -> None:
+ self._session = None
+ self._connection = None
+ self._client = None
+ self._connection_lock = asyncio.Lock()
+
+ async def _create_connection(self) -> None:
+ async with self._connection_lock:
+ if self._connection:
+ return
+ conn, client = await asyncssh.create_connection(
+ GerritSshClient, host=codereview, port=port, username=user,
+ client_keys=[cibot_key_file])
+ log.info("Gerrit SSH client connected.")
+ self._connection = conn
+ self._client = client
+
+ def setDataCallback(self, callback: Callable[[str], Coroutine[Any, Any, None]]) -> None:
+ self._callback = callback
+ if self._session:
+ self._session.setDataCallback(callback)
+
+ async def list_all_projects(self) -> List[str]:
+ while True:
+ connection_attempts = 0
+ try:
+ await self._create_connection()
+ connection_attempts = 0
+ gerrit_process = await self._connection.run('gerrit ls-projects', check=True) # type: ignore
+ if gerrit_process.exit_status == 0:
+ projects = [project for project in gerrit_process.stdout.splitlines() if not project.startswith('{graveyard}')]
+ return projects
+ else:
+ raise Exception("failed to list gerrit projects")
+ except Exception:
+ self._connection = None
+ log.exception("Error listing gerrit projects:", exc_info=True)
+ connection_attempts += 1
+ await asyncio.sleep(connection_attempts * 2)
+
+ async def _run_client(self) -> None:
+ async with self._connection: # type: ignore
+ chan, session = await self._connection.create_session(GerritSshClientSession, 'gerrit stream-events') # type: ignore
+ self._session = session
+ if self._callback:
+ session.setDataCallback(self._callback)
+
+ # at the moment there seems to be no way
+ # for asyncssh to keep the connection alive
+ # so just manually do what openssh does.
+ while True:
+ channel_closed = chan.wait_closed()
+ done, _ = await asyncio.wait((asyncio.sleep(100), channel_closed), return_when=asyncio.FIRST_COMPLETED)
+ if channel_closed in done:
+ break
+ chan.write('keepalive@openssh.com')
+ log.debug("Sent SSH keep alive.")
+
+ log.warning("Session done.")
+
+ async def run(self) -> None:
+ connection_attempts = 0
+ while True:
+ try:
+ await self._create_connection()
+ connection_attempts = 0
+ await self._run_client()
+ except Exception:
+ log.exception('SSH connection failed.')
+ self._connection = None
+ connection_attempts += 1
+ await asyncio.sleep(connection_attempts * 2)