Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@
#
# To create a restart ensemble after running spinup:
#
# 1. Analyze the completed spinup ensemble:
# from compass.landice.tests.ensemble_generator.ensemble_templates.sgh_ensemble.analysis import analyze_ensemble
# analyze_ensemble('/work/ensemble/spinup_ensemble', '/work/config.cfg')
# 1. Run the sgh_ensemble_analysis test case to produce analysis_summary.json
# compass setup -t landice/ensemble_generator/sgh_ensemble_analysis \
# -w /work/analysis -f analysis.cfg
# compass run -w /work/analysis
#
# 2. Schedule restarts:
# from compass.landice.tests.ensemble_generator.ensemble_templates.sgh_ensemble.restart import schedule_restarts
# from compass.landice.tests.ensemble_generator.sgh_restart_ensemble \
# .restart_scheduler import schedule_restarts
# config_file, runs = schedule_restarts(
# '/work/ensemble/spinup_ensemble/analysis_summary.json',
# '/work/analysis/sgh_ensemble_analysis/analyze_ensemble/analysis_summary.json',
# '/work/restart_ensemble'
# )
#
# 3. Set up and run the restart ensemble:
# compass setup -t restart_ensemble -w /work/restart_ensemble -f <config_file>
# compass setup -t landice/ensemble_generator/sgh_restart_ensemble \
# -w /work/restart_ensemble -f <config_file>
# compass run -w /work/restart_ensemble

[ensemble_generator]
Expand All @@ -31,6 +34,15 @@ ensemble_template = sgh_ensemble
# the run000, run001, etc. subdirectories to be restarted
spinup_work_dir = REPLACE_WITH_SPINUP_WORK_DIR

# Path to the analysis_summary.json produced by sgh_ensemble_analysis
# This file contains per-run steady-state and validation results used to
# determine which runs need restarting.
# Typical path: <analysis_work_dir>/sgh_ensemble_analysis/analyze_ensemble/
# analysis_summary.json
# Set to None (default) if not available — runs will be skipped with
# "No analysis results to verify progress"
analysis_summary_file = None

# Maximum consecutive restart attempts per run
# After this many restarts, the run will not be restarted again
# (prevents infinite loops if a run keeps failing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ def setup(self):
print(f'Setting config_do_restart = .true. in {namelist_path}')
_set_restart_in_namelist(namelist_path)

# Create a restart_attempt_N/ directory to track how many restarts
# have been attempted for this run. configure() counts these dirs
# to enforce max_consecutive_restarts. List the directory once to
# find the highest existing attempt number, then create the next one.
existing_nums = [
int(d[len('restart_attempt_'):])
for d in os.listdir(run_dir)
if d.startswith('restart_attempt_') and
d[len('restart_attempt_'):].isdigit()
]
attempt_num = max(existing_nums, default=0) + 1
attempt_dir = os.path.join(run_dir, f'restart_attempt_{attempt_num}')
os.makedirs(attempt_dir, exist_ok=True)
print(f'Tracking restart attempt {attempt_num} in {attempt_dir}')

# Register MALI executable so compass knows this step needs the model
self.add_model_as_input()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ def create_config_file(self, restart_runs, base_config_file=None):
# Path to the spinup ensemble to restart from
spinup_work_dir = {self.original_ensemble_dir}

# Path to the analysis_summary.json from sgh_ensemble_analysis
analysis_summary_file = {self.summary_file}

