diff --git a/components/clp-package-utils/clp_package_utils/scripts/compress.py b/components/clp-package-utils/clp_package_utils/scripts/compress.py
index ae9de02092..84303bfbf1 100755
--- a/components/clp-package-utils/clp_package_utils/scripts/compress.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/compress.py
@@ -38,36 +38,47 @@ def _generate_logs_list(
     container_logs_list_path: pathlib.Path,
     parsed_args: argparse.Namespace,
 ) -> None:
+    host_logs_list_path = parsed_args.path_list
     if InputType.FS == input_type:
-        host_logs_list_path = parsed_args.path_list
         with open(container_logs_list_path, "w") as container_logs_list_file:
             if host_logs_list_path is not None:
                 with open(host_logs_list_path, "r") as host_logs_list_file:
                     for line in host_logs_list_file:
-                        stripped_path_str = line.rstrip()
-                        if "" == stripped_path_str:
+                        stripped_url_str = line.rstrip()
+                        if "" == stripped_url_str:
                             # Skip empty paths
                             continue
-                        resolved_path = pathlib.Path(stripped_path_str).resolve()
+                        resolved_path = pathlib.Path(stripped_url_str).resolve()
                         mounted_path = CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to(
                             resolved_path.anchor
                         )
                         container_logs_list_file.write(f"{mounted_path}\n")
 
-            for path in parsed_args.paths:
-                resolved_path = pathlib.Path(path).resolve()
+            for url in parsed_args.paths:
+                resolved_path = pathlib.Path(url).resolve()
                 mounted_path = CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to(
                     resolved_path.anchor
                 )
                 container_logs_list_file.write(f"{mounted_path}\n")
 
-    elif InputType.S3 == input_type:
-        with open(container_logs_list_path, "w") as container_logs_list_file:
-            container_logs_list_file.write(f"{parsed_args.paths[0]}\n")
-
-    else:
+    elif InputType.S3 != input_type:
         raise ValueError(f"Unsupported input type: {input_type}.")
 
+    # Handle S3 inputs
+    with open(container_logs_list_path, "w") as container_logs_list_file:
+        if host_logs_list_path is None:
+            for url in parsed_args.paths:
+                container_logs_list_file.write(f"{url}\n")
+            return
+
+        with open(host_logs_list_path, "r") as host_logs_list_file:
+            for line in host_logs_list_file:
+                stripped_url_str = line.rstrip()
+                if "" == stripped_url_str:
+                    # Skip empty lines
+                    continue
+                container_logs_list_file.write(f"{stripped_url_str}\n")
+
 
 def _generate_compress_cmd(
     parsed_args: argparse.Namespace,
@@ -96,6 +107,8 @@ def _generate_compress_cmd(
         compress_cmd.append(parsed_args.tags)
     if parsed_args.no_progress_reporting is True:
         compress_cmd.append("--no-progress-reporting")
+    if parsed_args.s3_single_prefix is True:
+        compress_cmd.append("--s3-single-prefix")
 
     compress_cmd.append("--logs-list")
     compress_cmd.append(str(logs_list_path))
@@ -126,10 +139,20 @@ def _validate_s3_input_args(
             f"Input type {InputType.S3} is only supported for the storage engine"
             f" {StorageEngine.CLP_S}."
         )
-    if len(parsed_args.paths) != 1:
-        args_parser.error(f"Only one URL can be specified for input type {InputType.S3}.")
-    if parsed_args.path_list is not None:
-        args_parser.error(f"Path list file is unsupported for input type {InputType.S3}.")
+
+    if parsed_args.s3_single_prefix:
+        if len(parsed_args.paths) != 1:
+            args_parser.error(f"Only one URL can be specified for input type {InputType.S3}.")
+        if parsed_args.path_list is not None:
+            # TODO: Do we want to support path lists for S3 input?
+            args_parser.error(f"Path list file is unsupported for input type {InputType.S3}.")
+        return
+
+    if len(parsed_args.paths) == 0 and parsed_args.path_list is None:
+        args_parser.error("No URLs specified.")
+
+    if len(parsed_args.paths) > 0 and parsed_args.path_list is not None:
+        args_parser.error("URLs cannot be specified on the command line AND through a file.")
 
 
 def main(argv):
@@ -173,6 +196,14 @@ def main(argv):
     args_parser.add_argument(
         "-f", "--path-list", dest="path_list", help="A file listing all paths to compress."
     )
+    args_parser.add_argument(
+        "--s3-single-prefix",
+        action="store_true",
+        help=(
+            "Treat the S3 URL as a single prefix. If set, only a single S3 URL should be provided"
+            " and it must be explicitly given as a positional argument."
+        ),
+    )
 
     parsed_args = args_parser.parse_args(argv[1:])
     if parsed_args.verbose:
diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py
index e56a6722b9..6b5e468ae9 100755
--- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py
@@ -1,6 +1,9 @@
+from __future__ import annotations
+
 import argparse
 import datetime
 import logging
+import os
 import pathlib
 import sys
 import time
@@ -153,7 +156,11 @@ def _generate_clp_io_config(
             timestamp_key=parsed_args.timestamp_key,
             path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR),
         )
-    elif InputType.S3 == input_type:
+    elif InputType.S3 != input_type:
+        raise ValueError(f"Unsupported input type: {input_type}")
+
+    # Generate S3 input config
+    if parsed_args.s3_single_prefix:
         if len(logs_to_compress) == 0:
             raise ValueError("No URLs given.")
         elif len(logs_to_compress) != 1:
@@ -170,8 +177,58 @@ def _generate_clp_io_config(
             aws_authentication=aws_authentication,
             timestamp_key=parsed_args.timestamp_key,
         )
-    else:
-        raise ValueError(f"Unsupported input type: {input_type}")
+
+    return _generate_s3_input_config_with_key_validation(clp_config, logs_to_compress, parsed_args)
+
+
+def _generate_s3_input_config_with_key_validation(
+    clp_config: CLPConfig, logs_to_compress: List[str], parsed_args: argparse.Namespace
+) -> S3InputConfig:
+    # TODO: Add docstring
+    if len(logs_to_compress) == 0:
+        raise ValueError("No URLs given.")
+
+    # Validate all the given URLs
+    region_code: str | None = None
+    bucket_name: str | None = None
+    keys = set()
+    for s3_url in logs_to_compress:
+        parsed_region_code, parsed_bucket_name, key = parse_s3_url(s3_url)
+        if region_code is None:
+            region_code = parsed_region_code
+        elif region_code != parsed_region_code:
+            raise ValueError(
+                "All S3 URLs must be in the same region."
+                f" Found {region_code} and {parsed_region_code}."
+            )
+
+        if bucket_name is None:
+            bucket_name = parsed_bucket_name
+        elif bucket_name != parsed_bucket_name:
+            raise ValueError(
+                "All S3 URLs must be in the same bucket."
+                f" Found {bucket_name} and {parsed_bucket_name}."
+            )
+
+        if key in keys:
+            raise ValueError(f"Duplicate S3 key found: {key}.")
+        keys.add(key)
+
+    # Determine the common prefix
+    key_list = list(keys)
+    prefix = os.path.commonprefix(key_list)
+    if len(prefix) == 0:
+        raise ValueError("The given S3 URLs do not share a common prefix.")
+
+    return S3InputConfig(
+        dataset=parsed_args.dataset,
+        region_code=region_code,
+        bucket=bucket_name,
+        keys=key_list,
+        key_prefix=prefix,
+        aws_authentication=clp_config.logs_input.aws_authentication,
+        timestamp_key=parsed_args.timestamp_key,
+    )
 
 
 def _get_logs_to_compress(logs_list_path: pathlib.Path) -> List[str]:
@@ -226,6 +283,9 @@ def main(argv):
         "--timestamp-key",
         help="The path (e.g. x.y) for the field containing the log event's timestamp.",
     )
+    args_parser.add_argument(
+        "--s3-single-prefix", action="store_true", help="Treat the input S3 URL as a single prefix."
+    )
     args_parser.add_argument(
         "-t", "--tags", help="A comma-separated list of tags to apply to the compressed archives."
    )
diff --git a/components/clp-py-utils/clp_py_utils/s3_utils.py b/components/clp-py-utils/clp_py_utils/s3_utils.py
index 47d97d1884..4626ce6db9 100644
--- a/components/clp-py-utils/clp_py_utils/s3_utils.py
+++ b/components/clp-py-utils/clp_py_utils/s3_utils.py
@@ -4,6 +4,7 @@
 from typing import Dict, Final, List, Optional, Set, Tuple, Union
 
 import boto3
+import botocore
 from botocore.config import Config
 
 from job_orchestration.scheduler.job_config import S3InputConfig
@@ -254,36 +255,27 @@ def generate_s3_virtual_hosted_style_url(
 
 def s3_get_object_metadata(s3_input_config: S3InputConfig) -> List[FileMetadata]:
     """
-    Gets the metadata of all objects under the <bucket>/<key_prefix> specified by s3_input_config.
+    Gets the metadata of all objects specified by the given input config.
+
     NOTE: We reuse FileMetadata to store the metadata of S3 objects where the object's key is
     stored as `path` in FileMetadata.
 
     :param s3_input_config:
-    :return: List[FileMetadata] containing the object's metadata on success,
-    :raises: Propagates `boto3.client`'s exceptions.
-    :raises: Propagates `boto3.client.get_paginator`'s exceptions.
-    :raises: Propagates `boto3.paginator`'s exceptions.
+    :return: A list of `FileMetadata` containing the objects' metadata on success.
+    :raises: Propagates `_s3_get_object_metadata_from_single_prefix`'s exceptions.
+    :raises: Propagates `_s3_get_object_metadata_from_keys`'s exceptions.
""" s3_client = _create_s3_client(s3_input_config.region_code, s3_input_config.aws_authentication) - file_metadata_list: List[FileMetadata] = list() - paginator = s3_client.get_paginator("list_objects_v2") - pages = paginator.paginate(Bucket=s3_input_config.bucket, Prefix=s3_input_config.key_prefix) - for page in pages: - contents = page.get("Contents", None) - if contents is None: - continue - - for obj in contents: - object_key = obj["Key"] - if object_key.endswith("/"): - # Skip any object that resolves to a directory-like path - continue - - file_metadata_list.append(FileMetadata(Path(object_key), obj["Size"])) + if s3_input_config.keys is None: + return _s3_get_object_metadata_from_single_prefix( + s3_client, s3_input_config.bucket, s3_input_config.key_prefix + ) - return file_metadata_list + return _s3_get_object_metadata_from_keys( + s3_client, s3_input_config.bucket, s3_input_config.key_prefix, s3_input_config.keys + ) def s3_put(s3_config: S3Config, src_file: Path, dest_path: str) -> None: @@ -392,3 +384,134 @@ def _gen_deletion_config(objects_list: List[str]): Bucket=s3_config.bucket, Delete=_gen_deletion_config(objects_to_delete), ) + + +def _s3_get_object_metadata_from_single_prefix( + s3_client: boto3.client, bucket: str, key_prefix: str +) -> List[FileMetadata]: + """ + Gets the metadata of all objects under the <`bucket`>/<`key_prefix`>. + + :param s3_client: + :param bucket: + :param key_prefix: + :return: A list of `FileMetadata` containing the object's metadata on success. + :raises: Propagates `boto3.client`'s exceptions. + :raises: Propagates `boto3.client.get_paginator`'s exceptions. + :raises: Propagates `boto3.paginator`'s exceptions. + """ + file_metadata_list: List[FileMetadata] = list() + paginator = s3_client.get_paginator("list_objects_v2") + pages = paginator.paginate(Bucket=bucket, Prefix=key_prefix) + for page in pages: + contents = page.get("Contents", None) + if contents is None: + continue + + for obj in contents: + object_key = obj["Key"] + if object_key.endswith("/"): + # Skip any object that resolves to a directory-like path + continue + + file_metadata_list.append(FileMetadata(Path(object_key), obj["Size"])) + + return file_metadata_list + + +def _s3_get_object_metadata_from_keys( + s3_client: boto3.client, bucket: str, key_prefix: str, keys: List[str] +) -> List[FileMetadata]: + """ + Gets the metadata of all objects specified in `keys` under the <`bucket`>. + + :param s3_client: + :param bucket: + :param keys: + :return: A list of `FileMetadata` containing the object's metadata on success. + :raises: ValueError if `keys` is an empty list. + :raises: ValueError if any key in `keys` doesn't start with `key_prefix`. + :raises: ValueError if duplicate keys are found in `keys`. + :raises: ValueError if any key in `keys` doesn't exist in the bucket. + :raises: Propagates `_s3_get_object_metadata_from_key`'s exceptions. + :raises: Propagates `boto3.client.get_paginator`'s exceptions. + :raises: Propagates `boto3.paginator`'s exceptions. 
+    """
+    # Key validation
+    if len(keys) == 0:
+        raise ValueError("The list of keys is empty.")
+
+    keys.sort()
+    for idx, key in enumerate(keys):
+        if not key.startswith(key_prefix):
+            raise ValueError(f"Key `{key}` doesn't start with the specified prefix `{key_prefix}`.")
+        if idx > 0 and key == keys[idx - 1]:
+            raise ValueError(f"Duplicate key found: `{key}`.")
+
+    key_iterator = iter(keys)
+    first_key = next(key_iterator)
+    file_metadata_list: List[FileMetadata] = list()
+    file_metadata_list.append(_s3_get_object_metadata_from_key(s3_client, bucket, first_key))
+
+    next_key = next(key_iterator, None)
+    if next_key is None:
+        return file_metadata_list
+
+    paginator = s3_client.get_paginator("list_objects_v2")
+    for page in paginator.paginate(Bucket=bucket, Prefix=key_prefix, StartAfter=first_key):
+        contents = page.get("Contents", None)
+        if contents is None:
+            continue
+
+        for obj in contents:
+            object_key = obj["Key"]
+            if object_key.endswith("/"):
+                # Skip any object that resolves to a directory-like path
+                continue
+
+            # The < and > cases are handled differently, so we need both checks; Python has no
+            # three-way comparison that would let us do this with a single comparison.
+            if object_key < next_key:
+                continue
+            if object_key > next_key:
+                raise ValueError(f"Key `{next_key}` doesn't exist in the bucket `{bucket}`.")
+
+            file_metadata_list.append(FileMetadata(Path(object_key), obj["Size"]))
+            next_key = next(key_iterator, None)
+            if next_key is None:
+                # Early exit since all keys have been found.
+                return file_metadata_list
+
+    # If control flow reaches here, there are still keys that weren't found in the bucket.
+    absent_keys = []
+    while next_key is not None:
+        absent_keys.append(next_key)
+        next_key = next(key_iterator, None)
+    serialized_absent_keys = "\n".join(absent_keys)
+    raise ValueError(
+        f"Cannot find the following keys in the bucket `{bucket}`:\n{serialized_absent_keys}"
+    )
+
+
+def _s3_get_object_metadata_from_key(
+    s3_client: boto3.client, bucket: str, key: str
+) -> FileMetadata:
+    """
+    Gets the metadata of the object specified by `key` under <`bucket`>.
+
+    :param s3_client:
+    :param bucket:
+    :param key:
+    :return: A `FileMetadata` containing the object's metadata on success.
+    :raises: ValueError if the object doesn't exist or its metadata can't be read.
+    :raises: Propagates `boto3.client.head_object`'s exceptions.
+    """
+    try:
+        return FileMetadata(
+            Path(key), s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
+        )
+    except botocore.exceptions.ClientError as e:
+        raise ValueError(
+            f"Failed to read metadata of the key `{key}` from the bucket `{bucket}`"
+            f" with the error: {e}."
+        )
diff --git a/components/job-orchestration/job_orchestration/scheduler/job_config.py b/components/job-orchestration/job_orchestration/scheduler/job_config.py
index 9b941bec73..f8b3b5fa77 100644
--- a/components/job-orchestration/job_orchestration/scheduler/job_config.py
+++ b/components/job-orchestration/job_orchestration/scheduler/job_config.py
@@ -30,6 +30,7 @@ class FsInputConfig(BaseModel):
 
 
 class S3InputConfig(S3Config):
     type: Literal[InputType.S3.value] = InputType.S3.value
+    keys: Optional[List[str]] = None
     dataset: Optional[str] = None
     timestamp_key: Optional[str] = None
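Editor's note (not part of the patch): the following is a minimal standalone sketch of the multi-URL validation and common-prefix derivation that `_generate_s3_input_config_with_key_validation` performs above. The regex-based URL parser and the example URLs are hypothetical stand-ins; the patch itself relies on the package's own `parse_s3_url` helper and `S3InputConfig` model.

# Illustrative sketch only -- mirrors the logic added in native/compress.py under
# stated assumptions. The URL parser below is a simplified, hypothetical stand-in
# for the package's parse_s3_url helper, and the example URLs are made up.
import os
import re
from typing import List, Optional, Tuple


def _parse_s3_virtual_hosted_url(s3_url: str) -> Tuple[str, str, str]:
    # Assumes virtual-hosted-style URLs: https://<bucket>.s3.<region>.amazonaws.com/<key>
    match = re.fullmatch(r"https://([^./]+)\.s3\.([^./]+)\.amazonaws\.com/(.+)", s3_url)
    if match is None:
        raise ValueError(f"Unsupported S3 URL format: {s3_url}")
    bucket, region, key = match.group(1), match.group(2), match.group(3)
    return region, bucket, key


def derive_common_prefix(s3_urls: List[str]) -> Tuple[str, str, List[str], str]:
    """Validates that all URLs share one region and bucket, rejects duplicate keys,
    and returns (region, bucket, keys, common_key_prefix)."""
    if len(s3_urls) == 0:
        raise ValueError("No URLs given.")

    region: Optional[str] = None
    bucket: Optional[str] = None
    keys: List[str] = []
    for s3_url in s3_urls:
        parsed_region, parsed_bucket, key = _parse_s3_virtual_hosted_url(s3_url)
        if region is None:
            region, bucket = parsed_region, parsed_bucket
        elif (region, bucket) != (parsed_region, parsed_bucket):
            raise ValueError("All S3 URLs must be in the same region and bucket.")
        if key in keys:
            raise ValueError(f"Duplicate S3 key found: {key}.")
        keys.append(key)

    # os.path.commonprefix is a plain character-wise common prefix, which is what an
    # S3 key prefix needs to be (it need not end on a "/" boundary).
    prefix = os.path.commonprefix(keys)
    if len(prefix) == 0:
        raise ValueError("The given S3 URLs do not share a common prefix.")
    return region, bucket, keys, prefix


if __name__ == "__main__":
    # Hypothetical example inputs.
    urls = [
        "https://my-logs.s3.us-east-1.amazonaws.com/app/2024/01/01.clp.zst",
        "https://my-logs.s3.us-east-1.amazonaws.com/app/2024/01/02.clp.zst",
    ]
    region, bucket, keys, prefix = derive_common_prefix(urls)
    print(region, bucket, prefix)  # us-east-1 my-logs app/2024/01/0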