#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#############################################################################
#
# Copyright (C) 2023 The Qt Company Ltd.
# Contact: https://www.qt.io/licensing/
#
# This file is part of the release tools of the Qt Toolkit.
#
# $QT_BEGIN_LICENSE:GPL-EXCEPT$
# Commercial License Usage
# Licensees holding valid commercial Qt licenses may use this file in
# accordance with the commercial license agreement provided with the
# Software or, alternatively, in accordance with the terms contained in
# a written agreement between you and The Qt Company. For licensing terms
# and conditions see https://www.qt.io/terms-conditions. For further
# information use the contact form at https://www.qt.io/contact-us.
#
# GNU General Public License Usage
# Alternatively, this file may be used under the terms of the GNU
# General Public License version 3 as published by the Free Software
# Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT
# included in the packaging of this file. Please review the following
# information to ensure the GNU General Public License requirements will
# be met: https://www.gnu.org/licenses/gpl-3.0.html.
#
# $QT_END_LICENSE$
#
#############################################################################
import os
import unittest
from configparser import ConfigParser
from pathlib import Path
from shutil import rmtree
from typing import List
from ddt import ddt # type: ignore
from temppathlib import TemporaryDirectory
from installer_utils import PackagingError, ch_dir
from read_remote_config import get_pkg_value
from release_repo_updater import (
append_to_task_filters,
build_online_repositories,
check_repogen_output,
create_remote_repository_backup,
ensure_ext_repo_paths,
format_task_filters,
has_connection_error,
parse_ext,
remote_file_exists,
reset_new_remote_repository,
string_to_bool,
upload_ifw_to_remote,
upload_pending_repository_content,
)
from release_task_reader import parse_data
from tests.testhelpers import (
asyncio_test,
asyncio_test_parallel_data,
is_internal_file_server_reachable,
)
def _write_dummy_file(path: str) -> None:
Path(path).parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w+', encoding="utf-8") as handle:
handle.write("\n")
def _write_package_xml(path: str, version: str, release_date: str) -> None:
Path(path).parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w+', encoding="utf-8") as handle:
handle.write("\n")
handle.write("\n")
handle.write(" qt.foo.bar1\n")
handle.write(" Test\n")
handle.write(" Test\n")
handle.write(f" {version}\n")
handle.write(f" {release_date}\n")
handle.write("\n")
def _write_updates_xml(path: str, version: str, release_date: str) -> None:
Path(path).parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w+', encoding="utf-8") as handle:
handle.write("\n")
handle.write(" {AnyApplication}\n")
handle.write(" 1.0.0\n")
handle.write(" false\n")
handle.write(" \n")
handle.write(" qt.foo.bar1\n")
handle.write(" Foo bar\n")
handle.write(" Foo and bar\n")
handle.write(f" {version}\n")
handle.write(f" {release_date}\n")
handle.write(" \n")
handle.write(" \n")
handle.write(" c1559cbb0f0983909f7229dc79dfdf7eab46cd52\n")
handle.write(" \n")
handle.write("\n")
async def _get_repogen() -> str:
pkgsrv = get_pkg_value("PACKAGE_STORAGE_SERVER_PATH_HTTP")
ifw_tools = (
f"{pkgsrv}/archive/ifw/enterprise/unifiedqt/4.3.0/tqtc-installer-framework-Linux-RHEL_7_6-"
"GCC-Linux-RHEL_7_6-X86_64.7z"
)
server = "127.0.0.1"
with TemporaryDirectory() as temp_dir:
with ch_dir(str(temp_dir.path)):
return await upload_ifw_to_remote(ifw_tools, server)
@ddt
class TestReleaseRepoUpdater(unittest.TestCase):
server = None # type: str
@classmethod
def setUpClass(cls) -> None:
cls.server = "127.0.0.1"
@asyncio_test
async def test_remote_file_exists(self) -> None:
self.assertTrue(remote_file_exists(self.server, os.path.abspath(__file__)))
self.assertFalse(remote_file_exists(self.server, "/some/bogus/directory/foo.txt"))
@unittest.skipUnless(is_internal_file_server_reachable(), "Skipping because file server is not accessible")
@asyncio_test
async def test_upload_ifw_to_remote(self) -> None:
repogen = ""
try:
repogen = await _get_repogen()
finally:
repogen_path = Path.home() / repogen
self.assertTrue(repogen_path.is_file())
rmtree(repogen_path.parent)
@asyncio_test
async def test_upload_pending_repository_content(self) -> None:
with TemporaryDirectory(prefix="_repo_tmp_") as tmp_dir:
tmp_base_dir = tmp_dir.path
source_repo = tmp_base_dir / "repository"
destination_repo = tmp_base_dir / "destination_online_repository"
_write_dummy_file(str(source_repo / "qt.foo.bar1" / "meta" / "package.xml"))
_write_dummy_file(str(source_repo / "qt.foo.bar2" / "meta" / "package.xml"))
_write_dummy_file(str(source_repo / "Updates.xml"))
upload_pending_repository_content(self.server, str(source_repo), str(destination_repo))
self.assertListEqual(
sorted([x.relative_to(source_repo) for x in source_repo.iterdir()]),
sorted([x.relative_to(destination_repo) for x in destination_repo.iterdir()])
)
@asyncio_test
async def test_reset_new_remote_repository(self) -> None:
with TemporaryDirectory(prefix="_repo_tmp_") as tmp_dir:
tmp_base_dir = tmp_dir.path
remote_source_repo_path = tmp_base_dir / "repository"
remote_target_repo_path = tmp_base_dir / "destination_online_repository"
_write_dummy_file(str(remote_source_repo_path / "qt.foo.bar1" / "meta" / "package.xml"))
_write_dummy_file(str(remote_source_repo_path / "qt.foo.bar2" / "meta" / "package.xml"))
_write_dummy_file(str(remote_source_repo_path / "Updates.xml"))
reset_new_remote_repository(self.server, str(remote_source_repo_path), str(remote_target_repo_path))
self.assertTrue((remote_target_repo_path / "qt.foo.bar1" / "meta" / "package.xml").is_file())
self.assertTrue((remote_target_repo_path / "qt.foo.bar2" / "meta" / "package.xml").is_file())
self.assertTrue((remote_target_repo_path / "Updates.xml").is_file())
# existing repository should be automatically be moved as backup
reset_new_remote_repository(self.server, str(remote_source_repo_path), str(remote_target_repo_path))
backup_name = remote_target_repo_path.name + "____snapshot_backup"
self.assertTrue((remote_target_repo_path.with_name(backup_name)).exists())
@asyncio_test
async def test_create_remote_repository_backup(self) -> None:
with TemporaryDirectory(prefix="_repo_tmp_") as tmp_dir:
tmp_base_dir = tmp_dir.path
remote_source_repo_path = tmp_base_dir / "repository"
_write_dummy_file(str(remote_source_repo_path / "qt.foo.bar1" / "meta" / "package.xml"))
_write_dummy_file(str(remote_source_repo_path / "qt.foo.bar2" / "meta" / "package.xml"))
_write_dummy_file(str(remote_source_repo_path / "Updates.xml"))
remote_repo_backup_path = Path(
create_remote_repository_backup(
self.server, str(remote_source_repo_path)
)
)
self.assertFalse(remote_source_repo_path.exists())
self.assertListEqual(
sorted(["Updates.xml", "qt.foo.bar1", "qt.foo.bar2"]),
sorted([x.name for x in remote_repo_backup_path.iterdir()])
)
@asyncio_test_parallel_data( # type: ignore
(True, True),
(False, False),
("yes", True),
("1", True),
("y", True),
("false", False),
("n", False),
("0", False),
("no", False),
)
async def test_string_to_bool(self, value: str, expected_result: bool) -> None:
self.assertEqual(string_to_bool(value), expected_result)
@asyncio_test
async def test_build_online_repositories_dryrun(self) -> None:
sample_config = """
[task.repository.linux.x86_64.repo1]
config_file: foobar_config_file
repo_path: foo/bar/path_1
"""
config = ConfigParser()
config.read_string(sample_config)
# parse all tasks i.e. no filters
tasks = parse_data(config, task_filters=[])
await build_online_repositories(tasks=tasks, license_="opensource", installer_config_base_dir="foo", artifact_share_base_url="foo",
ifw_tools="foo", build_repositories=False)
task = tasks.pop()
self.assertTrue(task.source_online_repository_path.endswith("foo/bar/path_1/online_repository"))
@asyncio_test
async def test_ensure_ext_repo_paths(self) -> None:
with TemporaryDirectory(prefix="_repo_tmp_") as tmp_dir:
expected_repo = tmp_dir.path / "some" / "test" / "path"
await ensure_ext_repo_paths(self.server, self.server, str(expected_repo))
self.assertTrue(expected_repo.is_dir())
@asyncio_test_parallel_data( # type: ignore
("user@server.com:/foo/bar"),
("server.com:/foo/bar"),
("user@server.com:/"), unpack=False
)
async def test_parse_ext_valid(self, ext: str) -> None:
parse_ext(ext)
@asyncio_test_parallel_data( # type: ignore
("user@server.com"),
("server.com:/foo/bar:"),
("user@server.com:some/path"), unpack=False
)
async def test_parse_ext_invalid(self, ext: str) -> None:
with self.assertRaises(PackagingError):
parse_ext(ext)
@asyncio_test_parallel_data( # type: ignore
("Error: Repository parameter missing argument"),
("Invalid content in ..."),
("Repository target directory /foobar/bar/foo already exists."), unpack=False
)
async def test_check_invalid_repogen_output(self, repogen_output: str) -> None:
with self.assertRaises(PackagingError):
check_repogen_output(repogen_output)
@asyncio_test_parallel_data( # type: ignore
("Update component a.b.c.d"),
("Cannot find new components to update"), unpack=False
)
async def test_check_valid_repogen_output(self, repogen_output: str) -> None:
# should not throw exception
check_repogen_output(repogen_output)
@asyncio_test_parallel_data( # type: ignore
([], ["repository"]),
(["linux,common"], ["repository,linux,common"]),
(["", "linux,common"], ["repository", "repository,linux,common"])
)
async def test_append_to_task_filters(self, task_filters: List[str], exp_result: bool) -> None:
self.assertEqual(append_to_task_filters(task_filters, "repository"), exp_result)
@asyncio_test_parallel_data( # type: ignore
(["task.repository.linux.x64.feature1"], ["task,repository,linux,x64,feature1"]),
(["task.repository.linux.x64.feature1", "windows.x64,feature2"],
["task,repository,linux,x64,feature1", "windows,x64,feature2"]),
(["offline,linux.x64,feature1"], ["offline,linux,x64,feature1"]),
(["linux"], ["linux"]),
([""], [""])
)
async def test_format_task_filters(self, task_filters: List[str], exp_result: bool) -> None:
self.assertEqual(format_task_filters(task_filters), exp_result)
@asyncio_test_parallel_data( # type: ignore
("qtsdkrepository/windows_x86/desktop/tools_maintenance/log-s3-2020-12-03--10:18:11-xml.t"
"xt:fatal error: Could not connect to the endpoint URL: 'https://qt-cdn.s3.eu-west-1.ama"
"zonaws.com/?list-type=2&prefix=qtsdkrepository%2Fwindows_x86%2Fdesktop%2Ftools_maintena"
"nce%2F&encoding-type=url'qtsdkrepository/", True),
("qtsdkrepository/windows_x86/desktop/tools_maintenance/log-s3-2020-12-03--10:18:11-xml.t"
"xt:fatal error: to the endpoint URL: 'https://qt-cdn.s3.eu-west-1.amazonaws.com/?list-t"
"ype=2&prefix=qtsdkrepository%2Fwindows_x86%2Fdesktop%2Ftools_maintenance%2F&encoding-ty"
"pe=url'qtsdkrepository/", False),
("", False)
)
async def test_has_connection_error(self, output: str, expected_result: bool) -> None:
self.assertEqual(expected_result, has_connection_error(output))
if __name__ == '__main__':
unittest.main()