
Commit 5712c89

Merge branch 'main' into Antler2

2 parents: 33f4afe + 56ebca1

7 files changed (+129 lines, -34 lines)

Cargo.lock

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default.

components/clp-package-utils/clp_package_utils/scripts/native/search.py

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ def create_and_monitor_job_in_db(
 
     if do_count_aggregation is None and count_by_time_bucket_size is None:
         return
-    with pymongo.MongoClient(results_cache.get_uri()) as client:
+    with pymongo.MongoClient(results_cache.get_uri(), directConnection=True) as client:
         search_results_collection = client[results_cache.db_name][str(job_id)]
         if do_count_aggregation is not None:
             for document in search_results_collection.find():
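The functional change here is the `directConnection=True` flag. As a rough illustration of why it matters (a hedged sketch, not code from this commit; the host, database, and collection names are placeholders):

import pymongo

# Without directConnection, pymongo treats the URI as a replica-set seed list,
# discovers the topology, and routes operations to whichever member addresses
# the server advertises -- addresses that may only resolve inside, say, the
# results cache's container network. With directConnection=True, every
# operation is pinned to the host named in the URI.
with pymongo.MongoClient("mongodb://localhost:27017", directConnection=True) as client:
    collection = client["results_cache"]["1"]  # placeholder db/collection names
    print(collection.count_documents({}))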

components/clp-rust-utils/Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -5,5 +5,6 @@ edition = "2024"
 
 [dependencies]
 aws-config = "1.8.8"
-aws-sdk-s3 = { version = "1.106.0" }
+aws-sdk-s3 = "1.106.0"
+aws-sdk-sqs = "1.86.0"
 secrecy = { version = "0.10.3", features = ["serde"] }
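(Aside on the reformatted line: `aws-sdk-s3 = "1.106.0"` is Cargo shorthand for `aws-sdk-s3 = { version = "1.106.0" }`; the inline-table form is only needed when additional keys are set, as with `secrecy`'s `features` list.)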
Lines changed: 1 addition & 0 deletions
@@ -1 +1,2 @@
 pub mod s3;
+pub mod sqs;
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+mod client;
+
+pub use client::create_new_client;
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+use aws_config::BehaviorVersion;
+use aws_sdk_sqs::{
+    Client,
+    config::{Builder, Credentials, Region},
+};
+use secrecy::{ExposeSecret, SecretString};
+
+/// Creates a new SQS client.
+/// The client is configured using the latest AWS SDK behavior version.
+///
+/// # Returns
+///
+/// A newly created SQS client.
+#[must_use]
+pub async fn create_new_client(
+    region_id: &str,
+    access_key_id: &str,
+    secret_access_key: &SecretString,
+) -> Client {
+    let credential = Credentials::new(
+        access_key_id,
+        secret_access_key.expose_secret(),
+        None,
+        None,
+        "clp-credential-provider",
+    );
+    let region = Region::new(region_id.to_owned());
+    let base_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
+    let config = Builder::from(&base_config)
+        .credentials_provider(credential)
+        .region(region)
+        .build();
+    Client::from_conf(config)
+}
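For readers coming from the Python side of this repo, a rough boto3 analogue of the construction above (purely illustrative; not part of this commit, and the function name is invented):

import boto3

# Static credentials plus an explicit region, mirroring the Rust
# Credentials::new(...) and Region::new(...) configuration above.
def create_new_sqs_client(region_id: str, access_key_id: str, secret_access_key: str):
    return boto3.client(
        "sqs",
        region_name=region_id,
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
    )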

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

Lines changed: 65 additions & 32 deletions
@@ -180,36 +180,45 @@ def _process_s3_input(
         paths_to_compress_buffer.add_file(object_metadata)
 
 
-def _write_failed_path_log(
-    invalid_path_messages: List[str], logs_directory: Path, job_id: Any
+def _write_user_failure_log(
+    title: str,
+    content: List[str],
+    logs_directory: Path,
+    job_id: Any,
+    filename_suffix: str,
 ) -> Optional[Path]:
     """
-    Writes the error messages in `invalid_path_messages` to a log file,
-    `{logs_directory}/user/failed_paths_{job_id}.txt`. The directory will be created if it doesn't
-    already exist.
-    :param invalid_path_messages:
+    Writes a user-oriented failure log to
+    `{logs_directory}/user/job_{job_id}_{filename_suffix}.txt`. The `{logs_directory}/user`
+    directory will be created if it does not already exist.
+
+    :param title:
+    :param content:
     :param logs_directory:
     :param job_id:
-    :return: Path to the written log file or `None` if error is encountered.
+    :param filename_suffix:
+    :return: Path to the written log file relative to `logs_directory`, or `None` on error.
     """
-
-    user_logs_dir = Path(logs_directory) / "user"
+    relative_log_path = Path("user") / f"job_{job_id}_{filename_suffix}.txt"
+    user_logs_dir = logs_directory / relative_log_path.parent
     try:
         user_logs_dir.mkdir(parents=True, exist_ok=True)
-    except Exception:
+    except Exception as e:
+        logger.error("Failed to create user logs directory: '%s' - %s", user_logs_dir, e)
         return None
 
-    log_path = user_logs_dir / f"failed_paths_{job_id}.txt"
+    log_path = logs_directory / relative_log_path
     try:
         with log_path.open("w", encoding="utf-8") as f:
             timestamp = datetime.datetime.now().isoformat(timespec="seconds")
-            f.write(f"Failed input paths log.\nGenerated at {timestamp}.\n\n")
-            for msg in invalid_path_messages:
-                f.write(f"{msg.rstrip()}\n")
-    except Exception:
+            f.write(f"{title}\nGenerated at {timestamp}.\n\n")
+            for item in content:
+                f.write(f"{item.rstrip()}\n")
+    except Exception as e:
+        logger.error("Failed to write compression failure user log: '%s' - %s", log_path, e)
         return None
 
-    return log_path
+    return relative_log_path
 
 
 def search_and_schedule_new_tasks(
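A minimal usage sketch of the new helper (values are illustrative, not from the commit):

from pathlib import Path

# Hypothetical call mirroring the FS-input call site in the next hunk.
relative_path = _write_user_failure_log(
    title="Failed input paths log.",
    content=["Path '/data/missing.log' does not exist."],
    logs_directory=Path("/var/log/clp"),
    job_id=42,
    filename_suffix="failed_paths",
)
# On success this returns Path("user/job_42_failed_paths.txt"), a path relative
# to logs_directory, while the file itself is written to
# /var/log/clp/user/job_42_failed_paths.txt. On any I/O error it returns None.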
@@ -274,18 +283,22 @@ def search_and_schedule_new_tasks(
         if input_type == InputType.FS.value:
             invalid_path_messages = _process_fs_input_paths(input_config, paths_to_compress_buffer)
             if len(invalid_path_messages) > 0:
-                base_msg = "At least one of your input paths could not be processed."
-
-                user_log_path = _write_failed_path_log(
-                    invalid_path_messages, clp_config.logs_directory, job_id
+                user_log_relative_path = _write_user_failure_log(
+                    title="Failed input paths log.",
+                    content=invalid_path_messages,
+                    logs_directory=clp_config.logs_directory,
+                    job_id=job_id,
+                    filename_suffix="failed_paths",
+                )
+                if user_log_relative_path is None:
+                    err_msg = "Failed to write user log for invalid input paths."
+                    raise RuntimeError(err_msg)
+
+                error_msg = (
+                    "At least one of your input paths could not be processed."
+                    f" See the error log at '{user_log_relative_path}' inside your configured logs"
+                    " directory (`logs_directory`) for more details."
                 )
-                if user_log_path is None:
-                    error_msg = base_msg + (
-                        f" Check the compression scheduler logs in {clp_config.logs_directory} for"
-                        " more details."
-                    )
-                else:
-                    error_msg = base_msg + f" Check {user_log_path} for more details."
 
                 update_compression_job_metadata(
                     db_cursor,
@@ -384,7 +397,7 @@ def search_and_schedule_new_tasks(
         scheduled_jobs[job_id] = job
 
 
-def poll_running_jobs(db_conn, db_cursor):
+def poll_running_jobs(logs_directory: Path, db_conn, db_cursor):
     """
     Poll for running jobs and update their status.
     """
@@ -395,7 +408,7 @@
     for job_id, job in scheduled_jobs.items():
         job_success = True
         duration = 0.0
-        error_message = ""
+        error_messages: List[str] = []
 
         try:
             returned_results = job.result_handle.get_result()
@@ -412,7 +425,9 @@
                     )
                 else:
                     job_success = False
-                    error_message += f"task {task_result.task_id}: {task_result.error_message}\n"
+                    error_messages.append(
+                        f"task {task_result.task_id}: {task_result.error_message}"
+                    )
                     logger.error(
                         f"Compression task job-{job_id}-task-{task_result.task_id} failed with"
                         f" error: {task_result.error_message}."
@@ -434,12 +449,30 @@
             )
         else:
             logger.error(f"Job {job_id} failed. See worker logs or status_msg for details.")
+
+            error_log_relative_path = _write_user_failure_log(
+                title="Compression task errors.",
+                content=error_messages,
+                logs_directory=logs_directory,
+                job_id=job_id,
+                filename_suffix="task_errors",
+            )
+            if error_log_relative_path is None:
+                err_msg = "Failed to write user log for failed compression job."
+                raise RuntimeError(err_msg)
+
+            error_msg = (
+                "One or more compression tasks failed."
+                f" See the error log at '{error_log_relative_path}' inside your configured logs"
+                " directory (`logs_directory`) for more details."
+            )
+
             update_compression_job_metadata(
                 db_cursor,
                 job_id,
                 dict(
                     status=CompressionJobStatus.FAILED,
-                    status_msg=error_message,
+                    status_msg=error_msg,
                 ),
             )
             db_conn.commit()
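Note the behavioral shift in this hunk: `status_msg` previously accumulated the raw per-task error text, whereas now the full messages go to the user log written by `_write_user_failure_log` and `status_msg` carries only a short pointer to it, presumably to keep the database field bounded when many tasks fail.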
@@ -515,7 +548,7 @@ def main(argv):
                 clp_metadata_db_connection_config,
                 task_manager,
             )
-            poll_running_jobs(db_conn, db_cursor)
+            poll_running_jobs(clp_config.logs_directory, db_conn, db_cursor)
             time.sleep(clp_config.compression_scheduler.jobs_poll_delay)
     except KeyboardInterrupt:
         logger.info("Forcefully shutting down")
