aboutsummaryrefslogtreecommitdiffstats
path: root/packaging-tools/read_remote_config.py
diff options
context:
space:
mode:
Diffstat (limited to 'packaging-tools/read_remote_config.py')
-rwxr-xr-xpackaging-tools/read_remote_config.py277
1 files changed, 255 insertions, 22 deletions
diff --git a/packaging-tools/read_remote_config.py b/packaging-tools/read_remote_config.py
index d4c854280..c7c987865 100755
--- a/packaging-tools/read_remote_config.py
+++ b/packaging-tools/read_remote_config.py
@@ -34,26 +34,184 @@ import os
import sys
from configparser import ConfigParser
from io import StringIO
-from typing import Any, Optional
-from urllib.request import urlopen
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from typing import Dict, Optional, Union
+import pysftp # type: ignore
+from cryptography.fernet import Fernet, InvalidToken
+from paramiko import (
+ AgentKey,
+ AuthenticationException,
+ PasswordRequiredException,
+ RSAKey,
+ SSHException,
+)
+
+from installer_utils import PackagingError
from logging_util import init_logger
log = init_logger(__name__, debug_mode=False)
-class RemotePkgConfigError(Exception):
+class RemoteConfigError(Exception):
pass
-def read_packaging_keys_config_url(url: str) -> Any:
- return urlopen(url).read().decode('utf-8').strip()
+def _get_private_key() -> bytes:
+ """
+ Read encrypted private key file from default path
+
+ Returns:
+ Key file content in a bytes array
+
+ Raises:
+ PackagingError: When path is invalid or file cannot be found
+ """
+ try:
+ k_path = Path.home() / "sshkeys" / os.environ["ID_RSA_FILE"]
+ k_path.resolve(strict=True)
+ except KeyError as err:
+ raise PackagingError("Could not determine private key path from env") from err
+ except FileNotFoundError as err:
+ raise PackagingError(f"Failed to locate private key from path: {k_path}") from err
+ log.info("Reading the private key: %s", k_path)
+ with k_path.open("rb") as private_key:
+ return private_key.read()
+
+
+def _get_decrypt_key() -> bytes:
+ """
+ Read Fernet decryption key from default path
+
+ Returns:
+ Key file content in a bytes array
+
+ Raises:
+ PackagingError: When path is invalid or file cannot be found
+ """
+ try:
+ k_path = Path(os.environ["PKG_NODE_ROOT"], os.environ["FILES_SHARE_PATH"])
+ k_path.resolve(strict=True)
+ except KeyError as err:
+ raise PackagingError("Could not determine decryption key path from env") from err
+ except FileNotFoundError as err:
+ raise PackagingError(f"Failed to locate decryption key from path: {k_path}") from err
+ log.info("Reading the pre-generated Fernet key: %s", k_path)
+ with open(k_path, "rb") as decrypt_key:
+ return decrypt_key.read()
+
+
+def _decrypt_private_key(key: bytes, decrypt_key: bytes) -> RSAKey:
+ """
+ Decrypt a Fernet encrypted key and return a RSA key object containing the decrypted key
+
+ Args:
+ key: Encrypted content to be decrypted (Fernet token)
+ decrypt_key: Key for decryption (Fernet base64-encoded 32-byte key)
+
+ Raises:
+ PackagingError: Raised on the decryption failures or if the resulting data is not valid
+ """
+ log.info("Decrypting private key using pre-generated Fernet key")
+ try:
+ fernet = Fernet(decrypt_key)
+ decrypted_key = fernet.decrypt(key)
+ except InvalidToken:
+ raise PackagingError("Failed to decrypt private key, got invalid Fernet token") from None
+ try:
+ return RSAKey(file_obj=StringIO(decrypted_key.decode(encoding="utf-8")))
+ except SSHException:
+ raise PackagingError("Failed to create RSA key object, invalid key format?") from None
+
+
+def download_remote_file_sftp(
+ remote_host: Optional[str] = os.getenv("SFTP_ADDRESS"),
+ remote_path: Optional[Path] = None,
+ local_path: Optional[Path] = None,
+ username: Optional[str] = os.getenv("SFTP_USER"),
+ private_key: Optional[Union[str, RSAKey, AgentKey]] = None
+) -> None:
+ """
+ Transfer the given remote file to a given folder via SFTP
+
+ Args:
+ remote_host: An address or a hostname for the remote server
+ remote_path: A file system path on the remote server to transfer from
+ local_path: A file system path where to save the file, by default set to current work dir
+ username: Name used for authenticating with the remote server
+ private_key: A private key object or string path to a key file when not using the default
+ Raises:
+ PackagingError: Raised on missing arguments
+ RemoteConfigError: Re-raised on SFTP errors from pysftp.Connection
+ """
+ if private_key is None: # get default RSA private key
+ private_key = _decrypt_private_key(_get_private_key(), _get_decrypt_key())
+ if not remote_host or remote_path is None:
+ raise PackagingError("Remote host address and/or source path not specified")
+ if not all((username, private_key)):
+ raise PackagingError("SSH public key authentication options not specified")
+ log.debug("Transfer '%s:%s' -> '%s'", remote_host, remote_path, local_path)
+ cnopts = pysftp.CnOpts()
+ cnopts.hostkeys = None # disable host key checking
+ try:
+ with pysftp.Connection(
+ host=remote_host,
+ username=username,
+ private_key=private_key,
+ cnopts=cnopts,
+ ) as sftp:
+ sftp.get(remotepath=remote_path.as_posix(), localpath=local_path)
+ except pysftp.ConnectionException:
+ raise RemoteConfigError("Connection to the remote server failed") from None
+ except pysftp.CredentialException:
+ raise RemoteConfigError("Problem with credentials") from None
+ except (IOError, OSError):
+ raise RemoteConfigError("File doesn't exist on the remote or unable to save it") from None
+ except PasswordRequiredException:
+ raise RemoteConfigError("Private key was not decrypted before use") from None
+ except AuthenticationException:
+ raise RemoteConfigError("Authenticating with credentials failed") from None
+ except SSHException:
+ raise RemoteConfigError("SSH2 protocol failure") from None
-def parse_packaging_keys_config(config: str) -> ConfigParser:
+
+def _read_remote_config_sftp(remote_ini_path: Path) -> str:
+ """
+ Transfer the given remote config file to a temporary dir via SFTP and return the file content
+
+ Args:
+ remote_ini_path: A file system path on the remote host to read from
+
+ Returns:
+ Remote config .ini contents
+
+ Raises:
+ RemoteConfigError: Re-raised on config download error from download_remote_file_sftp
+ """
+ with TemporaryDirectory() as temp_dir:
+ local_path = Path(temp_dir) / "config.ini"
+ try:
+ download_remote_file_sftp(remote_path=remote_ini_path, local_path=local_path)
+ except RemoteConfigError as err:
+ raise RemoteConfigError("Failed to receive remote config!") from err
+ with open(local_path, "rb") as config:
+ return config.read().decode('utf-8').strip()
+
+
+def _parse_remote_config(config: str) -> ConfigParser:
+ """
+ Parse config using ConfigParser
+
+ Args:
+ config: A string containing the .ini file content
+
+ Returns:
+ An instance of ConfigParser with the parsed config
+ """
buf = StringIO(config)
settings = ConfigParser()
-
settings.read_file(buf)
return settings
@@ -61,29 +219,104 @@ def parse_packaging_keys_config(config: str) -> ConfigParser:
def get_pkg_value(
key: str,
section: str = "packaging",
- url: Optional[str] = os.getenv("PACKAGING_KEYS_CONFIG_URL"),
+ remote_cfg_path: Optional[Path] = None
) -> str:
- if getattr(get_pkg_value, 'pkg_remote_settings', None) is None:
- if not url:
- raise RemotePkgConfigError("Remote config URL not specified")
- config = read_packaging_keys_config_url(url)
- get_pkg_value.pkg_remote_settings = parse_packaging_keys_config(config) # type: ignore
- return get_pkg_value.pkg_remote_settings.get(section, key) # type: ignore
+ """
+ Get value for section and key in remote packaging config ini (sftp)
+ Configs dict will be cached as a function attribute 'cfg_cache' for future calls
+
+ Args:
+ key: A key in the config section
+ section: A section in the config (if empty, first section is used)
+ remote_cfg_path: A file system location for the config file on the remote
+
+ Returns:
+ Value for key (and section) or empty string if it doesn't exist
+
+ Raises:
+ PackagingError: When the config path is not specified or found
+ """
+ # Use the default packaging config ini from env if not specified
+ if remote_cfg_path is None:
+ try:
+ default_config_path_env = os.environ["PACKAGING_KEYS_CONFIG_PATH"]
+ except KeyError as err:
+ raise PackagingError("Remote config path not found from env or not specified") from err
+ remote_cfg_path = Path(default_config_path_env)
+ # Cache config to a function attribute
+ if getattr(get_pkg_value, 'cfg_cache', None) is None:
+ get_pkg_value.cfg_cache: Dict[Path, ConfigParser] = {} # type: ignore
+ if get_pkg_value.cfg_cache.get(remote_cfg_path, None) is None: # type: ignore
+ try:
+ config = _read_remote_config_sftp(remote_cfg_path)
+ except RemoteConfigError as err:
+ raise RemoteConfigError("Error while receiving config from the server") from err
+ get_pkg_value.cfg_cache[remote_cfg_path] = _parse_remote_config(config) # type: ignore
+ # Use the first section if an empty section was specified
+ section = section or get_pkg_value.cfg_cache[remote_cfg_path].sections()[0] # type: ignore
+ # Return the value for the key, or an empty string
+ return get_pkg_value.cfg_cache[remote_cfg_path].get(section, key, fallback="") # type: ignore
def main() -> None:
"""Main"""
parser = argparse.ArgumentParser(prog="Read values from remote config .ini file")
- parser.add_argument("--url", dest="url", type=str, default=os.getenv("PACKAGING_KEYS_CONFIG_URL"),
- help="Url pointing to file to be read")
- parser.add_argument("--section", type=str, default="packaging", help="The config section within the .ini")
- parser.add_argument("--key", type=str, required=True, help="The config key within the section")
+ subparsers = parser.add_subparsers(dest="command")
+ # Subparser for read-remote-env
+ p_read_remote = subparsers.add_parser(
+ "read-remote-env", help="Read environment value from SFTP remote config"
+ )
+ p_read_remote.add_argument(
+ "--config", dest="config", type=str, default=os.getenv("PACKAGING_KEYS_CONFIG_PATH"),
+ help="A file system path on the remote pointing to file to be read"
+ )
+ p_read_remote.add_argument(
+ "--section", type=str, default="packaging", help="The config section within the .ini"
+ )
+ p_read_remote.add_argument(
+ "--key", type=str, required=True, help="The config key within the section"
+ )
+ # Subparser for fetch-remote-file
+ p_fetch_file = subparsers.add_parser(
+ "fetch-remote-file", help="Fetch a file from a remote SFTP server"
+ )
+ p_fetch_file.add_argument(
+ "--remote-path", type=str, required=True, help="Remote sftp path e.g. [user@][server:]path"
+ )
+ p_fetch_file.add_argument(
+ "--output-path", type=Path, default=None, help="Local save path for file (default=cwd)"
+ )
+ # Parse args
args = parser.parse_args(sys.argv[1:])
- if not args.url or not args.section:
+ if args.command == "read-remote-env":
+ if not all((args.config, args.section, args.key)):
+ p_read_remote.print_help(sys.stderr)
+ raise SystemExit("Invalid/missing arguments for read-remote-env")
+ log.info("%s: '%s'", args.key, get_pkg_value(args.key, args.section, args.config))
+ elif args.command == "fetch-remote-file":
+ if not args.remote_path:
+ p_fetch_file.print_help(sys.stderr)
+ raise SystemExit("Missing --remote-path for fetch-remote-file")
+ username = None
+ hostname = None
+ try:
+ if "@" in args.remote_path: # user@server:path
+ username, args.remote_path = args.remote_path.split("@")
+ hostname, args.remote_path = args.remote_path.split(":")
+ elif ":" in args.remote_path: # server:path
+ hostname, args.remote_path = args.remote_path.split(":")
+ remote_path = Path(args.remote_path)
+ except ValueError:
+ p_fetch_file.print_help(sys.stderr)
+ raise SystemExit("Invalid --remote-path: Expected [user@][server:]path") from None
+ download_remote_file_sftp(
+ remote_host=hostname or os.getenv("SFTP_ADDRESS"),
+ remote_path=remote_path,
+ local_path=args.output_path,
+ username=username or os.getenv("SFTP_USER"),
+ )
+ else:
parser.print_help(sys.stderr)
- raise SystemExit("--url or --section missing")
-
- log.info("%s: '%s'", args.key, get_pkg_value(args.key, args.section, args.url))
if __name__ == "__main__":