Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
compass run -w /work/restart
"""

import configparser
import glob
import json
import os
Expand Down Expand Up @@ -52,14 +53,23 @@ def configure(self):

This method:
1. Reads the spinup ensemble directory
2. Checks analysis results to identify incomplete runs
3. Creates RestartMember steps for runs needing continuation
4. Sets up ensemble_manager to handle job submission
2. Optionally loads an analysis_summary_file to pre-filter candidates
3. Checks per-run analysis results to identify incomplete runs
4. Creates RestartMember steps for runs needing continuation
5. Sets up ensemble_manager to handle job submission
"""
config = self.config
section = config.get('restart_ensemble', {})

spinup_work_dir = section.get('spinup_work_dir')
# Read required spinup_work_dir
try:
spinup_work_dir = config.get('restart_ensemble', 'spinup_work_dir')
except (configparser.NoSectionError, configparser.NoOptionError):
raise ValueError(
"restart_ensemble config must specify spinup_work_dir\n"
"Add to config file:\n"
"[restart_ensemble]\n"
"spinup_work_dir = /path/to/spinup/ensemble"
)

if not spinup_work_dir:
raise ValueError(
Expand All @@ -72,12 +82,47 @@ def configure(self):
if not os.path.exists(spinup_work_dir):
raise ValueError(f"spinup_work_dir not found: {spinup_work_dir}")

# Get restart configuration
max_consecutive_restarts = section.getint(
'max_consecutive_restarts', 3)
min_simulation_years = section.getfloat(
'min_simulation_years_before_restart', 50.0)
auto_restart = section.getboolean('auto_restart_incomplete', True)
# Read optional analysis_summary_file
analysis_summary_file = None
try:
val = config.get('restart_ensemble', 'analysis_summary_file')
if val and val.lower() not in ('none', ''):
analysis_summary_file = val
except (configparser.NoSectionError, configparser.NoOptionError):
pass

# If provided, validate it exists
if analysis_summary_file and not os.path.exists(analysis_summary_file):
raise ValueError(
f"analysis_summary_file not found: {analysis_summary_file}")

# Load the set of run numbers flagged for restart from the summary file
summary_restart_runs = None
if analysis_summary_file:
with open(analysis_summary_file, 'r') as f:
summary = json.load(f)
summary_restart_runs = set(summary.get('restart_needed_runs', []))
print(f"Loaded {len(summary_restart_runs)} restart candidates "
f"from {analysis_summary_file}")

# Read optional restart configuration
try:
max_consecutive_restarts = config.getint(
'restart_ensemble', 'max_consecutive_restarts')
except (configparser.NoSectionError, configparser.NoOptionError):
max_consecutive_restarts = 3

try:
min_simulation_years = config.getfloat(
'restart_ensemble', 'min_simulation_years_before_restart')
except (configparser.NoSectionError, configparser.NoOptionError):
min_simulation_years = 50.0

try:
auto_restart = config.getboolean(
'restart_ensemble', 'auto_restart_incomplete')
except (configparser.NoSectionError, configparser.NoOptionError):
auto_restart = True

# Scan for existing run directories
run_dirs = sorted(glob.glob(os.path.join(spinup_work_dir, 'run*')))
Expand All @@ -92,7 +137,13 @@ def configure(self):
except ValueError:
continue

# Check if run should be restarted
# If a summary file was provided, only consider runs listed there
if summary_restart_runs is not None and \
run_num not in summary_restart_runs:
skipped_runs.append(
(run_num, "Not in analysis_summary restart list"))
continue

should_restart, reason = self._should_restart_run(
run_dir=run_dir,
run_num=run_num,
Expand All @@ -105,7 +156,6 @@ def configure(self):
restart_runs.append(run_num)
print(f"Scheduling restart for {run_name}")

# Add restart member step
self.add_step(InPlaceRestartMember(
test_case=self,
run_num=run_num,
Expand Down