@@ -38,36 +38,47 @@ def _generate_logs_list(
container_logs_list_path: pathlib.Path,
parsed_args: argparse.Namespace,
) -> None:
host_logs_list_path = parsed_args.path_list
if InputType.FS == input_type:
host_logs_list_path = parsed_args.path_list
with open(container_logs_list_path, "w") as container_logs_list_file:
if host_logs_list_path is not None:
with open(host_logs_list_path, "r") as host_logs_list_file:
for line in host_logs_list_file:
stripped_path_str = line.rstrip()
if "" == stripped_path_str:
stripped_url_str = line.rstrip()
if "" == stripped_url_str:
# Skip empty paths
continue
resolved_path = pathlib.Path(stripped_path_str).resolve()
resolved_path = pathlib.Path(stripped_url_str).resolve()
mounted_path = CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to(
resolved_path.anchor
)
container_logs_list_file.write(f"{mounted_path}\n")

for path in parsed_args.paths:
resolved_path = pathlib.Path(path).resolve()
for url in parsed_args.paths:
resolved_path = pathlib.Path(url).resolve()
mounted_path = CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to(
resolved_path.anchor
)
container_logs_list_file.write(f"{mounted_path}\n")

elif InputType.S3 == input_type:
with open(container_logs_list_path, "w") as container_logs_list_file:
container_logs_list_file.write(f"{parsed_args.paths[0]}\n")

else:
elif InputType.S3 != input_type:
raise ValueError(f"Unsupported input type: {input_type}.")

# Handle S3 inputs
with open(container_logs_list_path, "w") as container_logs_list_file:
if host_logs_list_path is None:
for url in parsed_args.paths:
container_logs_list_file.write(f"{url}\n")
return

with open(host_logs_list_path, "r") as host_logs_list_file:
for line in host_logs_list_file:
stripped_url_str = line.rstrip()
if "" == stripped_url_str:
# Skip empty lines
continue
container_logs_list_file.write(f"{stripped_url_str}\n")

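For reference, a minimal, self-contained sketch of the host-to-container path remapping used in the FS branch of `_generate_logs_list` above; the mount root shown is an assumption (the real value comes from the package's constants).

import pathlib

# Hypothetical mount root; the real value is defined by the package's constants.
CONTAINER_INPUT_LOGS_ROOT_DIR = pathlib.Path("/mnt/logs")

def _remap_to_container_path(host_path: str) -> pathlib.Path:
    # Resolve the host path and re-root it under the container's input directory,
    # mirroring the relative_to(anchor) logic in _generate_logs_list.
    resolved = pathlib.Path(host_path).resolve()
    return CONTAINER_INPUT_LOGS_ROOT_DIR / resolved.relative_to(resolved.anchor)

# e.g. "/var/log/app.log" -> "/mnt/logs/var/log/app.log"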

def _generate_compress_cmd(
parsed_args: argparse.Namespace,
@@ -96,6 +107,8 @@ def _generate_compress_cmd(
compress_cmd.append(parsed_args.tags)
if parsed_args.no_progress_reporting is True:
compress_cmd.append("--no-progress-reporting")
if parsed_args.s3_single_prefix is True:
compress_cmd.append("--s3-single-prefix")

compress_cmd.append("--logs-list")
compress_cmd.append(str(logs_list_path))
@@ -126,10 +139,20 @@ def _validate_s3_input_args(
f"Input type {InputType.S3} is only supported for the storage engine"
f" {StorageEngine.CLP_S}."
)
if len(parsed_args.paths) != 1:
args_parser.error(f"Only one URL can be specified for input type {InputType.S3}.")
if parsed_args.path_list is not None:
args_parser.error(f"Path list file is unsupported for input type {InputType.S3}.")

if parsed_args.s3_single_prefix:
if len(parsed_args.paths) != 1:
args_parser.error(f"Only one URL can be specified for input type {InputType.S3}.")
if parsed_args.path_list is not None:
# TODO: Do we want to support path lists for S3 input?
args_parser.error(f"Path list file is unsupported for input type {InputType.S3}.")
return

if len(parsed_args.paths) == 0 and parsed_args.path_list is None:
args_parser.error("No URLs specified.")

if len(parsed_args.paths) > 0 and parsed_args.path_list is not None:
args_parser.error("URLs cannot be specified on the command line AND through a file.")


def main(argv):
@@ -173,6 +196,14 @@ def main(argv):
args_parser.add_argument(
"-f", "--path-list", dest="path_list", help="A file listing all paths to compress."
)
args_parser.add_argument(
"--s3-single-prefix",
action="store_true",
help=(
"Treat the S3 URL as a single prefix. If set, only a single S3 URL should be provided"
" and it must be explicitly given as a positional argument."
),
)
Comment on lines +199 to +206 (Member Author):
Please check whether this flag name makes sense @kirkrodrigues @junhaoliao

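For context, a stripped-down sketch of how the new flag is expected to combine with the positional URLs; the parser below is a stand-in with only the relevant arguments, and the bucket/key names are made up.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("paths", nargs="*")
parser.add_argument("-f", "--path-list", dest="path_list")
parser.add_argument("--s3-single-prefix", action="store_true")

# Single-prefix mode: exactly one positional URL and no --path-list.
args = parser.parse_args(["--s3-single-prefix", "s3://my-bucket/logs/2024/"])

# Explicit-object mode: one or more URLs (or a --path-list file, but not both).
args = parser.parse_args(["s3://my-bucket/logs/a.log", "s3://my-bucket/logs/b.log"])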

parsed_args = args_parser.parse_args(argv[1:])
if parsed_args.verbose:
@@ -1,6 +1,9 @@
from __future__ import annotations

import argparse
import datetime
import logging
import os
import pathlib
import sys
import time
@@ -153,7 +156,11 @@ def _generate_clp_io_config(
timestamp_key=parsed_args.timestamp_key,
path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR),
)
elif InputType.S3 == input_type:
elif InputType.S3 != input_type:
raise ValueError(f"Unsupported input type: {input_type}")

# Generate S3 input config
if parsed_args.s3_single_prefix:
if len(logs_to_compress) == 0:
raise ValueError("No URLs given.")
elif len(logs_to_compress) != 1:
@@ -170,8 +177,58 @@ def _generate_clp_io_config(
aws_authentication=aws_authentication,
timestamp_key=parsed_args.timestamp_key,
)
else:
raise ValueError(f"Unsupported input type: {input_type}")

return _generate_s3_input_config_with_key_validation(clp_config, logs_to_compress, parsed_args)


def _generate_s3_input_config_with_key_validation(
clp_config: CLPConfig, logs_to_compress: List[str], parsed_args: argparse.Namespace
) -> S3InputConfig:
    """
    Validates the given S3 URLs and generates an `S3InputConfig` that covers all of them.

    All URLs must be in the same region and bucket, contain no duplicate keys, and share a
    non-empty common key prefix.

    :param clp_config:
    :param logs_to_compress:
    :param parsed_args:
    :return: The generated `S3InputConfig`.
    :raises: ValueError if any of the above validations fail.
    """
if len(logs_to_compress) == 0:
raise ValueError("No URLs given.")

# Validate all the given URLs
region_code: str | None = None
bucket_name: str | None = None
keys = set()
for s3_url in logs_to_compress:
parsed_region_code, parsed_bucket_name, key = parse_s3_url(s3_url)
if region_code is None:
region_code = parsed_region_code
elif region_code != parsed_region_code:
raise ValueError(
"All S3 URLs must be in the same region."
f" Found {region_code} and {parsed_region_code}."
)

if bucket_name is None:
bucket_name = parsed_bucket_name
elif bucket_name != parsed_bucket_name:
raise ValueError(
"All S3 URLs must be in the same bucket."
f" Found {bucket_name} and {parsed_bucket_name}."
)

if key in keys:
raise ValueError(f"Duplicate S3 key found: {key}.")
keys.add(key)

# Determine the common prefix
key_list = list(keys)
prefix = os.path.commonprefix(key_list)
if len(prefix) == 0:
raise ValueError("The given S3 URLs do not share a common prefix.")

return S3InputConfig(
dataset=parsed_args.dataset,
region_code=region_code,
bucket=bucket_name,
keys=key_list,
key_prefix=prefix,
aws_authentication=clp_config.logs_input.aws_authentication,
timestamp_key=parsed_args.timestamp_key,
)

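As a quick illustration of the common-prefix step above (bucket layout and key names are made up): os.path.commonprefix compares strings character by character, so the resulting prefix need not end on a "/" boundary.

import os

keys = [
    "logs/2024-06-01/app.log",
    "logs/2024-06-02/app.log",
]
# Character-wise common prefix, not a path-component one.
print(os.path.commonprefix(keys))  # prints "logs/2024-06-0"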

def _get_logs_to_compress(logs_list_path: pathlib.Path) -> List[str]:
@@ -226,6 +283,9 @@ def main(argv):
"--timestamp-key",
help="The path (e.g. x.y) for the field containing the log event's timestamp.",
)
args_parser.add_argument(
"--s3-single-prefix", action="store_true", help="Treat the input S3 URL as a single prefix."
)
args_parser.add_argument(
"-t", "--tags", help="A comma-separated list of tags to apply to the compressed archives."
)
165 changes: 144 additions & 21 deletions components/clp-py-utils/clp_py_utils/s3_utils.py
@@ -4,6 +4,7 @@
from typing import Dict, Final, List, Optional, Set, Tuple, Union

import boto3
import botocore
from botocore.config import Config
from job_orchestration.scheduler.job_config import S3InputConfig

@@ -254,36 +255,27 @@

def s3_get_object_metadata(s3_input_config: S3InputConfig) -> List[FileMetadata]:
"""
Gets the metadata of all objects under the <bucket>/<key_prefix> specified by s3_input_config.
Gets the metadata of all objects specified by the given input config.

NOTE: We reuse FileMetadata to store the metadata of S3 objects where the object's key is stored
as `path` in FileMetadata.

:param s3_input_config:
:return: List[FileMetadata] containing the object's metadata on success,
:raises: Propagates `boto3.client`'s exceptions.
:raises: Propagates `boto3.client.get_paginator`'s exceptions.
:raises: Propagates `boto3.paginator`'s exceptions.
:return: A list of `FileMetadata` containing the object's metadata on success.
:raises: Propagates `_s3_get_object_metadata_from_single_prefix`'s exceptions.
:raises: Propagates `_s3_get_object_metadata_from_keys`'s exceptions.
"""

s3_client = _create_s3_client(s3_input_config.region_code, s3_input_config.aws_authentication)

file_metadata_list: List[FileMetadata] = list()
paginator = s3_client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=s3_input_config.bucket, Prefix=s3_input_config.key_prefix)
for page in pages:
contents = page.get("Contents", None)
if contents is None:
continue

for obj in contents:
object_key = obj["Key"]
if object_key.endswith("/"):
# Skip any object that resolves to a directory-like path
continue

file_metadata_list.append(FileMetadata(Path(object_key), obj["Size"]))
if s3_input_config.keys is None:
return _s3_get_object_metadata_from_single_prefix(
s3_client, s3_input_config.bucket, s3_input_config.key_prefix
)

return file_metadata_list
return _s3_get_object_metadata_from_keys(
s3_client, s3_input_config.bucket, s3_input_config.key_prefix, s3_input_config.keys
)


def s3_put(s3_config: S3Config, src_file: Path, dest_path: str) -> None:
@@ -392,3 +384,134 @@ def _gen_deletion_config(objects_list: List[str]):
Bucket=s3_config.bucket,
Delete=_gen_deletion_config(objects_to_delete),
)


def _s3_get_object_metadata_from_single_prefix(
s3_client: boto3.client, bucket: str, key_prefix: str
) -> List[FileMetadata]:
"""
Gets the metadata of all objects under the <`bucket`>/<`key_prefix`>.

:param s3_client:
:param bucket:
:param key_prefix:
:return: A list of `FileMetadata` containing the object's metadata on success.
:raises: Propagates `boto3.client`'s exceptions.
:raises: Propagates `boto3.client.get_paginator`'s exceptions.
:raises: Propagates `boto3.paginator`'s exceptions.
"""
file_metadata_list: List[FileMetadata] = list()
paginator = s3_client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=bucket, Prefix=key_prefix)
for page in pages:
contents = page.get("Contents", None)
if contents is None:
continue

for obj in contents:
object_key = obj["Key"]
if object_key.endswith("/"):
# Skip any object that resolves to a directory-like path
continue

file_metadata_list.append(FileMetadata(Path(object_key), obj["Size"]))

return file_metadata_list


def _s3_get_object_metadata_from_keys(
s3_client: boto3.client, bucket: str, key_prefix: str, keys: List[str]
) -> List[FileMetadata]:
"""
Gets the metadata of all objects specified in `keys` under the <`bucket`>.

:param s3_client:
    :param bucket:
    :param key_prefix:
    :param keys:
:return: A list of `FileMetadata` containing the object's metadata on success.
:raises: ValueError if `keys` is an empty list.
:raises: ValueError if any key in `keys` doesn't start with `key_prefix`.
:raises: ValueError if duplicate keys are found in `keys`.
:raises: ValueError if any key in `keys` doesn't exist in the bucket.
:raises: Propagates `_s3_get_object_metadata_from_key`'s exceptions.
:raises: Propagates `boto3.client.get_paginator`'s exceptions.
:raises: Propagates `boto3.paginator`'s exceptions.
"""
# Key validation
if len(keys) == 0:
raise ValueError("The list of keys is empty.")

keys.sort()
for idx, key in enumerate(keys):
if not key.startswith(key_prefix):
raise ValueError(f"Key `{key}` doesn't start with the specified prefix `{key_prefix}`.")
if idx > 0 and key == keys[idx - 1]:
raise ValueError(f"Duplicate key found: `{key}`.")

key_iterator = iter(keys)
first_key = next(key_iterator)
file_metadata_list: List[FileMetadata] = list()
file_metadata_list.append(_s3_get_object_metadata_from_key(s3_client, bucket, first_key))

next_key = next(key_iterator, None)
if next_key is None:
return file_metadata_list

paginator = s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=key_prefix, StartAfter=first_key):
contents = page.get("Contents", None)
if contents is None:
continue

for obj in contents:
object_key = obj["Key"]
if object_key.endswith("/"):
# Skip any object that resolves to a directory-like path
continue

            # We need both the < and > checks since each case is handled differently. Ideally this
            # would be a single three-way comparison, but Python has no such operator.
if object_key < next_key:
continue
if object_key > next_key:
raise ValueError(f"Key `{next_key}` doesn't exist in the bucket `{bucket}`.")

file_metadata_list.append(FileMetadata(Path(object_key), obj["Size"]))
next_key = next(key_iterator, None)
if next_key is None:
                # Early exit since all keys have been found.
return file_metadata_list

# If control flow reaches here, it means there are still keys left to find.
absent_keys = []
while next_key is not None:
absent_keys.append(next_key)
next_key = next(key_iterator, None)
serialized_absent_keys = "\n".join(absent_keys)
raise ValueError(
f"Cannot find following keys in the bucket `{bucket}`:\n{serialized_absent_keys}"
)

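The pagination loop above is essentially a merge of two sorted sequences: the requested keys and S3's lexicographically ordered listing. A stripped-down sketch of that matching idea over plain lists (no S3 involved; names are illustrative):

def _match_sorted_keys(requested, listed):
    # Both inputs must be sorted lexicographically, mirroring S3's listing order.
    key_iterator = iter(requested)
    wanted = next(key_iterator, None)
    found = []
    for key in listed:
        if wanted is None:
            break
        if key < wanted:
            # The listing hasn't reached the requested key yet.
            continue
        if key > wanted:
            # The listing has passed the requested key, so it cannot exist.
            raise ValueError(f"Key `{wanted}` not found.")
        found.append(key)
        wanted = next(key_iterator, None)
    if wanted is not None:
        raise ValueError(f"Key `{wanted}` not found.")
    return found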

def _s3_get_object_metadata_from_key(
s3_client: boto3.client, bucket: str, key: str
) -> FileMetadata:
"""
    Gets the metadata of the object specified by `key` under the <`bucket`>.

:param s3_client:
:param bucket:
:param key:
:return: A `FileMetadata` containing the object's metadata on success.
    :raises: ValueError if the object doesn't exist or its metadata cannot be read.
:raises: Propagates `boto3.client.head_object`'s exceptions.
"""
try:
return FileMetadata(
Path(key), s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
)
except botocore.exceptions.ClientError as e:
raise ValueError(
f"Failed to read metadata of the key `{key}` from the bucket `{bucket}`"
f" with the error: {e}."
        ) from e