diff --git a/templates/configs/_global.yaml b/templates/configs/_global.yaml index 8c7fda3..80c89e0 100644 --- a/templates/configs/_global.yaml +++ b/templates/configs/_global.yaml @@ -20,15 +20,13 @@ hydra: launcher: submitit_folder: ${hydra.sweep.dir}/submitit_logs/%j nodes: ${oc.select:compute.nodes,null} - gpus_per_node: ${oc.select:compute.slurm.gpus_per_node, ${compute.gpus_per_node}} - tasks_per_node: 1 - cpus_per_task: ${compute.cpus_per_task} + tasks_per_node: ${oc.select:compute.tasks_per_node, 1} + cpus_per_task: ${oc.select:compute.cpus_per_task, 4} mem_gb: ${compute.mem_gb} timeout_min: ${compute.timeout_min} - gres: ${oc.select:compute.gres,null} - partition: ${oc.select:compute.slurm.partition,null} - qos: ${oc.select:compute.slurm.qos,null} + gres: ${oc.select:compute.gres, null} + partition: ${oc.select:compute.slurm.partition, null} + qos: ${oc.select:compute.slurm.qos, null} account: ${user.slurm.account} max_num_timeout: 2 - additional_parameters: ${oc.select:user.slurm.additional_parameters, {}} - + additional_parameters: ${oc.select:compute.slurm.additional_parameters, ${oc.select:user.slurm.additional_parameters, {}}} diff --git a/templates/configs/compute/bon_echo/a100_1x.yaml b/templates/configs/compute/bon_echo/a100_1x.yaml index 9362079..8061669 100644 --- a/templates/configs/compute/bon_echo/a100_1x.yaml +++ b/templates/configs/compute/bon_echo/a100_1x.yaml @@ -2,12 +2,12 @@ cluster: bon_echo nodes: 1 gpu_type: a100 gpus_per_node: 1 -time_limit: "8:00:00" -timeout_min: 480 -work_root: /scratch/ssd004/scratch/${oc.env:USER} -mem_gb: 80 -cpus_per_task: 16 gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 16 +mem_gb: 80 +work_root: /scratch/ssd004/scratch/${oc.env:USER} +timeout_min: 60 slurm: partition: a100 - gpus_per_node: null + additional_parameters: {} diff --git a/templates/configs/compute/bon_echo/a100_4x.yaml b/templates/configs/compute/bon_echo/a100_4x.yaml index 750ce59..2f8ffbd 100644 --- a/templates/configs/compute/bon_echo/a100_4x.yaml +++ b/templates/configs/compute/bon_echo/a100_4x.yaml @@ -2,12 +2,12 @@ cluster: bon_echo nodes: 1 gpu_type: a100 gpus_per_node: 4 -time_limit: "2:00:00" -timeout_min: 120 -work_root: /scratch/ssd004/scratch/${oc.env:USER} -mem_gb: 320 -cpus_per_task: 8 gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 16 +mem_gb: 320 +work_root: /scratch/ssd004/scratch/${oc.env:USER} +timeout_min: 60 slurm: partition: a100 - gpus_per_node: null + additional_parameters: {} diff --git a/templates/configs/compute/bon_echo/a40_1x.yaml b/templates/configs/compute/bon_echo/a40_1x.yaml index e5fd4cb..fde10ac 100644 --- a/templates/configs/compute/bon_echo/a40_1x.yaml +++ b/templates/configs/compute/bon_echo/a40_1x.yaml @@ -2,12 +2,12 @@ cluster: bon_echo nodes: 1 gpu_type: a40 gpus_per_node: 1 -time_limit: "8:00:00" -timeout_min: 480 -work_root: /scratch/ssd004/scratch/${oc.env:USER} -mem_gb: 16 -cpus_per_task: 16 gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 8 +mem_gb: 40 +work_root: /scratch/ssd004/scratch/${oc.env:USER} +timeout_min: 60 slurm: partition: a40 - gpus_per_node: null + additional_parameters: {} diff --git a/templates/configs/compute/bon_echo/a40_2x.yaml b/templates/configs/compute/bon_echo/a40_2x.yaml index c703c97..20ccc70 100644 --- a/templates/configs/compute/bon_echo/a40_2x.yaml +++ b/templates/configs/compute/bon_echo/a40_2x.yaml @@ -2,12 +2,12 @@ cluster: bon_echo nodes: 1 gpu_type: a40 
gpus_per_node: 2 -time_limit: "2:00:00" -timeout_min: 120 -work_root: /scratch/ssd004/scratch/${oc.env:USER} -mem_gb: 64 -cpus_per_task: 8 gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 8 +mem_gb: 80 +work_root: /scratch/ssd004/scratch/${oc.env:USER} +timeout_min: 60 slurm: partition: a40 - gpus_per_node: null + additional_parameters: {} diff --git a/templates/configs/compute/bon_echo/a40_4x.yaml b/templates/configs/compute/bon_echo/a40_4x.yaml new file mode 100644 index 0000000..fb1bb35 --- /dev/null +++ b/templates/configs/compute/bon_echo/a40_4x.yaml @@ -0,0 +1,13 @@ +cluster: bon_echo +nodes: 1 +gpu_type: a40 +gpus_per_node: 4 +gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 8 +mem_gb: 160 +work_root: /scratch/ssd004/scratch/${oc.env:USER} +timeout_min: 60 +slurm: + partition: a40 + additional_parameters: {} diff --git a/templates/configs/compute/bon_echo/cpu_1x.yaml b/templates/configs/compute/bon_echo/cpu_1x.yaml index fadbc3f..2101d48 100644 --- a/templates/configs/compute/bon_echo/cpu_1x.yaml +++ b/templates/configs/compute/bon_echo/cpu_1x.yaml @@ -1,11 +1,10 @@ cluster: bon_echo nodes: 1 gpus_per_node: 0 +gres: null cpus_per_task: 2 mem_gb: 8 work_root: /scratch/ssd004/scratch/${oc.env:USER} -time_limit: "0:15:00" -timeout_min: 15 -gres: null +timeout_min: 60 slurm: - gpus_per_node: null + additional_parameters: {} diff --git a/templates/configs/compute/killarney/cpu_1x.yaml b/templates/configs/compute/killarney/cpu_1x.yaml deleted file mode 100644 index 4593fcb..0000000 --- a/templates/configs/compute/killarney/cpu_1x.yaml +++ /dev/null @@ -1,10 +0,0 @@ -cluster: killarney -nodes: 1 -gpus_per_node: 0 -cpus_per_task: 32 -mem_gb: 64 -work_root: /scratch/${oc.env:USER} -timeout_min: 60 -gres: null -slurm: - gpus_per_node: null diff --git a/templates/configs/compute/killarney/h100_1x.yaml b/templates/configs/compute/killarney/h100_1x.yaml index 3be27d7..a002a91 100644 --- a/templates/configs/compute/killarney/h100_1x.yaml +++ b/templates/configs/compute/killarney/h100_1x.yaml @@ -2,11 +2,11 @@ cluster: killarney nodes: 1 gpu_type: h100 gpus_per_node: 1 -time_limit: "1:00:00" -timeout_min: 60 -work_root: /scratch/${oc.env:USER} -mem_gb: 256 -cpus_per_task: 24 gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 6 +mem_gb: 240 +work_root: /scratch/${oc.env:USER} +timeout_min: 60 slurm: - gpus_per_node: null + additional_parameters: {} diff --git a/templates/configs/compute/killarney/h100_2x.yaml b/templates/configs/compute/killarney/h100_2x.yaml new file mode 100644 index 0000000..97dfa63 --- /dev/null +++ b/templates/configs/compute/killarney/h100_2x.yaml @@ -0,0 +1,12 @@ +cluster: killarney +nodes: 1 +gpu_type: h100 +gpus_per_node: 2 +gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 6 +mem_gb: 480 +work_root: /scratch/${oc.env:USER} +timeout_min: 60 +slurm: + additional_parameters: {} diff --git a/templates/configs/compute/killarney/h100_4x.yaml b/templates/configs/compute/killarney/h100_4x.yaml new file mode 100644 index 0000000..29c0218 --- /dev/null +++ b/templates/configs/compute/killarney/h100_4x.yaml @@ -0,0 +1,12 @@ +cluster: killarney +nodes: 1 +gpu_type: h100 +gpus_per_node: 4 +gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 6 +mem_gb: 960 +work_root: /scratch/${oc.env:USER} +timeout_min: 60 +slurm: + additional_parameters: {} diff --git 
a/templates/configs/compute/killarney/h100_8x.yaml b/templates/configs/compute/killarney/h100_8x.yaml index 2530d37..900ec89 100644 --- a/templates/configs/compute/killarney/h100_8x.yaml +++ b/templates/configs/compute/killarney/h100_8x.yaml @@ -2,11 +2,11 @@ cluster: killarney nodes: 1 gpu_type: h100 gpus_per_node: 8 -time_limit: "1:00:00" -timeout_min: 60 -work_root: /scratch/${oc.env:USER} -mem_gb: 2048 -cpus_per_task: 96 gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 6 +mem_gb: 1920 +work_root: /scratch/${oc.env:USER} +timeout_min: 60 slurm: - gpus_per_node: null + additional_parameters: {} diff --git a/templates/configs/compute/killarney/l40s_1x.yaml b/templates/configs/compute/killarney/l40s_1x.yaml index 956c01d..eac455a 100644 --- a/templates/configs/compute/killarney/l40s_1x.yaml +++ b/templates/configs/compute/killarney/l40s_1x.yaml @@ -2,11 +2,11 @@ cluster: killarney nodes: 1 gpu_type: l40s gpus_per_node: 1 -time_limit: "1:00:00" -timeout_min: 60 -work_root: /scratch/${oc.env:USER} -mem_gb: 64 -cpus_per_task: 32 gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 16 +mem_gb: 120 +work_root: /scratch/${oc.env:USER} +timeout_min: 60 slurm: gpus_per_node: null diff --git a/templates/configs/compute/killarney/l40s_2x.yaml b/templates/configs/compute/killarney/l40s_2x.yaml index 26a09ce..d84976f 100644 --- a/templates/configs/compute/killarney/l40s_2x.yaml +++ b/templates/configs/compute/killarney/l40s_2x.yaml @@ -2,11 +2,11 @@ cluster: killarney nodes: 1 gpu_type: l40s gpus_per_node: 2 -time_limit: "1:00:00" -timeout_min: 60 -work_root: /scratch/${oc.env:USER} -mem_gb: 128 -cpus_per_task: 64 gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 16 +mem_gb: 240 +work_root: /scratch/${oc.env:USER} +timeout_min: 60 slurm: - gpus_per_node: null + additional_parameters: {} diff --git a/templates/configs/compute/killarney/l40s_4x.yaml b/templates/configs/compute/killarney/l40s_4x.yaml new file mode 100644 index 0000000..8743ec6 --- /dev/null +++ b/templates/configs/compute/killarney/l40s_4x.yaml @@ -0,0 +1,12 @@ +cluster: killarney +nodes: 1 +gpu_type: l40s +gpus_per_node: 4 +gres: gpu:${.gpu_type}:${.gpus_per_node} +tasks_per_node: ${.gpus_per_node} +cpus_per_task: 16 +mem_gb: 480 +work_root: /scratch/${oc.env:USER} +timeout_min: 60 +slurm: + additional_parameters: {} diff --git a/templates/src/mlp/ddp/README.md b/templates/src/mlp/ddp/README.md index c962828..a7ca85d 100644 --- a/templates/src/mlp/ddp/README.md +++ b/templates/src/mlp/ddp/README.md @@ -1,17 +1,58 @@ -# Distributed Data Parallel Example +# MLP Distributed Data Parallel Template -> :warning: WIP: This template is a work in progress and does not use DDP in its current state. *Data Parallelism* lets you split your data across multiple accelerators so that you can train your model faster! -*Data Parallelism* lets you to split your data across multiple accelerators so that you can train your model faster! - -Most of the time all your accelerators (gpus) will be on the same machine (node), and that simplifies things. However if you are using a large number of gpus that can't fit on a single machine, then you'll have to use multiple machines (nodes). For example, on the Killarney cluster, L40's have a maximum of 4 per node and H100's have a maximum of 8 per nodes. Data Parallelism across multiple nodes is referred to as *Distributed Data Parallelism* (DDP).
By default DDP works for both single node and multi-node settings. +Most of the time all your accelerators (GPUs) will be on the same machine (node), and that simplifies things. However, if you are using a large number of GPUs that can't fit on a single machine, then you'll have to use multiple machines (nodes). For example, on the Killarney cluster, L40S GPUs have a maximum of 4 per node and H100s have a maximum of 8 per node. Data Parallelism across multiple nodes is referred to as *Distributed Data Parallelism* (DDP). By default DDP works for both single node and multi-node settings. This example implements a simple MLP using DDP. ## DDP Background -**World Size:** The total number of gpu's across all nodes +**World Size:** The total number of GPUs across all nodes + +**Rank:** Integer ID for a single GPU. Unique across all nodes. (from `0` to `world_size - 1`) + +**Local Rank:** Integer ID for a single GPU. Unique only within a node. (from `0` to `num_gpus_per_node - 1`) + +## DDP Setup + +Unlike `torchrun`, Submitit is a **job scheduler integration**, not a distributed orchestrator. +It spawns one process per task (typically one per GPU, as set by `tasks_per_node`), but it does **not automatically set** the PyTorch environment variables (`RANK`, `LOCAL_RANK`, `WORLD_SIZE`, `MASTER_ADDR`, `MASTER_PORT`) required by `torch.distributed`. + +However, Submitit automatically determines the distributed context (each task’s **global rank**, **local rank**, **world size**, and **hostnames**). +You don’t manually assign local ranks; you retrieve them from `submitit.JobEnvironment()` and use them to initialize PyTorch DDP: + +```python +job_env = submitit.JobEnvironment() +rank = job_env.global_rank +local_rank = job_env.local_rank +world_size = job_env.num_tasks +``` + +Once you retrieve these values, export them as environment variables and call: + +```python +torch.distributed.init_process_group(init_method="env://", backend="nccl") +``` + +This pattern is the standard way to perform DDP initialization with Submitit when not using `torchrun` +([MosaicML Docs](https://docs.mosaicml.com/projects/composer/en/stable/examples/training_with_submitit.html), +[Hydra Submitit Launcher](https://hydra.cc/docs/plugins/submitit_launcher/), +[PyTorch Forum Discussion](https://discuss.pytorch.org/t/using-submitit-for-distributed-training/121881), +[Fairseq Example](https://github.com/facebookresearch/fairseq/blob/main/examples/language_model/submitit_train.py)). + +Submitit also provides an optional helper class, `submitit.helpers.TorchDistributedEnvironment`, which wraps `JobEnvironment`. +It automatically exports the standard PyTorch environment variables (`RANK`, `LOCAL_RANK`, `WORLD_SIZE`, `MASTER_ADDR`, and `MASTER_PORT`) so that you can initialize DDP with `init_method="env://"` directly. Think of it as a convenience layer built on top of `JobEnvironment`. `JobEnvironment` also exposes extra metadata like `hostnames` and `hostname`, which can be helpful for advanced or custom multi-node configurations. + +For a minimal example that uses `submitit.helpers.TorchDistributedEnvironment()` together with +`torch.distributed.init_process_group(init_method="env://")`, see the official Submitit example +[`docs/examples/torch_distributed.py`](https://github.com/facebookincubator/submitit/blob/main/docs/examples/torch_distributed.py). + + +### Logging in DDP (Hydra + Submitit) -**Rank:** Integer ID for a single gpu. Unique across all nodes.
(from `0` to `world_size - 1`) +To avoid duplicated lines in the global Hydra log, we log with `logger` only on **rank 0**. +For per-rank visibility, use `print()` on non-zero ranks. Those messages appear only in that rank’s stdout (Submitit/Slurm per-task output). -**Local Rank:** Integer ID for a single gpu. Unique only within a node. (from `0` to `num_gpus_per_node - 1`) \ No newline at end of file +- `logger.info(...)` (rank 0): goes to the single, global Hydra log for the run. +- `print(...)` (ranks > 0): stays in the rank-local stdout, not in the global Hydra log. diff --git a/templates/src/mlp/ddp/train.py b/templates/src/mlp/ddp/train.py index 0e4d239..972c4f2 100644 --- a/templates/src/mlp/ddp/train.py +++ b/templates/src/mlp/ddp/train.py @@ -1,14 +1,14 @@ """Distributed MLP training using PyTorch DDP.""" -import os import logging +import os import submitit import torch import torch.distributed as dist +from omegaconf import DictConfig, OmegaConf from torch import nn, optim from torch.utils.data import DataLoader, DistributedSampler, TensorDataset -from omegaconf import DictConfig, OmegaConf logger = logging.getLogger(__name__) @@ -79,6 +79,58 @@ def _setup_distributed(self, rank, world_size): world_size=world_size, ) + def _wrap_distributed(self, model, world_size, local_rank): + """Wrap the model with DDP if running in distributed mode.""" + if world_size > 1: + return nn.parallel.DistributedDataParallel( + model, + device_ids=[local_rank] if torch.cuda.is_available() else None, + ) + return model + + def _configure_training(self, cfg): + """Extract core training hyperparameters from the configuration.""" + lr = OmegaConf.select(cfg, "trainer.learning_rate", default=1e-3) + num_epochs = OmegaConf.select(cfg, "trainer.num_epochs", default=1000) + seed = OmegaConf.select(cfg, "trainer.seed", default=42) + return lr, num_epochs, seed + + def _get_distributed_config(self): + """Retrieve distributed job configuration information from Submitit.""" + job_env = submitit.JobEnvironment() + return job_env, job_env.global_rank, job_env.local_rank, job_env.num_tasks + + def _prepare_environment(self, job_env, rank, local_rank, world_size): + """Set up distributed environment variables for PyTorch.""" + os.environ.setdefault("RANK", str(rank)) + os.environ.setdefault("LOCAL_RANK", str(local_rank)) + os.environ.setdefault("WORLD_SIZE", str(world_size)) + + if "MASTER_ADDR" not in os.environ: + master_addr = ( + job_env.hostnames[0] + if hasattr(job_env, "hostnames") + else job_env.hostname + ) + os.environ["MASTER_ADDR"] = str(master_addr) + + os.environ.setdefault("MASTER_PORT", "29500") + + def _log_run_configuration(self, seed, world_size, local_rank, rank): + """Log the configuration of the current DDP run.""" + if rank != 0: + return + logger.info(f"Starting DDP MLP training with seed {seed}") + logger.info(f"World size: {world_size}, Local rank: {local_rank}") + if torch.cuda.is_available(): + logger.info(f"Number of available GPUs: {torch.cuda.device_count()}") + + def _set_seed(self, seed): + """Set random seeds for reproducibility across PyTorch and CUDA.""" + torch.manual_seed(seed) + if torch.cuda.is_available(): + torch.cuda.manual_seed(seed) + def _initialize_device_and_model(self, cfg, local_rank): """Initialize device and model.""" input_dim = OmegaConf.select(cfg, "trainer.input_dim", default=10) @@ -109,7 +161,7 @@ def _initialize_data_and_loader(self, cfg, world_size, rank): num_classes = OmegaConf.select(cfg, "trainer.num_classes", default=3) batch_size = 
OmegaConf.select(cfg, "trainer.batch_size", default=32) - dataset = create_dummy_data(1000, input_dim, num_classes) + dataset = create_dummy_data(100000, input_dim, num_classes) sampler = ( DistributedSampler( dataset, num_replicas=world_size, rank=rank, shuffle=True @@ -157,7 +209,6 @@ def _train_epoch( device, epoch, world_size, - rank, ): """Train for one epoch and return metrics.""" # Set epoch for DistributedSampler to ensure proper shuffling across epochs @@ -193,64 +244,39 @@ def _train_epoch( def __call__(self, cfg): """Train the MLP model with DDP.""" - cfg : DictConfig = OmegaConf.create(cfg) # Ensure cfg is a DictConfig + cfg: DictConfig = OmegaConf.create(cfg) - # Create output directory out_dir = cfg.paths.out_dir os.makedirs(out_dir, exist_ok=True) - - # Get ckpt dir self.ckpt_dir = self._latest_checkpoint(out_dir) - # Configuration - lr = OmegaConf.select(cfg, "trainer.learning_rate", default=1e-3) - num_epochs = OmegaConf.select(cfg, "trainer.num_epochs", default=1000) - seed = OmegaConf.select(cfg, "trainer.seed", default=42) - - # Get distributed training info from environment - # TODO: None of these env vars are actually set at the moment. Need to fix this example. - rank = int(os.environ.get("RANK", "0")) - local_rank = int(os.environ.get("LOCAL_RANK", "0")) - world_size = int(os.environ.get("WORLD_SIZE", "1")) + lr, num_epochs, seed = self._configure_training(cfg) + job_env, rank, local_rank, world_size = self._get_distributed_config() - if rank == 0: - logger.info(f"Starting DDP MLP training with seed {seed}") - logger.info(f"World size: {world_size}, Local rank: {local_rank}") - - # Set seed for reproducibility (same seed on all processes) - torch.manual_seed(seed) - if torch.cuda.is_available(): - torch.cuda.manual_seed(seed) - logger.info(f"Number of available GPUs: {torch.cuda.device_count()}") + self._prepare_environment(job_env, rank, local_rank, world_size) + self._set_seed(seed) + self._log_run_configuration(seed, world_size, local_rank, rank) - # Setup distributed training self._setup_distributed(rank, world_size) - # Setup device and model device, model = self._initialize_device_and_model(cfg, local_rank) - + if rank == 0: + logger.info(f"[Rank {rank}] Initialized on device: {device}") + else: + print(f"[Rank {rank}] Initialized on device: {device}") if rank == 0: logger.info(f"Using device: {device}") - # Wrap model with DDP - if world_size > 1: - model = nn.parallel.DistributedDataParallel( - model, - device_ids=[local_rank] if torch.cuda.is_available() else None, - ) + model = self._wrap_distributed(model, world_size, local_rank) - # Setup data and training loader, sampler = self._initialize_data_and_loader(cfg, world_size, rank) optimizer = optim.Adam(model.parameters(), lr=lr) criterion = nn.CrossEntropyLoss() - # Resume from checkpoint if available start_epoch = self._load_checkpoint_if_exists(model, optimizer, device, rank) - if rank == 0: logger.info(f"Training from epoch {start_epoch} to {num_epochs}...") - # Training loop with DDP for epoch in range(start_epoch, num_epochs): loss_sum, correct, total = self._train_epoch( model, @@ -261,18 +287,20 @@ def __call__(self, cfg): device, epoch, world_size, - rank, ) + avg_loss = loss_sum / (len(loader) * world_size) + acc = 100.0 * correct / total + should_checkpoint = epoch % 100 == 0 or epoch == num_epochs - 1 + # Log metrics only on rank 0 if rank == 0: - acc = 100.0 * correct / total - avg_loss = loss_sum / len(loader) logger.info(f"Epoch {epoch}: loss={avg_loss:.4f} acc={acc:.2f}%") - if epoch % 100 
== 0 or epoch == num_epochs - 1: - if world_size > 1: - dist.barrier() + if should_checkpoint: + if world_size > 1: + dist.barrier() + if rank == 0: self._save_checkpoint( model, optimizer, epoch, out_dir, avg_loss, acc, rank )
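The launcher block in `_global.yaml` relies on `oc.select` so a compute config only needs to define the keys it cares about; anything missing falls back to a default or to the user config. The sketch below is a minimal, illustrative check of that fallback behaviour with OmegaConf; the keys and values are made up for the example and are not the real template files.

```python
from omegaconf import OmegaConf

# Illustrative config: compute.slurm defines a partition but no qos and no
# additional_parameters, so the launcher falls back to defaults / user values.
cfg = OmegaConf.create(
    {
        "compute": {"cpus_per_task": 8, "slurm": {"partition": "a40"}},
        "user": {"slurm": {"additional_parameters": {"mail_type": "END"}}},
        "launcher": {
            "cpus_per_task": "${oc.select:compute.cpus_per_task, 4}",
            "partition": "${oc.select:compute.slurm.partition, null}",
            "qos": "${oc.select:compute.slurm.qos, null}",
            # Nested fallback: compute-level override first, then user-level.
            "additional_parameters": "${oc.select:compute.slurm.additional_parameters, ${oc.select:user.slurm.additional_parameters, {}}}",
        },
    }
)

launcher = OmegaConf.to_container(cfg.launcher, resolve=True)
print(launcher["cpus_per_task"])          # 8 (found under compute)
print(launcher["qos"])                    # None (missing key -> default null)
print(launcher["additional_parameters"])  # {'mail_type': 'END'} (user fallback)
```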
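The compute configs now set `tasks_per_node: ${.gpus_per_node}`, so Slurm launches one task per GPU and each task discovers its place in the job through `submitit.JobEnvironment()`, as the README describes. A standalone sketch of the same idea with the plain Submitit API follows; the `logs` folder, the `_probe_rank` helper, and the resource numbers are illustrative, and actually running it requires a Slurm cluster (or Submitit's local fallback).

```python
import submitit


def _probe_rank() -> str:
    """Report this task's position in the job (hypothetical helper)."""
    env = submitit.JobEnvironment()
    return (
        f"global_rank={env.global_rank} local_rank={env.local_rank} "
        f"world_size={env.num_tasks} host={env.hostname}"
    )


executor = submitit.AutoExecutor(folder="logs")  # assumed log folder
executor.update_parameters(
    nodes=1,
    tasks_per_node=2,   # mirrors tasks_per_node: ${.gpus_per_node}
    gpus_per_node=2,
    cpus_per_task=8,
    timeout_min=60,
)
job = executor.submit(_probe_rank)
print(job.results())  # one result per task, e.g. ranks 0 and 1
```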
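The README's logging note (use `logger` only on rank 0, `print()` on the other ranks) can be wrapped in a small helper so call sites stay clean. This is a minimal sketch assuming the rank is already known from `submitit.JobEnvironment()`; the `log_rank_aware` helper is illustrative and not part of the template.

```python
import logging

logger = logging.getLogger(__name__)


def log_rank_aware(rank: int, message: str) -> None:
    """Send rank-0 messages to the global Hydra log, others to rank-local stdout."""
    if rank == 0:
        logger.info(message)               # lands in the single Hydra log for the run
    else:
        print(f"[Rank {rank}] {message}")  # stays in this task's Slurm stdout


# Usage inside the training entry point, once rank and device are known:
# log_rank_aware(rank, f"Initialized on device: {device}")
```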