diff --git a/python/TestHarness/TestHarness.py b/python/TestHarness/TestHarness.py
index 0ae971eccda9..78475de55d00 100644
--- a/python/TestHarness/TestHarness.py
+++ b/python/TestHarness/TestHarness.py
@@ -1154,11 +1154,9 @@ def parseCLArgs(self, argv):
         # Try to guess the --hpc option if --hpc-host is set
         if self.options.hpc_host and not self.options.hpc:
             hpc_host = self.options.hpc_host[0]
-            if 'sawtooth' in hpc_host or 'lemhi' in hpc_host:
-                self.options.hpc = 'pbs'
-            elif 'bitterroot' in hpc_host:
-                self.options.hpc = 'slurm'
-            if self.options.hpc:
+            hpc_config = TestHarness.queryHPCCluster(hpc_host)
+            if hpc_config is not None:
+                self.options.hpc = hpc_config.scheduler
                 print(f'INFO: Setting --hpc={self.options.hpc} for known host {hpc_host}')

         self.options.runtags = [tag for tag in self.options.run.split(',') if tag != '']
@@ -1244,3 +1242,29 @@ def preRun(self):

     def getOptions(self):
         return self.options
+
+    # Helper tuple for storing information about a cluster
+    HPCCluster = namedtuple('HPCCluster', ['scheduler', 'apptainer_modules'])
+    # The modules that we want to load when running in a non-moduled
+    # container on INL HPC
+    inl_modules = ['use.moose', 'moose-dev-container-openmpi/5.0.5_0']
+    # Define INL HPC clusters
+    hpc_configs = {'sawtooth': HPCCluster(scheduler='pbs',
+                                          apptainer_modules=inl_modules),
+                   'bitterroot': HPCCluster(scheduler='slurm',
+                                            apptainer_modules=inl_modules)}
+
+    @staticmethod
+    def queryHPCCluster(hostname: str):
+        """
+        Attempt to get the HPC cluster configuration given a host
+
+        Args:
+            hostname: The HPC system hostname
+        Returns:
+            HPCCluster: The config, if found, otherwise None
+        """
+        for host, config in TestHarness.hpc_configs.items():
+            if host in hostname:
+                return config
+        return None
diff --git a/python/TestHarness/runners/HPCRunner.py b/python/TestHarness/runners/HPCRunner.py
index 86015cf2964d..c7cec147e9a8 100644
--- a/python/TestHarness/runners/HPCRunner.py
+++ b/python/TestHarness/runners/HPCRunner.py
@@ -42,6 +42,11 @@ def spawn(self, timer):
        self.hpc_job = self.run_hpc.queueJob(self.job)

    def wait(self, timer):
+        # Sanity check on having a job
+        if self.hpc_job is None:
+            self.job.setStatus(self.job.error, 'HPCRUNNER MISSING HPCJOB')
+            return
+
        # The states that we should wait on. Anything else should
        # be an invalid state for waiting
        wait_states = [self.hpc_job.State.held,
@@ -98,8 +103,11 @@ def wait(self, timer):
                if self.fileIsReady(file):
                    # Store the result
                    if file == result_file:
-                        with open(file, 'r') as f:
-                            result = yaml.safe_load(f)
+                        try:
+                            with open(file, 'r') as f:
+                                result = yaml.safe_load(f)
+                        except:
+                            continue

                        self.exit_code = result['exit_code']
                        walltime = result['walltime']
diff --git a/python/TestHarness/schedulers/RunHPC.py b/python/TestHarness/schedulers/RunHPC.py
index ba2df6001f2f..ab8d5658da64 100644
--- a/python/TestHarness/schedulers/RunHPC.py
+++ b/python/TestHarness/schedulers/RunHPC.py
@@ -9,12 +9,11 @@

 import urllib.parse
 from RunParallel import RunParallel
-import threading, os, re, sys, datetime, shlex, socket, threading, time, urllib, contextlib
+import threading, os, re, sys, datetime, shlex, socket, threading, time, urllib, contextlib, copy
 from enum import Enum
-import paramiko
-import jinja2
 import statistics
-import contextlib
+from collections import namedtuple
+
 from multiprocessing.pool import ThreadPool
 from TestHarness import util
@@ -85,6 +84,8 @@ class RunHPC(RunParallel):
     Base scheduler for jobs that are ran on HPC.
""" def __init__(self, harness, params): + import paramiko + super().__init__(harness, params) self.params = params @@ -165,14 +166,13 @@ def __init__(self, harness, params): # held in the background without blocking self.submit_job_pool = None if self.options.hpc_no_hold else ThreadPool(processes=10) + # Build the base submission environemnt for a job + self.submit_env, self.app_exec_prefix, self.app_exec_suffix = self.setupRunEnvironment(harness) + if os.environ.get('APPTAINER_CONTAINER'): if not self.ssh_hosts: print('ERROR: --hpc-host must be set when using HPC jobs within apptainer') sys.exit(1) - if not self.options.hpc_pre_source: - default_pre_source = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'hpc_source') - self.options.hpc_pre_source = default_pre_source - print(f'INFO: Setting --hpc-pre-source={default_pre_source}') else: if self.options.hpc_apptainer_bindpath: print('ERROR: --hpc-apptainer-bindpath is unused when not executing with apptainer') @@ -192,13 +192,15 @@ def __init__(self, harness, params): sys.exit(1) # Load the pre-source if it exists - self.source_contents = None + self.pre_source_contents = None if self.options.hpc_pre_source: - self.source_contents = open(self.options.hpc_pre_source, 'r').read() + with open(self.options.hpc_pre_source, 'r') as f: + self.pre_source_contents = f.read() # Load the submission template template_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'hpc_template') - self.submission_template = open(template_path, 'r').read() + with open(template_path, 'r') as f: + self.submission_template = f.read() class CallHPCException(Exception): """ @@ -219,6 +221,8 @@ def _getSSHClient(self, reconnect=False): This is threaded so that we can operate a few connections at once. """ + import paramiko + process = threading.get_ident() with self.ssh_clients_lock: if process not in self.ssh_clients or reconnect: @@ -345,6 +349,8 @@ def submitJob(self, job, hold, lock=True): Returns the resulting HPCJob. 
""" + import jinja2 + # If we're submitting this Job to be held, but the Job status isn't # currently held, it means that we've hit job in the submit_job_pool # that was submitted previously but has already been set to be skipped @@ -385,45 +391,32 @@ def submitJob(self, job, hold, lock=True): if os.path.exists(file): os.remove(file) - # Add MOOSE's python path for python scripts - moose_python = os.path.abspath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..')) - # Start building the jinja environment for the submission script - submission_env = {'SCHEDULER_NAME': self.getHPCSchedulerName(), - 'NAME': self.getHPCJobName(job), - 'CWD': tester.getTestDir(), - 'OUTPUT': output_file, - 'RESULT': result_file, - 'SUBMISSION_SCRIPT': submission_script, - 'WALLTIME': str(datetime.timedelta(seconds=tester.getMaxTime())), - 'PROJECT': self.options.hpc_project, - 'TEST_SPEC': tester.getSpecFile(), - 'TEST_NAME': tester.getTestNameShort(), - 'SUBMITTED_HOSTNAME': socket.gethostname(), - 'MOOSE_PYTHONPATH': moose_python, - 'NUM_PROCS': int(tester.getProcs(options)), - 'NUM_THREADS': int(tester.getThreads(options)), - 'ENDING_COMMENT': self.getOutputEndingComment(f'${self.getHPCJobIDVariable()}'), - 'JOB_ID_VARIABLE': self.getHPCJobIDVariable(), - 'PLACE': tester.getHPCPlace(options)} + submission_env = copy.deepcopy(self.submit_env) + submission_env.update({'NAME': self.getHPCJobName(job), + 'CWD': tester.getTestDir(), + 'OUTPUT': output_file, + 'RESULT': result_file, + 'SUBMISSION_SCRIPT': submission_script, + 'WALLTIME': str(datetime.timedelta(seconds=tester.getMaxTime())), + 'TEST_SPEC': tester.getSpecFile(), + 'TEST_NAME': tester.getTestNameShort(), + 'NUM_PROCS': int(tester.getProcs(options)), + 'NUM_THREADS': int(tester.getThreads(options)), + 'PLACE': tester.getHPCPlace(options)}) if hold: submission_env['HOLD'] = 1 - if self.options.hpc_pre_source: - submission_env['SOURCE_FILE'] = options.hpc_pre_source - if self.source_contents: - submission_env['SOURCE_CONTENTS'] = self.source_contents # Get the unescaped command command = tester.getCommand(options) - # Parse out the mpi command from the command if we're running in apptainer. - # We do this before any of the other escaping - APPTAINER_CONTAINER = os.environ.get('APPTAINER_CONTAINER') - apptainer_command_prefix = '' - if APPTAINER_CONTAINER: + # Parse out the mpi command from the command if we're wrapping + # things around the mpi command + mpi_prefix = '' + if self.app_exec_prefix: mpi_command = self.parseMPICommand(command) if mpi_command: - apptainer_command_prefix = mpi_command + mpi_prefix = mpi_command command = command.replace(mpi_command, '') # Replace newlines, clean up spaces, and encode the command. 
@@ -442,8 +435,9 @@ def submitJob(self, job, hold, lock=True):
         # we need to manipulate the command like such
         # Original command: <mpi_command> <command>
         # New command: <mpi_command> apptainer exec /path/to/image '<command>'
+        APPTAINER_CONTAINER = os.environ.get('APPTAINER_CONTAINER')
         if APPTAINER_CONTAINER:
-            job_command = apptainer_command_prefix
+            job_command = mpi_prefix

             # The root filesystem path that we're in so that we can be sure to bind
             # it into the container, if not already set
@@ -451,18 +445,12 @@ def submitJob(self, job, hold, lock=True):
                 bindpath = self.options.hpc_apptainer_bindpath
             else:
                 bindpath = '/' + os.path.abspath(tester.getTestDir()).split(os.path.sep)[1]
-            # The apptainer command that will get sandwiched in the middle
-            apptainer_command = ['apptainer', 'exec', '-B', bindpath]
-            if self.options.hpc_apptainer_no_home:
-                apptainer_command.append('--no-home')
-            apptainer_command.append(APPTAINER_CONTAINER)
-            apptainer_command = shlex.join(apptainer_command)
+            submission_env['VARS']['APPTAINER_BINDPATH'] = bindpath + ',${APPTAINER_BINDPATH}'

-            # Append the apptainer command along with the command to be ran
-            job_command += f"{apptainer_command} {hpc_run} {command_encoded}"
+            serial_command = shlex.join(self.app_exec_prefix + self.app_exec_suffix)

-            # Set that we're using apptainer
-            submission_env['USING_APPTAINER'] = '1'
+            # Append the apptainer command along with the command to be run
+            job_command += f"{serial_command} {hpc_run} {command_encoded}"
         # Not in apptainer, so we can just use the escaped command as is
         else:
             job_command = f'{hpc_run} {command_encoded}'
@@ -801,7 +789,8 @@ def killHPCJobs(self, functor):

     def killRemaining(self, keyboard=False):
         """Kills all currently running HPC jobs"""
-        functor = lambda hpc_job: hpc_job.state not in [hpc_job.State.killed, hpc_job.State.done]
+        running_states = [HPCJob.State.killed, HPCJob.State.done]
+        functor = lambda hpc_job: hpc_job is not None and hpc_job.state not in running_states
         killed_jobs = self.killHPCJobs(functor)
         if keyboard and killed_jobs:
             print(f'\nAttempted to kill remaining {killed_jobs} HPC jobs...')
@@ -952,8 +941,6 @@ def appendResultFooter(self, stats):

     def appendResultFileHeader(self):
         entry = {'scheduler': self.options.hpc,
-                 'pre_source_file': self.options.hpc_pre_source,
-                 'pre_source': self.source_contents,
                  'hosts': self.options.hpc_host if isinstance(self.options.hpc_host, list) else [self.options.hpc_host]}
         return {'hpc': entry}

@@ -971,3 +958,84 @@ def callHPCShouldRetry(self, pool_type, result: str):
         retry a command given a failure with a certain result.
""" return False + + def setupRunEnvironment(self, harness): + """ + Sets up the run environment for all HPC jobs + """ + hpc_cluster = harness.queryHPCCluster(self.ssh_hosts[0]) + + # HPC containerized module that we're in, if any + module_name = os.environ.get('CONTAINER_MODULE_NAME') + # Container that we're in, if any + apptainer_container = os.environ.get('APPTAINER_CONTAINER') + + # Base submission environment + submit_env = {# Name of the scheduler + 'SCHEDULER_NAME': self.getHPCSchedulerName(), + # Project to submit to within the cluster scheduler + 'PROJECT': self.options.hpc_project, + # Ending comment for output files + 'ENDING_COMMENT': self.getOutputEndingComment(f'${self.getHPCJobIDVariable()}'), + # Env var on the compute node that contains the HPC scheduler JOB id + 'JOB_ID_VARIABLE': self.getHPCJobIDVariable(), + # Modules to load + 'LOAD_MODULES': [], + # Environment variables to set before loading the modules + 'PRE_MODULE_VARS': {}, + # Environment variables to set after loading the modules + 'VARS': {}, + # The host the HPC job was submitted from + 'SUBMITTED_HOSTNAME': socket.gethostname(), + # Whether or not we're using apptainer + 'USING_APPTAINER': apptainer_container is not None} + + # The prefix and suffix to wrap around what we're actually + # running within the HPC job + app_exec_prefix = [] + app_exec_suffix = [] + + # --hpc-pre-source contents + if self.options.hpc_pre_source: + submission_env['PRE_SOURCE_FILE'] = self.options.hpc_pre_source + submission_env['PRE_SOURCE_CONTENTS'] = self.source_contents + + # If running on INL HPC, minimize the bindpath; this is a configuration + # option for the moose-dev-container module on INL HPC + if hpc_cluster is not None: + submit_env['PRE_MODULE_VARS']['MOOSE_DEV_CONTAINER_MINIMAL_BINDPATH'] = '1' + + # Pass apptainer options + if self.options.hpc_apptainer_no_home: + submit_env['VARS']['APPTAINER_NO_HOME'] = '1' + + # Add MOOSE's pythonpath + moose_python = os.path.abspath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..')) + submit_env['VARS']['PYTHONPATH'] = moose_python + ':${PYTHONPATH}' + + # We're in a loaded container module on INL HPC, use that environment + # and the associated -exec command to run apptainer + if module_name: + assert apptainer_container is not None + module_version = os.environ['CONTAINER_MODULE_VERSION'] + module_exec = os.environ['CONTAINER_MODULE_EXEC'] + required_modules = os.environ.get('CONTAINER_MODULE_REQUIRED_MODULES', '').split(' ') + modules = required_modules + [f'{module_name}/{module_version}'] + submit_env['LOAD_MODULES'] = modules + app_exec_prefix = [module_exec] + # We're in a container without the container module environment, use + # a direct apptainer exec command + elif apptainer_container: + # If on INL HPC, use the wrapped environment + if hpc_cluster is not None: + submit_env['LOAD_MODULES'] = hpc_cluster.apptainer_modules + app_exec_prefix = ['apptainer', 'exec'] + app_exec_suffix = [apptainer_container] + + if submit_env['LOAD_MODULES']: + print(f'INFO: Using modules "{" ".join(submit_env["LOAD_MODULES"])}" for HPC environment') + if app_exec_prefix: + exec_combined = app_exec_prefix + app_exec_suffix + print(f'INFO: Using "{" ".join(exec_combined)}" as HPC execution wrapper') + + return submit_env, app_exec_prefix, app_exec_suffix diff --git a/python/TestHarness/schedulers/Scheduler.py b/python/TestHarness/schedulers/Scheduler.py index b34832c430a1..6c76385915c3 100644 --- a/python/TestHarness/schedulers/Scheduler.py +++ 
@@ -528,7 +528,11 @@ def runJob(self, job, jobs):
                job.setStatus(StatusSystem().finished)

            with self.activity_lock:
-                self.__active_jobs.remove(job)
+                if job in self.__active_jobs:
+                    self.__active_jobs.remove(job)
+                else:
+                    job.setStatus(StatusSystem().error, 'SCHEDULER ERROR')
+                    job.appendOutput('Failed to remove job from active jobs in Scheduler; did not exist')

        # Not enough slots to run the job...
        else:
diff --git a/python/TestHarness/schedulers/hpc_source b/python/TestHarness/schedulers/hpc_source
deleted file mode 100644
index abb65f042821..000000000000
--- a/python/TestHarness/schedulers/hpc_source
+++ /dev/null
@@ -1,2 +0,0 @@
-export MOOSE_DEV_CONTAINER_MINIMAL_BINDPATH=1
-module load use.moose moose-dev-container-openmpi
diff --git a/python/TestHarness/schedulers/hpc_template b/python/TestHarness/schedulers/hpc_template
index 730ff8e66184..62bf8ae9dce9 100644
--- a/python/TestHarness/schedulers/hpc_template
+++ b/python/TestHarness/schedulers/hpc_template
@@ -31,13 +31,19 @@
 # Exit on failure
 set -e

-{%- if SOURCE_FILE is defined %}
-# Loaded from {{ SOURCE_FILE }}
-{{ SOURCE_CONTENTS }}
+{%- for key, val in PRE_MODULE_VARS.items() %}
+export {{ key }}="{{ val }}"
+{%- endfor %}
+{%- if LOAD_MODULES is defined %}
+module load {{ " ".join(LOAD_MODULES) }}
+{%- endif %}
+{%- for key, val in VARS.items() %}
+export {{ key }}="{{ val }}"
+{%- endfor %}
+{%- if PRE_SOURCE_FILE is defined %}
+# Loaded from {{ PRE_SOURCE_FILE }}
+{{ PRE_SOURCE_CONTENTS }}
 {%- endif %}
-
-# Add MOOSE's python path for python scripts
-export PYTHONPATH={{ MOOSE_PYTHONPATH }}:${PYTHONPATH}

 # Print a useful header
 echo "TestHarness {{ SCHEDULER_NAME }} job on $(hostname) in job ${{ JOB_ID_VARIABLE }}"
@@ -101,6 +107,22 @@ if ((return_code > 128)); then
     echo "Apptainer exited with code $return_code, using $new_return_code instead"
     return_code=$new_return_code
 fi
+# If we're using --sharens, make sure the instance is dead
+if [ "$APPTAINER_SHARENS" == "1" ] && [ -n "$APPTAINER_CONFIGDIR" ]; then
+    apptainer instance list
+    set +e
+    instance_list=$(apptainer instance list | tail -n +2)
+    if [ -n "$instance_list" ]; then
+        instance_name=$(echo "$instance_list" | awk '{print $1}')
+        instance_pid=$(echo "$instance_list" | awk '{print $2}')
+        echo "Killing running apptainer instance \"${instance_name}\" with PID ${instance_pid}"
+        apptainer instance stop ${instance_name}
+        if ps -p $instance_pid > /dev/null; then
+            kill -9 $instance_pid
+        fi
+    fi
+    set -e
+fi
 {%- endif %}
 # Load the execution time; we use a tail here because the process will
 # include a comment about a non-zero status first if the exit code is nonzero
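
For reference, the host detection added in `TestHarness.py` reduces to a substring match of the `--hpc-host` value against a small table of known clusters, each carrying its scheduler type and apptainer module list. A minimal standalone sketch of that pattern follows; the hostname and the shortened module list are illustrative only, not part of the patch:

```python
from collections import namedtuple

# Same shape as the HPCCluster helper added to TestHarness: the scheduler
# type plus the modules to load for apptainer-based runs on that cluster.
HPCCluster = namedtuple('HPCCluster', ['scheduler', 'apptainer_modules'])

# Illustrative table in the spirit of TestHarness.hpc_configs
hpc_configs = {
    'sawtooth': HPCCluster(scheduler='pbs', apptainer_modules=['use.moose']),
    'bitterroot': HPCCluster(scheduler='slurm', apptainer_modules=['use.moose']),
}

def query_hpc_cluster(hostname):
    """Return the config whose key is a substring of the hostname, else None."""
    for host, config in hpc_configs.items():
        if host in hostname:
            return config
    return None

# Example: a (hypothetical) login node name picks up --hpc=pbs automatically
config = query_hpc_cluster('sawtooth2.hpc.example.gov')
print(config.scheduler if config else 'unknown cluster')
```

Because the lookup is a substring match against known cluster names, `--hpc` only needs to be passed explicitly for hosts that are not in the table.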