#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#############################################################################
#
# Copyright (C) 2022 The Qt Company Ltd.
# Contact: https://www.qt.io/licensing/
#
# This file is part of the release tools of the Qt Toolkit.
#
# $QT_BEGIN_LICENSE:GPL-EXCEPT$
# Commercial License Usage
# Licensees holding valid commercial Qt licenses may use this file in
# accordance with the commercial license agreement provided with the
# Software or, alternatively, in accordance with the terms contained in
# a written agreement between you and The Qt Company. For licensing terms
# and conditions see https://www.qt.io/terms-conditions. For further
# information use the contact form at https://www.qt.io/contact-us.
#
# GNU General Public License Usage
# Alternatively, this file may be used under the terms of the GNU
# General Public License version 3 as published by the Free Software
# Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT
# included in the packaging of this file. Please review the following
# information to ensure the GNU General Public License requirements will
# be met: https://www.gnu.org/licenses/gpl-3.0.html.
#
# $QT_END_LICENSE$
#
#############################################################################
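
"""Tool to migrate online repositories from split metadata to unified metadata.

The script scans a directory tree for Qt Installer Framework online
repositories (directories containing an Updates.xml), converts their metadata
with 'repogen --unite-metadata' into a pending repository, and then swaps each
pending repository into place while keeping a timestamped backup of the
original. A conversion session can later be reverted from those backups.

Example invocation (paths and URL are illustrative):

    python3 release_repo_meta_update.py \
        --search-path /srv/online_repositories \
        --ifw-tools http://example.com/ifw_tools.7z \
        --command convert
"""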

import argparse
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path
from time import time
from typing import Dict, List, Tuple

from bldinstallercommon import locate_path
from installer_utils import download_archive, extract_archive, is_valid_url_path
from logging_util import init_logger
from runner import run_cmd

if sys.version_info < (3, 7):
    import asyncio_backport as asyncio
else:
    import asyncio

log = init_logger(__name__, debug_mode=False)
session_timestamp = datetime.fromtimestamp(time()).strftime('%Y-%m-%d--%H:%M:%S')
CONVERT_SUFFIX = "____unified_metadata_update"
BACKUP_SUFFIX = "____split_metadata_backup-"
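
# The directory naming scheme used during a conversion (paths illustrative):
#   source repo:    <repo>
#   pending repo:   <repo>____unified_metadata_update
#   backup repo:    <repo>____split_metadata_backup-<session_timestamp>
#   reverted repo:  <repo>____REVERTED  (created by the "revert" command)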


class IfwRepoUpdateError(Exception):
    pass


async def fetch_repogen(ifw_tools_url: str) -> str:
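    """Download and extract the IFW tools archive and locate the repogen tool.

    The archive is fetched and extracted into ./ifw_tools only if that
    directory does not exist yet, so repeated calls reuse the cached tools.

    Args:
        ifw_tools_url: URL of an archive containing repogen(.exe)

    Returns:
        Path to the repogen executable found under ./ifw_tools
    """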
    assert is_valid_url_path(ifw_tools_url)
    log.info("Preparing ifw tools: %s", ifw_tools_url)
    # fetch the tool first
    current_dir = Path.cwd()
    ifw_tools_dir = os.path.join(current_dir, "ifw_tools")
    if not os.path.isdir(ifw_tools_dir):
        Path(ifw_tools_dir).mkdir(parents=True)
        dest_file = download_archive(ifw_tools_url, ifw_tools_dir)
        await extract_archive(dest_file, ifw_tools_dir)
    tool_name = "repogen"
    return locate_path(ifw_tools_dir, [tool_name])


def check_repos_which_can_be_updated(repositories_to_migrate: List[str]) -> Tuple[Dict[str, str], Dict[str, str]]:
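    """Split the given repositories into those that can be converted and those
    that already have a pending (converted) repository next to them.

    Backup repositories and repositories that are themselves pending are skipped.

    Args:
        repositories_to_migrate: Paths of the source repositories

    Returns:
        Tuple of (updatable repos, repos with an existing pending repo), both
        mapping the source repo path to its pending repo path
    """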
    log.info("Analysing repositories..")
    existing_pending_repos = {}  # type: Dict[str, str]
    updatable_repos = {}  # type: Dict[str, str]
    for repo in repositories_to_migrate:
        if repo.endswith(CONVERT_SUFFIX):
            log.info("Skipping '%s' as it already is the pending repository", repo)
            continue
        if BACKUP_SUFFIX in repo:
            log.info("Skipping backup repo: %s", repo)
            continue
        migrated_repo = repo + CONVERT_SUFFIX
        if os.path.exists(migrated_repo):
            log.warning("There already exists pending repository '%s' for the source repo: %s", migrated_repo, repo)
            existing_pending_repos[repo] = migrated_repo
        else:
            updatable_repos[repo] = migrated_repo
    return (updatable_repos, existing_pending_repos)


async def create_converted_repositories(repogen: str, repositories_to_migrate: List[str], dry_run: bool = False) -> Tuple[Dict[str, str], Dict[str, str]]:
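    """Run 'repogen --unite-metadata' for each updatable repository, writing
    the result into a pending "<repo>____unified_metadata_update" directory.

    Raises IfwRepoUpdateError if any repository already has a pending repo.

    Args:
        repogen: Path to the repogen executable
        repositories_to_migrate: Paths of the source repositories
        dry_run: If True, only echo the repogen commands instead of running them

    Returns:
        Tuple of (successful conversions, failed conversions), both mapping
        the source repo path to its pending repo path
    """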
    # first check that pending repository does not already exist per given repository
    log.info("Starting to create new converted repositories: %s", len(repositories_to_migrate))
    updatable_repos, existing_pending_repos = check_repos_which_can_be_updated(repositories_to_migrate)
    if existing_pending_repos:
        log.warning("There are already existing pending repositories which should be completed first:")
        for repo in existing_pending_repos:
            log.warning("  %s", repo)
        raise IfwRepoUpdateError("Repositories found in pending state, complete those first!")

    # convert all repositories to combined metadata version
    successful_conversions = {}  # type: Dict[str, str]
    failed_conversions = {}  # type: Dict[str, str]
    for repo in updatable_repos:
        repo_output_path = repo + CONVERT_SUFFIX  # the "pending" repository
        cmd = [repogen, "--repository", repo, "--unite-metadata", repo_output_path]
        if dry_run:
            cmd.insert(0, "echo")
        try:
            # perform the update
            run_cmd(cmd=cmd, timeout=60 * 15)
            successful_conversions[repo] = repo_output_path
        except Exception as error:
            log.error("Failed to update metadata for repository: %s - reason: %s", repo, str(error))
            failed_conversions[repo] = repo_output_path

    return (successful_conversions, failed_conversions)


def swap_repositories(repositories_to_swap: Dict[str, str]) -> Tuple[Dict[str, Tuple[str, str, str]], Dict[str, Tuple[str, str, str]]]:
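    """Take the converted (pending) repositories into use.

    All repositories are validated first; IfwRepoUpdateError is raised if any
    problems are found. For each repository the data subdirectories are copied
    from the original repo into the converted one, the original repo is renamed
    as a timestamped backup, and the converted repo is renamed to the original
    name.

    Args:
        repositories_to_swap: Mapping of source repo path to converted repo path

    Returns:
        Tuple of (successful swaps, failed swaps), both mapping the source repo
        path to (converted repo, backup repo, error message)
    """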
    log.info("Starting to swap converted repositories with destination directories: %s", len(repositories_to_swap))
    errors = []  # type: List[Tuple[str, str]]
    for orig_repo, converted_repo in repositories_to_swap.items():
        backup_repo_name = orig_repo + BACKUP_SUFFIX + session_timestamp
        if os.path.exists(backup_repo_name):
            # this really should not happen as the backup dir name contains a timestamp, but check anyway
            errors.append((orig_repo, f"Destination backup directory already exists: {backup_repo_name}"))
        if not os.path.exists(orig_repo):
            errors.append((orig_repo, f"Source repository does not exist: {orig_repo}"))
        if not os.path.exists(converted_repo):
            errors.append((orig_repo, f"Converted repository does not exist: {converted_repo}"))

    if errors:
        log.error("Unable to proceed to swap repositories due to following found issues:")
        for orig_repo, msg in errors:
            log.error("  [%s]: %s", orig_repo, msg)
        raise IfwRepoUpdateError("Failed to swap repositories!")

    operations_ok = {}  # type: Dict[str, Tuple[str, str, str]]
    operations_nok = {}  # type: Dict[str, Tuple[str, str, str]]
    for orig_repo, converted_repo in repositories_to_swap.items():
        log.info("-> swapping: %s", orig_repo)
        backup_repo_name = orig_repo + BACKUP_SUFFIX + session_timestamp  # unique backup dir name
        try:
            # We only want the unified metadata .7z and the updated Updates.xml at the top level,
            # and the data portion must stay identical, so:
            # remove all subdirs from the converted repo
            for item in os.listdir(converted_repo):
                if os.path.isdir(os.path.join(converted_repo, item)):
                    shutil.rmtree(os.path.join(converted_repo, item))
            # Copy subdirs & content (i.e. the data portion) from the orig repo to the converted repo
            for item in os.listdir(orig_repo):
                if os.path.isdir(os.path.join(orig_repo, item)):
                    shutil.copytree(os.path.join(orig_repo, item), os.path.join(converted_repo, item))

            # rename original repo as backup
            os.rename(orig_repo, backup_repo_name)
            # rename converted repo as the existing one
            os.rename(converted_repo, orig_repo)
            operations_ok[orig_repo] = (converted_repo, backup_repo_name, "")
        except Exception as error:
            log.error("%s", str(error))
            operations_nok[orig_repo] = (converted_repo, backup_repo_name, str(error))
    return (operations_ok, operations_nok)


def scan_repositories(search_path: str) -> Tuple[List[str], List[str], List[str], List[str]]:
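    """Scan the given path recursively for online repositories (directories
    containing an Updates.xml) and classify them by conversion status.

    Backup repositories are skipped.

    Args:
        search_path: Root directory to scan

    Returns:
        Tuple of (converted repos, pending repos, unconverted repos, broken repos)
    """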
    assert os.path.isdir(search_path), f"Not a valid directory: {search_path}"
    log.info("Scan repository status from: %s", search_path)

    def check_unified_meta_exists(path: Path) -> bool:
        return any(path.glob("*_meta.7z"))

    repos = [path.resolve().parent for path in Path(search_path).rglob('Updates.xml')]
    done_repos = []  # type: List[str]
    pending_repos = []  # type: List[str]
    unconverted_repos = []  # type: List[str]
    broken_repos = []  # type: List[str]
    for repo in repos:
        if BACKUP_SUFFIX in repo.as_posix():
            log.info("Skipping backup repo: %s", repo.as_posix())
            continue
        if repo.as_posix().endswith(CONVERT_SUFFIX):
            if not check_unified_meta_exists(repo):
                # this is a broken pending repo
                log.error("Pending repository '%s' is missing '*_meta.7z'", repo.as_posix())
                broken_repos.append(repo.as_posix())
                continue
            # expected destination repo
            expected_destination_repo = Path(repo.as_posix()[:-len(CONVERT_SUFFIX)])
            if not expected_destination_repo.exists():
                # this is a broken pending repo
                log.error("Pending repository '%s' is missing its matching destination directory: %s", repo.as_posix(), expected_destination_repo.as_posix())
                broken_repos.append(repo.as_posix())
                continue
            pending_repos.append(repo.as_posix())
        else:
            if check_unified_meta_exists(repo):
                done_repos.append(repo.as_posix())
            else:
                unconverted_repos.append(repo.as_posix())

    return (done_repos, pending_repos, unconverted_repos, broken_repos)


def convert_repos(search_path: str, ifw_tools_url: str) -> None:
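    """Convert all unconverted repositories found under search_path to unified
    metadata and swap the converted repositories into place, logging the
    backups created as well as any failed conversions or swaps.

    Args:
        search_path: Root directory to scan for online repositories
        ifw_tools_url: URL of an archive containing repogen(.exe)
    """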
    repogen = asyncio.run(fetch_repogen(ifw_tools_url))
    log.info("Using repogen from: %s", repogen)
    to_convert = scan_repositories(search_path)[2]
    converted_repos, failed_repos = asyncio.run(create_converted_repositories(repogen, to_convert))
    operations_ok, operations_nok = swap_repositories(converted_repos)
    for orig_repo, items in operations_ok.items():
        backup_repo_name = items[1]
        log.info("Converted repo: %s", orig_repo)
        log.info("  original backup: %s", backup_repo_name)
    if failed_repos:
        log.error("Some of the conversions failed -> aborting! Original repo(s) are in place. Cleanup tmp converted repo dirs!:")
        for repo, expected_output_repo in failed_repos.items():
            log.error("  '%s' -> '%s'", repo, expected_output_repo)
    for orig_repo, items in operations_nok.items():
        backup_repo_name = items[1]
        log.error("Failed swaps: %s", orig_repo)
        log.warning("  original backup: %s", backup_repo_name)


def revert_repos(search_path: str, ifw_tools_url: str, time_stamp: str, dry_run: bool) -> None:
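    """Restore repositories from the split metadata backups created during the
    conversion session identified by the given timestamp.

    The current (converted) repository is renamed with a "____REVERTED" suffix
    and the matching backup is renamed back to the original name.

    Args:
        search_path: Root directory to scan for online repositories
        ifw_tools_url: URL of an archive containing repogen(.exe)
        time_stamp: Timestamp identifying the backup directories to restore
        dry_run: If True, only log the planned rename operations
    """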
    repogen = asyncio.run(fetch_repogen(ifw_tools_url))
    log.info("Using repogen from: %s", repogen)
    converted_repos = scan_repositories(search_path)[0]

    revert_actions: Dict[str, str] = {}
    for converted_repo in converted_repos:
        expected_backup_repo = converted_repo + BACKUP_SUFFIX + time_stamp
        if not os.path.isdir(expected_backup_repo):
            log.warning("Cannot revert repository as the original backup repo does not exist: %s", expected_backup_repo)
            continue
        revert_actions[converted_repo] = expected_backup_repo

    for converted, backup in revert_actions.items():
        reverted_backup_repo_name = converted + "____REVERTED"
        log.info("Reverting: '%s' -> '%s'", backup, converted)
        if dry_run:
            continue
        try:
            os.rename(converted, reverted_backup_repo_name)
            os.rename(backup, converted)
        except Exception as error:
            log.error("Failed to revert: '%s' -> '%s' - reason: %s", backup, converted, str(error))


def scan_repos(search_path: str) -> None:
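    """Scan search_path and log the conversion status of all found repositories.

    Args:
        search_path: Root directory to scan for online repositories
    """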
    done_repos, pending_repos, unconverted_repos, broken_repos = scan_repositories(search_path)
    log.info("")
    log.info("--- Scan results ---")
    log.info("")
    log.info("Repositories already containing combined metadata:")
    for repo in sorted(done_repos):
        log.info("%s", repo)
    log.info("")
    log.info("Pending repositories containing combined metadata updates:")
    for repo in sorted(pending_repos):
        log.info("%s", repo)
    log.info("")
    log.info("Repositories that do not contain combined metadata (needs update):")
    for repo in sorted(unconverted_repos):
        log.info("%s", repo)
    log.info("")
    log.info("Broken repositories:")
    for repo in sorted(broken_repos):
        log.error("%s", repo)


def main() -> None:
    """Main"""
    parser = argparse.ArgumentParser(prog="Script to update split metadata to unified metadata in online repositories.")
    parser.add_argument("--search-path", dest="search_path", type=str, required=True, help="Path to scan for online repositories")
    parser.add_argument("--ifw-tools", dest="ifw_tools_url", type=str, required=True, help="Archive containing repogen(.exe)")
    parser.add_argument("--command", dest="command", type=str, choices=["scan", "convert", "revert"], required=True, help="")
    parser.add_argument("--revert-timestamp", dest="revert_timestamp", type=str, default="", help="Which backup to use")
    parser.add_argument("--dry-run", dest="dry_run", action='store_true')

    args = parser.parse_args(sys.argv[1:])
    if args.command == "scan":
        scan_repos(args.search_path)
    elif args.command == "convert":
        convert_repos(args.search_path, args.ifw_tools_url)
    elif args.command == "revert":
        revert_repos(args.search_path, args.ifw_tools_url, args.revert_timestamp, args.dry_run)
    else:
        log.error("Invalid command given: %s", args.command)


if __name__ == "__main__":
    main()