Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions scripts/gpu_heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""GPU heartbeat: keeps utilization above threshold to prevent job reclamation.

Monitors GPU utilization via nvidia-smi and performs matrix multiplications
when utilization drops below THRESHOLD. Steps aside when real training is active.
"""

import os
import subprocess
import time

import torch

# Heartbeat tunables. Each value can be overridden through an environment
# variable so the workload can be sized for smaller GPUs or different cluster
# policies without editing this file; the defaults are the original values.
# A non-numeric override raises ValueError at import time rather than being
# silently ignored.
THRESHOLD = int(os.environ.get("GPU_HEARTBEAT_THRESHOLD", "65"))  # percent GPU utilization to maintain
CHECK_INTERVAL = float(os.environ.get("GPU_HEARTBEAT_CHECK_INTERVAL", "0.05"))  # seconds between checks
N = int(os.environ.get("GPU_HEARTBEAT_N", "6144"))  # matrix size for dummy work
BURST_ITERATIONS = int(os.environ.get("GPU_HEARTBEAT_BURST_ITERATIONS", "60"))  # number of matmuls per burst

Comment on lines +11 to +15
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default heartbeat workload allocates two 6144x6144 FP32 matrices (plus outputs), which can consume ~450MB+ of GPU memory and may be too large for smaller GPUs or memory-constrained jobs. Consider making N, THRESHOLD, and BURST_ITERATIONS configurable via CLI flags/env vars (with a safer default), so users can tune the heartbeat to their hardware and policies.

Copilot uses AI. Check for mistakes.

def get_gpu_utilization():
    """Return the GPU utilization percentage reported by nvidia-smi.

    Only the first line of nvidia-smi's output is read, so when several GPUs
    are listed only the first one is considered.

    Returns:
        The utilization as an int, or 100 when the query cannot be run or its
        output cannot be parsed. Reporting "busy" on failure keeps the caller
        from generating dummy load based on a reading it could not obtain.
    """
    query = [
        "nvidia-smi",
        "--query-gpu=utilization.gpu",
        "--format=csv,noheader,nounits",
    ]
    try:
        completed = subprocess.run(query, capture_output=True, text=True, timeout=5)
        first_line, _, _ = completed.stdout.strip().partition("\n")
        return int(first_line)
    except Exception:
        return 100  # assume busy if query fails


def main():
    """Run the heartbeat loop until the process is killed.

    Allocates two N x N random matrices on the default CUDA device, then
    loops forever: whenever the utilization reported by
    get_gpu_utilization() is below THRESHOLD it runs a burst of
    BURST_ITERATIONS matrix multiplications, otherwise it sleeps for
    CHECK_INTERVAL seconds so it stays out of the way of real work.

    When CUDA is not available (CPU-only node, or a job submitted without
    GPUs) the function prints a message and returns instead of crashing on
    the first CUDA allocation.

    NOTE(review): only the default CUDA device receives dummy load; other
    GPUs in a multi-GPU job are not exercised.
    """
    if not torch.cuda.is_available():
        print("GPU heartbeat skipped: CUDA is not available.")
        return

    device = torch.device("cuda")
    x = torch.randn(N, N, device=device)
    y = torch.randn(N, N, device=device)

    print(f"GPU heartbeat started (threshold={THRESHOLD}%, matrix={N}x{N})")

    while True:
        util = get_gpu_utilization()
        if util < THRESHOLD:
            for _ in range(BURST_ITERATIONS):
                torch.mm(x, y)
            # Wait for the queued kernels to finish so the next utilization
            # reading reflects the burst that was just issued.
            torch.cuda.synchronize()
        else:
            time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
main()
155 changes: 155 additions & 0 deletions scripts/rebuild_on_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""Submit a SLURM job to rebuild the PufferDrive C extension inside the Singularity container.

Avoids the nested quoting hell of sbatch --wrap by writing a standalone bash script
to a temp location and sbatch-ing that file. The script runs `setup.py build_ext`
inside the container overlay where torch is installed.

Example:
python scripts/rebuild_on_cluster.py
python scripts/rebuild_on_cluster.py --account torch_pr_924_general
python scripts/rebuild_on_cluster.py --project-root /scratch/$USER/code/PufferDrive --wait
"""

import argparse
import os
import subprocess
import sys
import time


# Singularity image path used as the default value of the --image option.
DEFAULT_IMAGE = "/share/apps/images/cuda12.8.1-cudnn9.8.0-ubuntu24.04.2.sif"


