diff --git a/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/ensemble_generator.cfg b/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/ensemble_generator.cfg index b75c6735f3..b2d0be93de 100644 --- a/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/ensemble_generator.cfg +++ b/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/ensemble_generator.cfg @@ -6,19 +6,22 @@ # # To create a restart ensemble after running spinup: # -# 1. Analyze the completed spinup ensemble: -# from compass.landice.tests.ensemble_generator.ensemble_templates.sgh_ensemble.analysis import analyze_ensemble -# analyze_ensemble('/work/ensemble/spinup_ensemble', '/work/config.cfg') +# 1. Run the sgh_ensemble_analysis test case to produce analysis_summary.json +# compass setup -t landice/ensemble_generator/sgh_ensemble_analysis \ +# -w /work/analysis -f analysis.cfg +# compass run -w /work/analysis # # 2. Schedule restarts: -# from compass.landice.tests.ensemble_generator.ensemble_templates.sgh_ensemble.restart import schedule_restarts +# from compass.landice.tests.ensemble_generator.sgh_restart_ensemble \ +# .restart_scheduler import schedule_restarts # config_file, runs = schedule_restarts( -# '/work/ensemble/spinup_ensemble/analysis_summary.json', +# '/work/analysis/sgh_ensemble_analysis/analyze_ensemble/analysis_summary.json', # '/work/restart_ensemble' # ) # # 3. Set up and run the restart ensemble: -# compass setup -t restart_ensemble -w /work/restart_ensemble -f +# compass setup -t landice/ensemble_generator/sgh_restart_ensemble \ +# -w /work/restart_ensemble -f # compass run -w /work/restart_ensemble [ensemble_generator] @@ -31,6 +34,15 @@ ensemble_template = sgh_ensemble # the run000, run001, etc. 
subdirectories to be restarted spinup_work_dir = REPLACE_WITH_SPINUP_WORK_DIR +# Path to the analysis_summary.json produced by sgh_ensemble_analysis +# This file contains per-run steady-state and validation results used to +# determine which runs need restarting. +# Typical path: <analysis_work_dir>/sgh_ensemble_analysis/analyze_ensemble/ +# analysis_summary.json +# Set to None (default) if not available — runs will be skipped with +# "No analysis results to verify progress" +analysis_summary_file = None + # Maximum consecutive restart attempts per run # After this many restarts, the run will not be restarted again # (prevents infinite loops if a run keeps failing) diff --git a/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/restart_member.py b/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/restart_member.py index 2e401bea1c..dba0df7b91 100644 --- a/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/restart_member.py +++ b/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/restart_member.py @@ -108,6 +108,21 @@ def setup(self): print(f'Setting config_do_restart = .true. in {namelist_path}') _set_restart_in_namelist(namelist_path) + # Create a restart_attempt_N/ directory to track how many restarts + # have been attempted for this run. configure() counts these dirs + # to enforce max_consecutive_restarts. List the directory once to + # find the highest existing attempt number, then create the next one. 
+ existing_nums = [ + int(d[len('restart_attempt_'):]) + for d in os.listdir(run_dir) + if d.startswith('restart_attempt_') and + d[len('restart_attempt_'):].isdigit() + ] + attempt_num = max(existing_nums, default=0) + 1 + attempt_dir = os.path.join(run_dir, f'restart_attempt_{attempt_num}') + os.makedirs(attempt_dir, exist_ok=True) + print(f'Tracking restart attempt {attempt_num} in {attempt_dir}') + # Register MALI executable so compass knows this step needs the model self.add_model_as_input() diff --git a/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/restart_scheduler.py b/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/restart_scheduler.py index 9aaecb6628..05bd8f4220 100644 --- a/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/restart_scheduler.py +++ b/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/restart_scheduler.py @@ -149,6 +149,9 @@ def create_config_file(self, restart_runs, base_config_file=None): # Path to the spinup ensemble to restart from spinup_work_dir = {self.original_ensemble_dir} +# Path to the analysis_summary.json from sgh_ensemble_analysis +analysis_summary_file = {self.summary_file} + # Restart configuration # Maximum consecutive restart attempts per run (prevents infinite loops) max_consecutive_restarts = 3 @@ -242,12 +245,12 @@ def schedule_restarts( Examples -------- - >>> from compass.landice.tests.ensemble_generator. - ensemble_templates.sgh_ensemble.restart - import schedule_restarts + >>> from compass.landice.tests.ensemble_generator.sgh_restart_ensemble \ + ... .restart_scheduler import schedule_restarts >>> >>> config_file, restart_runs = schedule_restarts( - ... '/work/ensemble1/spinup_ensemble/analysis_summary.json', + ... '/work/analysis/sgh_ensemble_analysis/analyze_ensemble/' + ... 'analysis_summary.json', ... '/work/ensemble2', ... min_years=50.0, ... 
max_attempts=3 diff --git a/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/test_case.py b/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/test_case.py index 16388c1dd0..efeb02beaf 100644 --- a/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/test_case.py +++ b/compass/landice/tests/ensemble_generator/sgh_restart_ensemble/test_case.py @@ -15,6 +15,7 @@ import json import os +import compass.namelist from compass.landice.tests.ensemble_generator.ensemble_manager import ( EnsembleManager, ) @@ -57,7 +58,14 @@ def configure(self): 4. Sets up ensemble_manager to handle job submission """ config = self.config - section = config.get('restart_ensemble', {}) + # Load shipped defaults before reading any options so that the + # compass config infrastructure (MpasConfigParser) can resolve them + # without needing fallback= kwargs. + self.config.add_from_package( + 'compass.landice.tests.ensemble_generator.sgh_restart_ensemble', + 'ensemble_generator.cfg') + # Bug fix: use dict-style access to get a SectionProxy, not .get() + section = config['restart_ensemble'] spinup_work_dir = section.get('spinup_work_dir') @@ -73,11 +81,26 @@ def configure(self): raise ValueError(f"spinup_work_dir not found: {spinup_work_dir}") # Get restart configuration - max_consecutive_restarts = section.getint( - 'max_consecutive_restarts', 3) + max_consecutive_restarts = section.getint('max_consecutive_restarts') min_simulation_years = section.getfloat( - 'min_simulation_years_before_restart', 50.0) - auto_restart = section.getboolean('auto_restart_incomplete', True) + 'min_simulation_years_before_restart') + auto_restart = section.getboolean('auto_restart_incomplete') + + # Load per-run analysis results from analysis_summary.json if provided. + # The sgh_ensemble_analysis test case writes this file; it contains + # an 'individual_results' dict keyed by run number (strings in JSON). 
+ analysis_summary = {} + analysis_summary_file = section.get('analysis_summary_file') + if analysis_summary_file and \ + analysis_summary_file.lower() != 'none': + if not os.path.exists(analysis_summary_file): + raise ValueError( + "analysis_summary_file not found: " + f"{analysis_summary_file}") + with open(analysis_summary_file, 'r') as f: + summary = json.load(f) + # individual_results keys are ints in Python but strings in JSON + analysis_summary = summary.get('individual_results', {}) # Scan for existing run directories run_dirs = sorted(glob.glob(os.path.join(spinup_work_dir, 'run*'))) @@ -92,13 +115,17 @@ def configure(self): except ValueError: continue + # Look up per-run results from the summary (JSON int keys → str) + run_results = analysis_summary.get(str(run_num)) + # Check if run should be restarted should_restart, reason = self._should_restart_run( run_dir=run_dir, run_num=run_num, min_years=min_simulation_years, max_restarts=max_consecutive_restarts, - auto_restart=auto_restart + auto_restart=auto_restart, + run_results=run_results ) if should_restart: @@ -131,7 +158,8 @@ def _should_restart_run( run_num, min_years, max_restarts, - auto_restart): + auto_restart, + run_results=None): """ Determine if a run should be restarted. @@ -152,6 +180,11 @@ def _should_restart_run( auto_restart : bool Whether to automatically restart incomplete runs + run_results : dict or None + Per-run results dict from ``individual_results[run_num]`` in + ``analysis_summary.json``. When *None* the run cannot be + verified and will be skipped. 
+ Returns ------- tuple @@ -174,7 +207,6 @@ def _should_restart_run( with open(restart_timestamp_file, 'r') as f: current_time = f.read().strip() - import compass.namelist namelist = compass.namelist.ingest(namelist_file) stop_time = \ namelist['time_management']['config_stop_time'].strip( @@ -186,37 +218,27 @@ def _should_restart_run( except Exception as e: return False, f"Error reading completion status: {e}" - # Check analysis results - analysis_file = os.path.join(run_dir, 'analysis_results.json') + # Check analysis results supplied from analysis_summary.json. + # run_results is the per-run dict from individual_results[run_num]. + if run_results is not None: + ss_info = run_results.get('steady_state') or {} - if os.path.exists(analysis_file): - try: - with open(analysis_file, 'r') as f: - results = json.load(f) - - # If at steady state, don't restart - ss_info = results.get('steady_state', {}) - if ss_info.get('is_steady_state', False): - return False, "Already at steady state" - - # Check simulation length - metrics = ss_info.get('metrics', {}) - sim_length = metrics.get('final_year', 0.0) - - if sim_length < min_years: - return False, f"Too short ({ - sim_length:.1f} < { - min_years:.1f} yrs)" - - except (json.JSONDecodeError, IOError): - # If analysis file is malformed, still allow restart - pass + # If at steady state, don't restart + if ss_info.get('is_steady_state', False): + return False, "Already at steady state" + + # Check simulation length + metrics = ss_info.get('metrics') or {} + sim_length = metrics.get('final_year', 0.0) + + if sim_length < min_years: + return False, (f"Too short " + f"({sim_length:.1f} < {min_years:.1f} yrs)") else: - # No analysis file - if we can't verify it reached min years, don't - # restart + # No analysis results — cannot verify progress; skip return False, "No analysis results to verify progress" - # Check number of restart attempts + # Check number of restart attempts (tracked as restart_attempt_N/ dirs) 
restart_attempts = 0 if os.path.exists(run_dir): restart_dirs = [d for d in os.listdir(run_dir) @@ -224,8 +246,8 @@ def _should_restart_run( restart_attempts = len(restart_dirs) if restart_attempts >= max_restarts: - return False, f"Max restart attempts reached \ - ({restart_attempts}/{max_restarts})" + return False, (f"Max restart attempts reached " + f"({restart_attempts}/{max_restarts})") # If all checks pass and auto_restart is enabled if not auto_restart: