Merge pull request #29035 from loganharbour/test_harness_module
Enable HPC scheduler to work with INL HPC containerized modules
loganharbour authored Nov 30, 2024
2 parents 4928830 + 6f6d970 commit cedfd05
Showing 6 changed files with 195 additions and 71 deletions.
34 changes: 29 additions & 5 deletions python/TestHarness/TestHarness.py
@@ -1154,11 +1154,9 @@ def parseCLArgs(self, argv):
# Try to guess the --hpc option if --hpc-host is set
if self.options.hpc_host and not self.options.hpc:
hpc_host = self.options.hpc_host[0]
if 'sawtooth' in hpc_host or 'lemhi' in hpc_host:
self.options.hpc = 'pbs'
elif 'bitterroot' in hpc_host:
self.options.hpc = 'slurm'
if self.options.hpc:
hpc_config = TestHarness.queryHPCCluster(hpc_host)
if hpc_config is not None:
self.options.hpc = hpc_config.scheduler
print(f'INFO: Setting --hpc={self.options.hpc} for known host {hpc_host}')

self.options.runtags = [tag for tag in self.options.run.split(',') if tag != '']
@@ -1244,3 +1242,29 @@ def preRun(self):

def getOptions(self):
return self.options

# Helper tuple for storing information about a cluster
HPCCluster = namedtuple('HPCCluster', ['scheduler', 'apptainer_modules'])
# The modules that we want to load when running in a non-moduled
# container on INL HPC
inl_modules = ['use.moose', 'moose-dev-container-openmpi/5.0.5_0']
# Define INL HPC clusters
hpc_configs = {'sawtooth': HPCCluster(scheduler='pbs',
apptainer_modules=inl_modules),
'bitterroot': HPCCluster(scheduler='slurm',
apptainer_modules=inl_modules)}

@staticmethod
def queryHPCCluster(hostname: str):
"""
Attempt to get the HPC cluster configuration given a host
Args:
hostname: The HPC system hostname
Returns:
HPCCluster: The config, if found, otherwise None
"""
for host, config in TestHarness.hpc_configs.items():
if host in hostname:
return config
return None
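
For reference, the new lookup can be exercised standalone. Below is a minimal sketch of the same pattern, not part of the diff; the hostname 'sawtooth2' is only an example.

    from collections import namedtuple

    HPCCluster = namedtuple('HPCCluster', ['scheduler', 'apptainer_modules'])

    # Same table as above: both INL clusters share the containerized-module list
    inl_modules = ['use.moose', 'moose-dev-container-openmpi/5.0.5_0']
    hpc_configs = {'sawtooth': HPCCluster(scheduler='pbs', apptainer_modules=inl_modules),
                   'bitterroot': HPCCluster(scheduler='slurm', apptainer_modules=inl_modules)}

    def query_hpc_cluster(hostname):
        # Return the config whose key is a substring of the hostname, else None
        for host, config in hpc_configs.items():
            if host in hostname:
                return config
        return None

    config = query_hpc_cluster('sawtooth2')
    if config is not None:
        print(config.scheduler)  # prints 'pbs', which becomes --hpc=pbs
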
12 changes: 10 additions & 2 deletions python/TestHarness/runners/HPCRunner.py
@@ -42,6 +42,11 @@ def spawn(self, timer):
self.hpc_job = self.run_hpc.queueJob(self.job)

def wait(self, timer):
# Sanity check on having a job
if self.hpc_job is None:
self.job.setStatus(self.job.error, 'HPCRUNNER MISSING HPCJOB')
return

# The states that we should wait on. Anything else should
# be an invalid state for waiting
wait_states = [self.hpc_job.State.held,
@@ -98,8 +103,11 @@ def wait(self, timer):
if self.fileIsReady(file):
# Store the result
if file == result_file:
with open(file, 'r') as f:
result = yaml.safe_load(f)
try:
with open(file, 'r') as f:
result = yaml.safe_load(f)
except:
continue
self.exit_code = result['exit_code']
walltime = result['walltime']

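
The new try/except above exists because the compute node may still be flushing the result file when wait() polls it; a parse failure simply defers reading to the next iteration. A standalone sketch of the same defensive-read pattern follows; the helper name and the narrowed exception set are illustrative (the shipped code uses a bare except with continue).

    import yaml

    def try_read_result(path):
        # Return the parsed result dict, or None if the file cannot be read
        # or is not yet complete, valid YAML
        try:
            with open(path, 'r') as f:
                return yaml.safe_load(f)
        except (OSError, yaml.YAMLError):
            return None

    # The caller keeps polling until a dict with 'exit_code' and 'walltime' appears
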
178 changes: 123 additions & 55 deletions python/TestHarness/schedulers/RunHPC.py
@@ -9,12 +9,11 @@

import urllib.parse
from RunParallel import RunParallel
import threading, os, re, sys, datetime, shlex, socket, threading, time, urllib, contextlib
import threading, os, re, sys, datetime, shlex, socket, threading, time, urllib, contextlib, copy
from enum import Enum
import paramiko
import jinja2
import statistics
import contextlib
from collections import namedtuple

from multiprocessing.pool import ThreadPool
from TestHarness import util

@@ -85,6 +84,8 @@ class RunHPC(RunParallel):
Base scheduler for jobs that are run on HPC.
"""
def __init__(self, harness, params):
import paramiko

super().__init__(harness, params)

self.params = params
@@ -165,14 +166,13 @@ def __init__(self, harness, params):
# held in the background without blocking
self.submit_job_pool = None if self.options.hpc_no_hold else ThreadPool(processes=10)

# Build the base submission environment for a job
self.submit_env, self.app_exec_prefix, self.app_exec_suffix = self.setupRunEnvironment(harness)

if os.environ.get('APPTAINER_CONTAINER'):
if not self.ssh_hosts:
print('ERROR: --hpc-host must be set when using HPC jobs within apptainer')
sys.exit(1)
if not self.options.hpc_pre_source:
default_pre_source = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'hpc_source')
self.options.hpc_pre_source = default_pre_source
print(f'INFO: Setting --hpc-pre-source={default_pre_source}')
else:
if self.options.hpc_apptainer_bindpath:
print('ERROR: --hpc-apptainer-bindpath is unused when not executing with apptainer')
@@ -192,13 +192,15 @@
sys.exit(1)

# Load the pre-source if it exists
self.source_contents = None
self.pre_source_contents = None
if self.options.hpc_pre_source:
self.source_contents = open(self.options.hpc_pre_source, 'r').read()
with open(self.options.hpc_pre_source, 'r') as f:
self.pre_source_contents = f.read()

# Load the submission template
template_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'hpc_template')
self.submission_template = open(template_path, 'r').read()
with open(template_path, 'r') as f:
self.submission_template = f.read()

class CallHPCException(Exception):
"""
@@ -219,6 +221,8 @@ def _getSSHClient(self, reconnect=False):
This is threaded so that we can operate a few connections at once.
"""
import paramiko

process = threading.get_ident()
with self.ssh_clients_lock:
if process not in self.ssh_clients or reconnect:
@@ -345,6 +349,8 @@ def submitJob(self, job, hold, lock=True):
Returns the resulting HPCJob.
"""
import jinja2

# If we're submitting this Job to be held, but the Job status isn't
# currently held, it means that we've hit job in the submit_job_pool
# that was submitted previously but has already been set to be skipped
@@ -385,45 +391,32 @@
if os.path.exists(file):
os.remove(file)

# Add MOOSE's python path for python scripts
moose_python = os.path.abspath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..'))

# Start building the jinja environment for the submission script
submission_env = {'SCHEDULER_NAME': self.getHPCSchedulerName(),
'NAME': self.getHPCJobName(job),
'CWD': tester.getTestDir(),
'OUTPUT': output_file,
'RESULT': result_file,
'SUBMISSION_SCRIPT': submission_script,
'WALLTIME': str(datetime.timedelta(seconds=tester.getMaxTime())),
'PROJECT': self.options.hpc_project,
'TEST_SPEC': tester.getSpecFile(),
'TEST_NAME': tester.getTestNameShort(),
'SUBMITTED_HOSTNAME': socket.gethostname(),
'MOOSE_PYTHONPATH': moose_python,
'NUM_PROCS': int(tester.getProcs(options)),
'NUM_THREADS': int(tester.getThreads(options)),
'ENDING_COMMENT': self.getOutputEndingComment(f'${self.getHPCJobIDVariable()}'),
'JOB_ID_VARIABLE': self.getHPCJobIDVariable(),
'PLACE': tester.getHPCPlace(options)}
submission_env = copy.deepcopy(self.submit_env)
submission_env.update({'NAME': self.getHPCJobName(job),
'CWD': tester.getTestDir(),
'OUTPUT': output_file,
'RESULT': result_file,
'SUBMISSION_SCRIPT': submission_script,
'WALLTIME': str(datetime.timedelta(seconds=tester.getMaxTime())),
'TEST_SPEC': tester.getSpecFile(),
'TEST_NAME': tester.getTestNameShort(),
'NUM_PROCS': int(tester.getProcs(options)),
'NUM_THREADS': int(tester.getThreads(options)),
'PLACE': tester.getHPCPlace(options)})
if hold:
submission_env['HOLD'] = 1
if self.options.hpc_pre_source:
submission_env['SOURCE_FILE'] = options.hpc_pre_source
if self.source_contents:
submission_env['SOURCE_CONTENTS'] = self.source_contents

# Get the unescaped command
command = tester.getCommand(options)

# Parse out the mpi command from the command if we're running in apptainer.
# We do this before any of the other escaping
APPTAINER_CONTAINER = os.environ.get('APPTAINER_CONTAINER')
apptainer_command_prefix = ''
if APPTAINER_CONTAINER:
# Parse out the mpi command from the command if we're wrapping
# things around the mpi command
mpi_prefix = ''
if self.app_exec_prefix:
mpi_command = self.parseMPICommand(command)
if mpi_command:
apptainer_command_prefix = mpi_command
mpi_prefix = mpi_command
command = command.replace(mpi_command, '')

# Replace newlines, clean up spaces, and encode the command. We encode the
@@ -442,27 +435,22 @@ def submitJob(self, job, hold, lock=True):
# we need to manipulate the command like such
# Original command: <mpiexec ...> </path/to/binary ...>
# New command: <mpiexec ...> apptainer exec /path/to/image '</path/to/binary ...>'
APPTAINER_CONTAINER = os.environ.get('APPTAINER_CONTAINER')
if APPTAINER_CONTAINER:
job_command = apptainer_command_prefix
job_command = mpi_prefix

# The root filesystem path that we're in so that we can be sure to bind
# it into the container, if not already set
if self.options.hpc_apptainer_bindpath:
bindpath = self.options.hpc_apptainer_bindpath
else:
bindpath = '/' + os.path.abspath(tester.getTestDir()).split(os.path.sep)[1]
# The apptainer command that will get sandwiched in the middle
apptainer_command = ['apptainer', 'exec', '-B', bindpath]
if self.options.hpc_apptainer_no_home:
apptainer_command.append('--no-home')
apptainer_command.append(APPTAINER_CONTAINER)
apptainer_command = shlex.join(apptainer_command)
submission_env['VARS']['APPTAINER_BINDPATH'] = bindpath + ',${APPTAINER_BINDPATH}'

# Append the apptainer command along with the command to be ran
job_command += f"{apptainer_command} {hpc_run} {command_encoded}"
serial_command = shlex.join(self.app_exec_prefix + self.app_exec_suffix)

# Set that we're using apptainer
submission_env['USING_APPTAINER'] = '1'
# Append the apptainer command along with the command to be ran
job_command += f"{serial_command} {hpc_run} {command_encoded}"
# Not in apptainer, so we can just use the escaped command as is
else:
job_command = f'{hpc_run} {command_encoded}'
@@ -801,7 +789,8 @@ def killHPCJobs(self, functor):

def killRemaining(self, keyboard=False):
"""Kills all currently running HPC jobs"""
functor = lambda hpc_job: hpc_job.state not in [hpc_job.State.killed, hpc_job.State.done]
running_states = [HPCJob.State.killed, HPCJob.State.done]
functor = lambda hpc_job: hpc_job is not None and hpc_job.state not in running_states
killed_jobs = self.killHPCJobs(functor)
if keyboard and killed_jobs:
print(f'\nAttempted to kill remaining {killed_jobs} HPC jobs...')
@@ -952,8 +941,6 @@ def appendResultFooter(self, stats):

def appendResultFileHeader(self):
entry = {'scheduler': self.options.hpc,
'pre_source_file': self.options.hpc_pre_source,
'pre_source': self.source_contents,
'hosts': self.options.hpc_host if isinstance(self.options.hpc_host, list) else [self.options.hpc_host]}
return {'hpc': entry}

@@ -971,3 +958,84 @@ def callHPCShouldRetry(self, pool_type, result: str):
retry a command given a failure with a certain result.
"""
return False

def setupRunEnvironment(self, harness):
"""
Sets up the run environment for all HPC jobs
"""
hpc_cluster = harness.queryHPCCluster(self.ssh_hosts[0])

# HPC containerized module that we're in, if any
module_name = os.environ.get('CONTAINER_MODULE_NAME')
# Container that we're in, if any
apptainer_container = os.environ.get('APPTAINER_CONTAINER')

# Base submission environment
submit_env = {# Name of the scheduler
'SCHEDULER_NAME': self.getHPCSchedulerName(),
# Project to submit to within the cluster scheduler
'PROJECT': self.options.hpc_project,
# Ending comment for output files
'ENDING_COMMENT': self.getOutputEndingComment(f'${self.getHPCJobIDVariable()}'),
# Env var on the compute node that contains the HPC scheduler JOB id
'JOB_ID_VARIABLE': self.getHPCJobIDVariable(),
# Modules to load
'LOAD_MODULES': [],
# Environment variables to set before loading the modules
'PRE_MODULE_VARS': {},
# Environment variables to set after loading the modules
'VARS': {},
# The host the HPC job was submitted from
'SUBMITTED_HOSTNAME': socket.gethostname(),
# Whether or not we're using apptainer
'USING_APPTAINER': apptainer_container is not None}

# The prefix and suffix to wrap around what we're actually
# running within the HPC job
app_exec_prefix = []
app_exec_suffix = []

# --hpc-pre-source contents
if self.options.hpc_pre_source:
submit_env['PRE_SOURCE_FILE'] = self.options.hpc_pre_source
submit_env['PRE_SOURCE_CONTENTS'] = self.pre_source_contents

# If running on INL HPC, minimize the bindpath; this is a configuration
# option for the moose-dev-container module on INL HPC
if hpc_cluster is not None:
submit_env['PRE_MODULE_VARS']['MOOSE_DEV_CONTAINER_MINIMAL_BINDPATH'] = '1'

# Pass apptainer options
if self.options.hpc_apptainer_no_home:
submit_env['VARS']['APPTAINER_NO_HOME'] = '1'

# Add MOOSE's pythonpath
moose_python = os.path.abspath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..'))
submit_env['VARS']['PYTHONPATH'] = moose_python + ':${PYTHONPATH}'

# We're in a loaded container module on INL HPC, use that environment
# and the associated <container>-exec command to run apptainer
if module_name:
assert apptainer_container is not None
module_version = os.environ['CONTAINER_MODULE_VERSION']
module_exec = os.environ['CONTAINER_MODULE_EXEC']
required_modules = os.environ.get('CONTAINER_MODULE_REQUIRED_MODULES', '').split(' ')
modules = required_modules + [f'{module_name}/{module_version}']
submit_env['LOAD_MODULES'] = modules
app_exec_prefix = [module_exec]
# We're in a container without the container module environment, use
# a direct apptainer exec command
elif apptainer_container:
# If on INL HPC, use the wrapped environment
if hpc_cluster is not None:
submit_env['LOAD_MODULES'] = hpc_cluster.apptainer_modules
app_exec_prefix = ['apptainer', 'exec']
app_exec_suffix = [apptainer_container]

if submit_env['LOAD_MODULES']:
print(f'INFO: Using modules "{" ".join(submit_env["LOAD_MODULES"])}" for HPC environment')
if app_exec_prefix:
exec_combined = app_exec_prefix + app_exec_suffix
print(f'INFO: Using "{" ".join(exec_combined)}" as HPC execution wrapper')

return submit_env, app_exec_prefix, app_exec_suffix
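
To show how the returned triple is consumed in submitJob() above, here is a hedged sketch with placeholder values; hpc_run and the encoded command are stand-ins for values built elsewhere in RunHPC.

    import shlex

    # Illustrative values; the real ones come from setupRunEnvironment()
    app_exec_prefix = ['apptainer', 'exec']
    app_exec_suffix = ['/path/to/image.sif']
    hpc_run = 'hpc_run'                   # stand-in for the run wrapper
    command_encoded = 'dGVzdC1jb21tYW5k'  # stand-in for the encoded test command

    # Outside of an MPI prefix, the exec wrapper simply sandwiches the image,
    # and the encoded command is appended after the run wrapper
    serial_command = shlex.join(app_exec_prefix + app_exec_suffix)
    job_command = f'{serial_command} {hpc_run} {command_encoded}'
    # -> apptainer exec /path/to/image.sif hpc_run dGVzdC1jb21tYW5k
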
6 changes: 5 additions & 1 deletion python/TestHarness/schedulers/Scheduler.py
@@ -528,7 +528,11 @@ def runJob(self, job, jobs):
job.setStatus(StatusSystem().finished)

with self.activity_lock:
self.__active_jobs.remove(job)
if job in self.__active_jobs:
self.__active_jobs.remove(job)
else:
job.setStatus(StatusSystem().error, 'SCHEDULER ERROR')
job.appendOutput(f'Failed to remove job from active jobs in Scheduler; did not exist')

# Not enough slots to run the job...
else:
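
The membership check added above keeps a job that was already removed (or never tracked) from raising an unhandled ValueError inside the scheduler; a minimal sketch of the same guard-under-lock pattern, with names chosen for illustration:

    import threading

    active_jobs = []
    activity_lock = threading.Lock()

    def retire_job(job):
        # Remove a finished job if it is still tracked; report failure instead
        # of raising so the caller can flag a 'SCHEDULER ERROR' on the job
        with activity_lock:
            if job in active_jobs:
                active_jobs.remove(job)
                return True
            return False
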
2 changes: 0 additions & 2 deletions python/TestHarness/schedulers/hpc_source

This file was deleted.