def parse_args():
    """Parse the command-line options of the cluster rebuild helper.

    Returns:
        The argparse namespace. ``project_root`` and ``overlay`` default to
        None; ``user`` defaults to the local $USER environment variable (empty
        string when it is unset).
    """
    cli = argparse.ArgumentParser(description="Rebuild PufferDrive C extension on SLURM cluster")

    # Options that take a value: (flag, default, help text), in display order.
    valued_options = (
        ("--account", "torch_pr_924_general", "SLURM account"),
        ("--user", os.environ.get("USER", ""), "Cluster username (default: $USER)"),
        (
            "--project-root",
            None,
            "Path to PufferDrive on the cluster (default: /scratch/<user>/code/PufferDrive)",
        ),
        (
            "--overlay",
            None,
            "Singularity overlay path (default: /scratch/<user>/images/PufferDrive/overlay-15GB-500K.ext3)",
        ),
        ("--image", DEFAULT_IMAGE, "Singularity image path"),
        ("--time", "15", "SLURM time limit in minutes"),
        ("--mem", "16gb", "SLURM memory"),
        ("--cpus", "8", "SLURM cpus-per-task"),
    )
    for flag, default, help_text in valued_options:
        cli.add_argument(flag, default=default, help=help_text)

    # Boolean switches: (flag, help text).
    switches = (
        ("--wait", "Poll until the job finishes and print its log"),
        ("--dry", "Print the script and sbatch command without submitting"),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)

    return cli.parse_args()


def build_rebuild_script(project_root: str, overlay: str, image: str) -> str:
    """Return a bash script that runs the rebuild inside the container.

    Matches the training launcher's invocation exactly: read-only overlay mount,
    no fakeroot, sources /ext3/env.sh which activates the venv/conda env with torch
    and other deps installed.

    All three paths are shell-quoted, so values containing spaces or shell
    metacharacters are passed through literally instead of breaking (or
    injecting into) the generated script. For ordinary paths the output is
    byte-identical to the unquoted form. As a consequence, ``$VAR`` or ``~``
    inside an argument is no longer expanded on the cluster.

    Args:
        project_root: Path of the PufferDrive checkout on the cluster.
        overlay: Path of the Singularity overlay, mounted read-only.
        image: Path of the Singularity image.

    Returns:
        The full text of the bash script, ending with a newline.
    """
    import shlex  # local import: only this function needs shell quoting

    root = shlex.quote(project_root)
    inner = (
        "source /ext3/env.sh && "
        f"cd {root} && "
        "which python3 && "
        'python3 -c "import torch; print(\\"torch:\\", torch.__version__)" && '
        "python3 setup.py build_ext --inplace --force && "
        'python3 -c "from pufferlib.ocean.drive import binding; print(\\"C binding loaded OK\\")"'
    )
    # Quote the whole payload as one word for `bash -c`; this also escapes any
    # single quotes that quoting project_root introduced into `inner`.
    payload = shlex.quote(inner)
    overlay_arg = shlex.quote(f"{overlay}:ro")
    return (
        "#!/bin/bash\n"
        "set -e\n"
        f"cd {root}\n"
        "singularity exec --nv \\\n"
        f" --overlay {overlay_arg} \\\n"
        f" {shlex.quote(image)} \\\n"
        f" bash -c {payload}\n"
    )
Comment on lines +54 to +70
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_rebuild_script() interpolates project_root, overlay, and image directly into shell commands without quoting, and later run_ssh(...)/sbatch_cmd also embed user-controlled strings unquoted. This can break on paths with spaces and is also a shell-injection risk when args come from the CLI. Use shlex.quote (or otherwise safely quote) all interpolated shell values, including the remote cat > ... command and sbatch arguments.

Copilot uses AI. Check for mistakes.


def run_ssh(cmd: str, check: bool = True, host: str = "torch") -> str:
    """Run a command on the cluster via ssh and return stdout.

    Args:
        cmd: Command line handed to the remote shell as a single string.
        check: When true, a non-zero exit status aborts the program.
        host: ssh destination (host name or ssh-config alias). Defaults to
            "torch", the alias this helper was originally hard-wired to.

    Returns:
        The captured standard output of the remote command.

    Raises:
        SystemExit: If ``check`` is true and the command exits non-zero; the
            captured stdout and stderr are printed first.
    """
    result = subprocess.run(["ssh", host, cmd], capture_output=True, text=True)
    if check and result.returncode != 0:
        print(result.stdout)
        print(result.stderr, file=sys.stderr)
        raise SystemExit(f"ssh command failed: {cmd}")
    return result.stdout
Comment on lines +73 to +80
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SSH destination host is hardcoded as torch in both run_ssh() and the script upload call. This makes the helper unusable in environments where that host alias doesn’t exist. Consider adding a --host (or --ssh-target) argument with default torch, and using it consistently.

Copilot uses AI. Check for mistakes.


