-
Notifications
You must be signed in to change notification settings - Fork 1
Extract and validate path artifacts during ingestion #167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| keyed by case.id, to avoid repeated DB queries. | ||
| """ | ||
|
|
||
| import shlex | ||
| from dataclasses import dataclass, field, replace | ||
| from datetime import datetime, timezone | ||
| from pathlib import Path | ||
|
|
@@ -32,9 +33,9 @@ | |
| from app.features.ingestion.parsers.types import ParsedSimulation | ||
| from app.features.machine.utils import resolve_machine_by_name | ||
| from app.features.simulation.config_delta import SimulationConfigSnapshot | ||
| from app.features.simulation.enums import SimulationStatus, SimulationType | ||
| from app.features.simulation.enums import ArtifactKind, SimulationStatus, SimulationType | ||
| from app.features.simulation.models import Case, Simulation | ||
| from app.features.simulation.schemas import SimulationCreate | ||
| from app.features.simulation.schemas import ArtifactCreate, SimulationCreate | ||
|
|
||
| logger = _setup_custom_logger(__name__) | ||
|
|
||
|
|
@@ -357,6 +358,7 @@ def _build_simulation_create( | |
| simulation = _validate_simulation_create( | ||
| replace(prevalidated_draft, case_id=case.id) | ||
| ) | ||
| simulation = _attach_path_artifacts(simulation, parsed_simulation) | ||
| logger.info( | ||
| "Mapped reference simulation from %s: %s", | ||
| parsed_simulation.execution_dir, | ||
|
|
@@ -372,6 +374,7 @@ def _build_simulation_create( | |
| run_config_deltas=delta if delta else None, | ||
| ) | ||
| simulation = _validate_simulation_create(simulation_draft) | ||
| simulation = _attach_path_artifacts(simulation, parsed_simulation) | ||
|
|
||
| if delta: | ||
| logger.info( | ||
|
|
@@ -388,6 +391,134 @@ def _build_simulation_create( | |
| return simulation | ||
|
|
||
|
|
||
def _attach_path_artifacts(
    simulation: SimulationCreate,
    parsed_simulation: ParsedSimulation,
) -> SimulationCreate:
    """Return *simulation* with path artifacts attached, when any were found.

    Builds the artifact list from *parsed_simulation*; if no valid path
    artifacts could be produced, the input object is returned unchanged.
    """
    artifacts = _build_path_artifacts(parsed_simulation)
    if artifacts:
        # model_copy produces a new SimulationCreate; the input is not mutated.
        return simulation.model_copy(update={"artifacts": artifacts})
    return simulation
|
|
||
|
|
||
def _build_path_artifacts(parsed_simulation: ParsedSimulation) -> list[ArtifactCreate]:
    """Collect validated filesystem artifacts referenced by a parsed simulation.

    Candidates come from RUNDIR (output), DOUT_S_ROOT (archive),
    CASEROOT/.case.run (run script) and POSTRUN_SCRIPT (post-processing
    script). Candidates that are missing or fail validation are skipped, so
    the result may be empty.
    """
    execution_dir = parsed_simulation.execution_dir

    # Validation happens in the same order as the final artifact list;
    # each entry is (artifact kind, validated path or None).
    candidates: list[tuple[ArtifactKind, str | None]] = [
        (
            ArtifactKind.OUTPUT,
            _validate_existing_path(
                parsed_simulation.output_path,
                source_name="RUNDIR",
                execution_dir=execution_dir,
            ),
        ),
        (
            ArtifactKind.ARCHIVE,
            _validate_existing_path(
                parsed_simulation.archive_path,
                source_name="DOUT_S_ROOT",
                execution_dir=execution_dir,
            ),
        ),
        (
            ArtifactKind.RUN_SCRIPT,
            _validate_existing_path(
                _derive_case_run_script_path(parsed_simulation.case_root),
                source_name="CASEROOT/.case.run",
                execution_dir=execution_dir,
            ),
        ),
        (
            ArtifactKind.POSTPROCESS_SCRIPT,
            _validate_existing_path(
                _extract_postprocessing_script_path(
                    parsed_simulation.postprocessing_script,
                    execution_dir=execution_dir,
                ),
                source_name="POSTRUN_SCRIPT",
                execution_dir=execution_dir,
            ),
        ),
    ]

    artifacts: list[ArtifactCreate] = []
    for kind, validated_path in candidates:
        _append_path_artifact(artifacts, kind, validated_path)
    return artifacts
|
|
||
|
|
||
| def _append_path_artifact( | ||
| artifacts: list[ArtifactCreate], kind: ArtifactKind, uri: str | None | ||
| ) -> None: | ||
| if uri is None: | ||
| return | ||
|
|
||
| artifacts.append(ArtifactCreate(kind=kind, uri=uri)) | ||
|
|
||
|
|
||
def _derive_case_run_script_path(case_root: str | None) -> str | None:
    """Derive the path of the hidden ``.case.run`` script under *case_root*.

    Returns None when *case_root* is missing or blank.
    """
    case_root_value = _normalize_path_candidate(case_root)
    if case_root_value is not None:
        return str(Path(case_root_value) / ".case.run")
    return None
|
|
||
|
|
||
def _extract_postprocessing_script_path(
    postprocessing_script: str | None,
    execution_dir: str,
) -> str | None:
    """Extract the script path from a POSTRUN_SCRIPT value.

    The value may include shell-style arguments after the script; only the
    leading token is the path. Returns None for blank or empty values, and for
    values shlex cannot parse (logged with *execution_dir* for context).
    """
    script_value = _normalize_path_candidate(postprocessing_script)
    if script_value is None:
        return None

    try:
        parts = shlex.split(script_value)
    except ValueError:
        # e.g. unbalanced quotes — best effort, skip rather than fail ingestion.
        logger.warning(
            "Skipping POSTRUN_SCRIPT artifact for '%s': could not parse value '%s'.",
            execution_dir,
            script_value,
        )
        return None

    return parts[0] if parts else None
|
|
||
|
|
||
def _validate_existing_path(
    path_value: str | None,
    *,
    source_name: str,
    execution_dir: str,
) -> str | None:
    """Validate that a candidate path exists on the ingest host.

    Relative candidates (common for shell-sourced values such as
    POSTRUN_SCRIPT or CASEROOT) are resolved against *execution_dir* — the
    simulation's own directory context — rather than the API process working
    directory, which would wrongly classify valid in-tree artifacts as
    missing.

    Args:
        path_value: Raw path candidate; may be None or blank.
        source_name: Human-readable origin of the value (e.g. "RUNDIR"),
            used only in the skip warning.
        execution_dir: Simulation execution directory; base for relative
            candidates and logged for context.

    Returns:
        The resolved path as a string when it exists, otherwise None
        (a warning is logged for existing-but-missing candidates).
    """
    normalized_path = _normalize_path_candidate(path_value)
    if normalized_path is None:
        return None

    candidate_path = Path(normalized_path).expanduser()
    # Fix: anchor relative values at the simulation context instead of the
    # ingest process CWD, so relative POSTRUN_SCRIPT/CASEROOT entries resolve.
    if not candidate_path.is_absolute():
        candidate_path = Path(execution_dir).expanduser() / candidate_path

    if not candidate_path.exists():
        logger.warning(
            "Skipping %s artifact for '%s': path does not exist on ingest host: %s",
            source_name,
            execution_dir,
            # Log the resolved candidate so the message shows what was probed.
            str(candidate_path),
        )
        return None

    return str(candidate_path)
|
Comment on lines
+488
to
+508
|
||
|
|
||
|
Comment on lines
+394
to
+509
|
||
|
|
||
| def _normalize_path_candidate(path_value: str | None) -> str | None: | ||
| if path_value is None: | ||
| return None | ||
|
|
||
| normalized = path_value.strip() | ||
| if not normalized: | ||
| return None | ||
|
|
||
| return normalized | ||
|
|
||
|
|
||
| def _prevalidate_simulation_create( | ||
| parsed_simulation: ParsedSimulation, | ||
| machine_id: UUID, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ def parse_env_case(env_case_path: str | Path) -> dict[str, str | None]: | |
| machine = _extract_value_from_file(env_case_path, "MACH") | ||
| user = _extract_value_from_file(env_case_path, "REALUSER") | ||
| compset_alias = _extract_value_from_file(env_case_path, "COMPSET") | ||
| case_root = _extract_value_from_file(env_case_path, "CASEROOT") | ||
|
|
||
| # Extract metadata that requires special handling | ||
| campaign, experiment_type = _extract_campaign_and_experiment_type(case_name) | ||
|
|
@@ -50,6 +51,7 @@ def parse_env_case(env_case_path: str | Path) -> dict[str, str | None]: | |
| "campaign": campaign, | ||
| "experiment_type": experiment_type, | ||
| "compset_alias": compset_alias, | ||
| "case_root": case_root, | ||
| } | ||
|
Comment on lines
41
to
55
|
||
|
|
||
|
|
||
|
|
@@ -95,6 +97,9 @@ def parse_env_run(env_run_path: str | Path) -> dict[str, str | None]: | |
| stop_option = _extract_value_from_file(env_run_path, "STOP_OPTION") | ||
| stop_n = _extract_value_from_file(env_run_path, "STOP_N") | ||
| stop_date = _extract_value_from_file(env_run_path, "STOP_DATE") | ||
| output_path = _extract_value_from_file(env_run_path, "RUNDIR") | ||
| archive_path = _extract_value_from_file(env_run_path, "DOUT_S_ROOT") | ||
| postprocessing_script = _extract_value_from_file(env_run_path, "POSTRUN_SCRIPT") | ||
|
|
||
| simulation_start_date = ( | ||
| run_ref_date if initialization_type == "branch" else run_start_date | ||
|
|
@@ -110,6 +115,9 @@ def parse_env_run(env_run_path: str | Path) -> dict[str, str | None]: | |
| "initialization_type": initialization_type, | ||
| "simulation_start_date": simulation_start_date, | ||
| "simulation_end_date": simulation_end_date, | ||
| "output_path": output_path, | ||
| "archive_path": archive_path, | ||
| "postprocessing_script": postprocessing_script, | ||
| } | ||
|
|
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`_validate_existing_path` checks `Path(normalized_path).expanduser()` directly, so any relative value from `POSTRUN_SCRIPT`/`CASEROOT` is resolved against the API process working directory instead of the simulation context. In archives where these entries are relative (a common shell-script pattern), valid artifacts in the case/execution tree are treated as missing and silently omitted, so ingestion loses artifact links even though the files exist. Useful? React with 👍 / 👎.