# Restart configuration
# Maximum consecutive restart attempts per run (prevents infinite loops)
max_consecutive_restarts = 3
Expand Down Expand Up @@ -242,12 +245,12 @@ def schedule_restarts(

Examples
--------
>>> from compass.landice.tests.ensemble_generator.
ensemble_templates.sgh_ensemble.restart
import schedule_restarts
>>> from compass.landice.tests.ensemble_generator.sgh_restart_ensemble \
... .restart_scheduler import schedule_restarts
>>>
>>> config_file, restart_runs = schedule_restarts(
... '/work/ensemble1/spinup_ensemble/analysis_summary.json',
... '/work/analysis/sgh_ensemble_analysis/analyze_ensemble/'
... 'analysis_summary.json',
... '/work/ensemble2',
... min_years=50.0,
... max_attempts=3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import json
import os

import compass.namelist
from compass.landice.tests.ensemble_generator.ensemble_manager import (
EnsembleManager,
)
Expand Down Expand Up @@ -57,7 +58,14 @@ def configure(self):
4. Sets up ensemble_manager to handle job submission
"""
config = self.config
section = config.get('restart_ensemble', {})
# Load shipped defaults before reading any options so that the
# compass config infrastructure (MpasConfigParser) can resolve them
# without needing fallback= kwargs.
self.config.add_from_package(
'compass.landice.tests.ensemble_generator.sgh_restart_ensemble',
'ensemble_generator.cfg')
# Bug fix: use dict-style access to get a SectionProxy, not .get()
section = config['restart_ensemble']

spinup_work_dir = section.get('spinup_work_dir')

Expand All @@ -73,11 +81,26 @@ def configure(self):
raise ValueError(f"spinup_work_dir not found: {spinup_work_dir}")

# Get restart configuration
max_consecutive_restarts = section.getint(
'max_consecutive_restarts', 3)
max_consecutive_restarts = section.getint('max_consecutive_restarts')
min_simulation_years = section.getfloat(
'min_simulation_years_before_restart', 50.0)
auto_restart = section.getboolean('auto_restart_incomplete', True)
'min_simulation_years_before_restart')
auto_restart = section.getboolean('auto_restart_incomplete')

# Load per-run analysis results from analysis_summary.json if provided.
# The sgh_ensemble_analysis test case writes this file; it contains
# an 'individual_results' dict keyed by run number (strings in JSON).
analysis_summary = {}
analysis_summary_file = section.get('analysis_summary_file')
if analysis_summary_file and \
analysis_summary_file.lower() != 'none':
if not os.path.exists(analysis_summary_file):
raise ValueError(
"analysis_summary_file not found: "
f"{analysis_summary_file}")
with open(analysis_summary_file, 'r') as f:
summary = json.load(f)
# individual_results keys are ints in Python but strings in JSON
analysis_summary = summary.get('individual_results', {})

# Scan for existing run directories
run_dirs = sorted(glob.glob(os.path.join(spinup_work_dir, 'run*')))
Expand All @@ -92,13 +115,17 @@ def configure(self):
except ValueError:
continue

# Look up per-run results from the summary (JSON int keys → str)
run_results = analysis_summary.get(str(run_num))

# Check if run should be restarted
should_restart, reason = self._should_restart_run(
run_dir=run_dir,
run_num=run_num,
min_years=min_simulation_years,
max_restarts=max_consecutive_restarts,
auto_restart=auto_restart
auto_restart=auto_restart,
run_results=run_results
)

if should_restart:
Expand Down Expand Up @@ -131,7 +158,8 @@ def _should_restart_run(
run_num,
min_years,
max_restarts,
auto_restart):
auto_restart,
run_results=None):
"""
Determine if a run should be restarted.

Expand All @@ -152,6 +180,11 @@ def _should_restart_run(
auto_restart : bool
Whether to automatically restart incomplete runs

run_results : dict or None
Per-run results dict from ``individual_results[run_num]`` in
``analysis_summary.json``. When *None* the run cannot be
verified and will be skipped.

Returns
-------
tuple
Expand All @@ -174,7 +207,6 @@ def _should_restart_run(
with open(restart_timestamp_file, 'r') as f:
current_time = f.read().strip()

import compass.namelist
namelist = compass.namelist.ingest(namelist_file)
stop_time = \
namelist['time_management']['config_stop_time'].strip(
Expand All @@ -186,46 +218,36 @@ def _should_restart_run(
except Exception as e:
return False, f"Error reading completion status: {e}"

# Check analysis results
analysis_file = os.path.join(run_dir, 'analysis_results.json')
# Check analysis results supplied from analysis_summary.json.
# run_results is the per-run dict from individual_results[run_num].
if run_results is not None:
ss_info = run_results.get('steady_state') or {}

if os.path.exists(analysis_file):
try:
with open(analysis_file, 'r') as f:
results = json.load(f)

# If at steady state, don't restart
ss_info = results.get('steady_state', {})
if ss_info.get('is_steady_state', False):
return False, "Already at steady state"

# Check simulation length
metrics = ss_info.get('metrics', {})
sim_length = metrics.get('final_year', 0.0)

if sim_length < min_years:
return False, f"Too short ({
sim_length:.1f} < {
min_years:.1f} yrs)"

except (json.JSONDecodeError, IOError):
# If analysis file is malformed, still allow restart
pass
# If at steady state, don't restart
if ss_info.get('is_steady_state', False):
return False, "Already at steady state"

# Check simulation length
metrics = ss_info.get('metrics') or {}
sim_length = metrics.get('final_year', 0.0)

if sim_length < min_years:
return False, (f"Too short "
f"({sim_length:.1f} < {min_years:.1f} yrs)")
else:
# No analysis file - if we can't verify it reached min years, don't
# restart
# No analysis results — cannot verify progress; skip
return False, "No analysis results to verify progress"

# Check number of restart attempts
# Check number of restart attempts (tracked as restart_attempt_N/ dirs)
restart_attempts = 0
if os.path.exists(run_dir):
restart_dirs = [d for d in os.listdir(run_dir)
if d.startswith('restart_attempt_')]
restart_attempts = len(restart_dirs)

if restart_attempts >= max_restarts:
return False, f"Max restart attempts reached \
({restart_attempts}/{max_restarts})"
return False, (f"Max restart attempts reached "
f"({restart_attempts}/{max_restarts})")

# If all checks pass and auto_restart is enabled
if not auto_restart:
Expand Down