def main():
    """Submit the rebuild job to SLURM over ssh and optionally wait for it.

    Builds the rebuild script, uploads it to /scratch/<user>/rebuild_logs on
    the cluster, submits it with sbatch and prints the job id and log path.
    With --dry nothing is executed remotely: the script, the destination
    paths and the sbatch command are only printed. With --wait the job state
    is polled through sacct every 20 seconds until it reaches a terminal
    state, and the job log is printed.

    Returns:
        0 on success (dry run, submission without --wait, or a job that ended
        COMPLETED); 1 when the job id cannot be parsed from the sbatch output
        or the job ended in any other terminal state.

    Raises:
        SystemExit: If no username is available, or a checked ssh command
            fails.
        subprocess.CalledProcessError: If uploading the script fails.
    """
    import shlex  # local import: only this function builds remote command lines

    args = parse_args()
    if not args.user:
        # An empty username would silently yield paths such as /scratch//rebuild_logs.
        raise SystemExit("Could not determine the cluster username; pass --user explicitly.")

    project_root = args.project_root or f"/scratch/{args.user}/code/PufferDrive"
    overlay = args.overlay or f"/scratch/{args.user}/images/PufferDrive/overlay-15GB-500K.ext3"

    script = build_rebuild_script(project_root, overlay, args.image)
    # Use a scratch location for script and log so they survive the compute node.
    log_dir = f"/scratch/{args.user}/rebuild_logs"
    script_path = f"{log_dir}/rebuild_pufferdrive.sh"
    log_path = f"{log_dir}/rebuild_pufferdrive_%j.log"

    # Every interpolated value is shell-quoted: these strings are interpreted
    # by the remote shell, and the values come from the command line.
    remote_script = shlex.quote(script_path)
    sbatch_cmd = (
        f"sbatch --account={shlex.quote(args.account)} --gres=gpu:1 "
        f"--cpus-per-task={shlex.quote(args.cpus)} --mem={shlex.quote(args.mem)} "
        f"--time={shlex.quote(args.time)} "
        f"-o {shlex.quote(log_path)} {remote_script}"
    )

    if args.dry:
        # A dry run must not touch the cluster, so it returns before any ssh call.
        print("=== rebuild script ===")
        print(script)
        print(f"=== sbatch destination: {script_path} ===")
        print(f"=== log path: {log_path} ===")
        print(f"=== sbatch command: {sbatch_cmd} ===")
        return 0

    run_ssh(f"mkdir -p {shlex.quote(log_dir)}")

    # Write script to cluster via ssh
    subprocess.run(
        ["ssh", "torch", f"cat > {remote_script} && chmod +x {remote_script}"],
        input=script,
        text=True,
        check=True,
    )

    # Submit
    stdout = run_ssh(sbatch_cmd)
    print(stdout.strip())

    # Parse job id from "Submitted batch job 12345"
    parts = stdout.strip().split()
    if len(parts) < 4 or not parts[-1].isdigit():
        print("Could not parse job id from sbatch output", file=sys.stderr)
        return 1
    job_id = parts[-1]
    resolved_log = log_path.replace("%j", job_id)
    print(f"Job ID: {job_id}")
    print(f"Log: {resolved_log}")

    if not args.wait:
        return 0

    # States after which the job will not run any further. Anything missing
    # here would make the polling loop below spin forever.
    terminal_states = {
        "COMPLETED",
        "FAILED",
        "CANCELLED",
        "TIMEOUT",
        "NODE_FAIL",
        "OUT_OF_MEMORY",
        "BOOT_FAIL",
        "DEADLINE",
        "PREEMPTED",
    }

    # Poll for completion
    print("Waiting for job to finish...")
    while True:
        time.sleep(20)
        raw = run_ssh(
            f"sacct -j {job_id} --format=State -n -P 2>/dev/null | head -1",
            check=False,
        ).strip()
        # Normalize the field: drop any delimiter, keep only the first word
        # (sacct reports cancelled jobs as "CANCELLED by <uid>") and strip a
        # truncation marker such as "CANCELLED+".
        fields = raw.split("|", 1)[0].split()
        if not fields:
            print(" (job not yet registered in sacct)")
            continue
        state = fields[0].rstrip("+")
        print(f" state: {state}")
        if state in terminal_states:
            break

    print()
    print("=== log ===")
    log_content = run_ssh(
        f"cat {shlex.quote(resolved_log)} 2>/dev/null || echo '(no log)'",
        check=False,
    )
    print(log_content)
    return 0 if state == "COMPLETED" else 1


if __name__ == "__main__":
sys.exit(main())
29 changes: 27 additions & 2 deletions scripts/submit_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ def parse_args():
"--args", type=str, nargs="+", default=None, help="Args to override/sweep (e.g., learning_rate=1e-4:3e-4)"
)

# GPU heartbeat: keeps utilization above threshold to prevent job reclamation on NYU cluster
parser.add_argument(
"--heartbeat",
action="store_true",
help="Run scripts/gpu_heartbeat.py in background alongside training",
)

# Container settings
parser.add_argument("--container", action="store_true", help="Run inside Singularity container")
parser.add_argument(
Expand All @@ -94,7 +101,7 @@ def parse_args():
parser.add_argument(
"--container_overlay",
type=str,
default="/scratch/ev2237/containers/pufferdrive/overlay.ext3",
default=f"/scratch/{os.environ.get('USER', '')}/images/PufferDrive/overlay-15GB-500K.ext3",
help="Singularity overlay path",
)
Comment on lines 101 to 106
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--container_overlay default is built from os.environ.get('USER', ''); if USER is unset this becomes /scratch//images/... and silently points to a non-existent overlay. Consider falling back to a more reliable username lookup (e.g., getpass.getuser()), or erroring when USER is empty so the user must pass --container_overlay explicitly.

Copilot uses AI. Check for mistakes.

Expand Down Expand Up @@ -355,6 +362,17 @@ def launch_training(args, from_config, cmd, save_dir, project_root, container_co
# Add save_dir to command
full_cmd = base_cmd + cmd + ["--train.data-dir", save_dir]

# If heartbeat is enabled, wrap the training command in a brace group that:
# 1. backgrounds python scripts/gpu_heartbeat.py
# 2. runs training in the foreground
# 3. kills the heartbeat on training exit, preserving training's exit code
# Brace groups `{ ... ; }` run in the current shell (unlike parens) so the
# preceding `cd` and env exports still apply to the training command. The `&`
# backgrounds only the python call, not the whole compound statement.
def wrap_with_heartbeat(train_cmd_str):
    """Return a bash brace group that runs training with a background heartbeat.

    The group starts scripts/gpu_heartbeat.py in the background, runs
    ``train_cmd_str`` in the foreground, kills the heartbeat once training
    exits and then exits with training's exit status.

    The heartbeat log name includes $SLURM_JOB_ID (falling back to the shell
    PID), so concurrent or requeued jobs on the same node do not overwrite
    each other's log. The path is left unquoted on purpose: both values are
    numeric, and adding quotes could clash with whatever quoting the caller
    wraps around the returned string.

    Args:
        train_cmd_str: The training command as a single shell string.

    Returns:
        The shell string ``{ <heartbeat> ; <training> ; ... ; }``.
    """
    hb = (
        "python scripts/gpu_heartbeat.py > /tmp/gpu_heartbeat.${SLURM_JOB_ID:-$$}.log 2>&1 "
        "& HEARTBEAT_PID=$!"
    )
    return f"{{ {hb}; {train_cmd_str}; TRAIN_EXIT=$?; kill $HEARTBEAT_PID 2>/dev/null; exit $TRAIN_EXIT; }}"

# Wrap with singularity if container mode is enabled
if container_config is not None:
env_setup = "source /ext3/env.sh"
Expand All @@ -368,7 +386,10 @@ def launch_training(args, from_config, cmd, save_dir, project_root, container_co
f"export WANDB_DIR={scratch_dir}/wandb_data && "
f"mkdir -p {scratch_dir}/cache"
)
inner_cmd = f"{env_setup} && {cache_exports} && cd {project_root} && " + " ".join(full_cmd)
train_str = " ".join(full_cmd)
if args.heartbeat:
train_str = wrap_with_heartbeat(train_str)
inner_cmd = f"{env_setup} && {cache_exports} && cd {project_root} && {train_str}"
Comment on lines +389 to +392
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The training command is converted to a shell string via ' '.join(full_cmd) and then embedded into bash -c (and potentially into a brace group). This will break if any argument contains spaces or shell-special characters, and it also makes quoting/escaping fragile. Prefer building the shell string with shlex.join(full_cmd) (Python 3.8+) and applying shlex.quote to interpolated paths like project_root/save_dir.

Copilot uses AI. Check for mistakes.
full_cmd = [
"singularity",
"exec",
Expand All @@ -388,6 +409,10 @@ def launch_training(args, from_config, cmd, save_dir, project_root, container_co
inner_cmd,
]
)
elif args.heartbeat:
# No container: still need to wrap in bash -c so the brace group parses.
train_str = " ".join(full_cmd)
full_cmd = ["bash", "-c", wrap_with_heartbeat(train_str)]

print(f">>> Job: {job_name}")
print(f">>> Working directory: {project_root}")
Expand Down
Loading