diff --git a/Dockerfile b/Dockerfile
index ceb3863..e0cad8f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -52,6 +52,10 @@ RUN curl -Lo torchtitan-requirements.txt https://raw.githubusercontent.com/pytor
     python3 -m pip install --no-cache-dir -r torchtitan-requirements.txt
 RUN python3 -m pip install --no-cache-dir megatron-core==0.13.1 transformers==4.41.2 deepspeed==0.17.5 torchtitan==0.1.0
+RUN python3 -m pip install --no-cache-dir --no-deps colossalai==0.5.0
+RUN python3 -m pip install --no-cache-dir peft==0.10.0
+RUN python3 -m pip install --no-cache-dir galore-torch==1.0
+
 # DeepSpeed needs passwordless ssh
 COPY config/sshconfig /root/.ssh/config
 COPY config/id_ed25519 /root/.ssh/id_ed25519
diff --git a/tests/docker/ColossalAI/.gitignore b/tests/docker/ColossalAI/.gitignore
new file mode 100644
index 0000000..de9f6a0
--- /dev/null
+++ b/tests/docker/ColossalAI/.gitignore
@@ -0,0 +1,3 @@
+/compose.yaml
+/netconfig.toml
+/config.sh
diff --git a/tests/docker/ColossalAI/config_gen.py b/tests/docker/ColossalAI/config_gen.py
new file mode 100644
index 0000000..eb386b3
--- /dev/null
+++ b/tests/docker/ColossalAI/config_gen.py
@@ -0,0 +1,113 @@
+#!/usr/bin/env python3
+
+SIMULATOR_TEMPLATE = r"""
+  simulator:
+    image: "phantora:latest"
+    volumes:
+      - /run/phantora:/run/phantora
+      - ./netconfig.toml:/netconfig.toml:ro
+    pid: host
+    ipc: host
+    environment:
+      - PHANTORA_LOG=${{PHANTORA_LOG:-info}}
+      - PHANTORA_SOCKET_PREFIX=/run/phantora/phantora
+    command: /phantora/dist/phantora_server --netconfig /netconfig.toml
+    cpuset: '{cpuset}'
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              device_ids: ['0']
+              capabilities: [gpu]
+"""
+
+HOST_TEMPLATE = r"""
+  host-{host_id}:
+    image: "phantora:latest"
+    volumes:
+      - /run/phantora:/run/phantora
+      - ../..:/phantora/tests:ro
+    pid: host
+    ipc: host
+    environment:
+      - CUDA_DEVICE_MAX_CONNECTIONS=1
+      - PHANTORA_NGPU={ngpu}
+      - PHANTORA_VRAM_MIB={vram_mib}
+      - PHANTORA_IGNORE_CPU_TIME=1
+      - PHANTORA_SOCKET_PREFIX=/run/phantora/phantora
+    hostname: host-{host_id}
+    command: sleep infinity
+    cpuset: '{cpuset}'
+    depends_on:
+      - simulator
+"""
+
+NETCONFIG_TEMPLATE = r"""
+host_mapping = {host_list}
+
+[simulator]
+loopback_speed = 2880
+fairness = "PerFlowMaxMin"
+
+[topology]
+type = "TwoLayerMultiPath"
+
+[topology.args]
+nspines = 2
+nracks = {nracks}
+rack_size = 2
+host_bw = 800
+rack_uplink_port_bw = 800
+load_balancer_type = "EcmpEverything"
+"""
+
+if __name__ == '__main__':
+    import argparse
+    from os.path import dirname, realpath, join
+    from multiprocessing import cpu_count
+    script_dir = dirname(realpath(__file__))
+
+    nproc = cpu_count()
+    if nproc <= 2:
+        default_sim_core = str(nproc - 1)
+        default_host_cpuset = str(nproc - 1)
+    else:
+        default_sim_core = str(nproc // 2)
+        default_host_cpuset = f"{nproc // 2 + 1}-{nproc - 1}"
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--nhost", type=int, default=4)
+    parser.add_argument("--ngpu", type=int, default=4)
+    parser.add_argument("--vram_mib", type=int, default=143771)
+    parser.add_argument("--cpuset_sim", type=str, default=default_sim_core)
+    parser.add_argument("--cpuset_host", type=str, default=default_host_cpuset)
+    args = parser.parse_args()
+
+    nhosts = args.nhost
+    ngpu = args.ngpu
+
+    with open(join(script_dir, "compose.yaml"), "w") as f:
+        f.write("services:")
+        f.write(SIMULATOR_TEMPLATE.format(cpuset=args.cpuset_sim))
+        for i in range(1, nhosts + 1):
+            f.write(
+                HOST_TEMPLATE.format(
+                    host_id=i,
+                    ngpu=ngpu,
+                    vram_mib=args.vram_mib,
+                    cpuset=args.cpuset_host,
+                )
+            )
+
+    with open(join(script_dir, "netconfig.toml"), "w") as f:
+        host_list = str([f"host-{i}" for i in range(1, nhosts + 1)])
+        f.write(
+            NETCONFIG_TEMPLATE.format(
+                host_list=host_list, nracks=(nhosts + 1) // 2
+            )
+        )
+
+    with open(join(script_dir, "config.sh"), "w") as f:
+        f.write(f"EVAL_NHOST={nhosts}\n")
+        f.write(f"EVAL_NGPU={ngpu}\n")
diff --git a/tests/docker/ColossalAI/run.sh b/tests/docker/ColossalAI/run.sh
new file mode 100755
index 0000000..eed8d81
--- /dev/null
+++ b/tests/docker/ColossalAI/run.sh
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+WORKDIR=`dirname $(realpath $0)`
+source $WORKDIR/config.sh
+
+compose_file=$WORKDIR/compose.yaml
+
+docker compose -f $compose_file down --remove-orphans
+docker compose -f $compose_file up -d
+sleep 1
+
+cmd="/phantora/dist/phantora_run torchrun --nproc_per_node $EVAL_NGPU --nnodes $EVAL_NHOST --rdzv_backend c10d --rdzv_endpoint=\"host-1:12345\" /phantora/tests/test_ColossalAI.py $@"
+
+for w in $(seq 2 $EVAL_NHOST); do
+    docker compose -f $compose_file exec -it -d host-$w bash -c "$cmd"
+done
+docker compose -f $compose_file exec -it host-1 bash -c "$cmd"
diff --git a/tests/docker/ColossalAI/stop.sh b/tests/docker/ColossalAI/stop.sh
new file mode 100755
index 0000000..bf27321
--- /dev/null
+++ b/tests/docker/ColossalAI/stop.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+
+WORKDIR=`dirname $(realpath $0)`
+compose_file=$WORKDIR/compose.yaml
+docker compose -f $compose_file down --remove-orphans
+sudo rm -f /run/phantora/phantora*
diff --git a/tests/phantora_utils.py b/tests/phantora_utils.py
index 71a3a04..fd6e760 100644
--- a/tests/phantora_utils.py
+++ b/tests/phantora_utils.py
@@ -22,6 +22,7 @@ def time_pair() -> float:
     LIB.get_time_double.restype = ctypes.c_double
     _get_time = LIB.get_time_double
     _perf_counter = _time.perf_counter
+    _wall_time = _time.time
 
     def time() -> float:
         _read_timer()
@@ -34,6 +35,7 @@ def time_pair() -> float:
         return t, t_wall
 
     _time.perf_counter = time
+    #_time.time = time
     # seems cannot patch `assert_ints_same_as_other_ranks`
     # maybe due to decorator, but cannot reproduce in a mini example
diff --git a/tests/test_ColossalAI.py b/tests/test_ColossalAI.py
new file mode 100644
index 0000000..9a1869a
--- /dev/null
+++ b/tests/test_ColossalAI.py
@@ -0,0 +1,152 @@
+from phantora_utils import (
+    time_pair,
+    enable_function_tracer,
+    disable_function_tracer,
+    RandomTokens,
+)
+import os
+import torch
+import torch.distributed as dist
+from torch.utils.data import DataLoader
+from transformers import LlamaConfig, LlamaForCausalLM
+
+
+def build_model(
+    device,
+    num_layers,
+    hidden_size,
+    ffn_hidden_size,
+    num_attention_heads,
+    vocab_size,
+    seq_len,
+):
+    config = LlamaConfig(
+        vocab_size=vocab_size,
+        hidden_size=hidden_size,
+        intermediate_size=ffn_hidden_size,
+        num_hidden_layers=num_layers,
+        num_attention_heads=num_attention_heads,
+        max_position_embeddings=seq_len,
+    )
+    config._attn_implementation = "flash_attention_2"
+
+    dtype_orig = torch.get_default_dtype()
+    torch.set_default_dtype(torch.bfloat16)
+    with torch.device("meta"):
+        model = LlamaForCausalLM(config)
+    model = model.to_empty(device=device)
+    torch.set_default_dtype(dtype_orig)
+    return model
+
+
+def main(
+    num_layers,
+    hidden_size,
+    ffn_hidden_size,
+    num_attention_heads,
+    vocab_size,
+    seq_len,
+    micro_batch_size,
+    iterations,
+):
+    from colossalai import launch_from_torch
+    from colossalai.booster import Booster
+    from colossalai.booster.plugin import TorchDDPPlugin
+
+    launch_from_torch()
+
+    rank = int(os.environ["RANK"])
+    local_rank = int(os.environ["LOCAL_RANK"])
+    torch.cuda.set_device(local_rank)
+    device = torch.device("cuda", local_rank)
+    torch.cuda.memory.reset_peak_memory_stats(device)
+
+    model = build_model(
+        device=device,
+        num_layers=num_layers,
+        hidden_size=hidden_size,
+        ffn_hidden_size=ffn_hidden_size,
+        num_attention_heads=num_attention_heads,
+        vocab_size=vocab_size,
+        seq_len=seq_len,
+    )
+    model.train()
+
+    if rank == 0:
+        print(f"Model size: {sum(p.numel() for p in model.parameters())}")
+
+    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
+    dataset = RandomTokens(vocab_size, seq_len, iterations * micro_batch_size)
+    data_loader = DataLoader(dataset, batch_size=micro_batch_size)
+
+    booster = Booster(plugin=TorchDDPPlugin())
+    model, optimizer, _, data_loader, _ = booster.boost(
+        model, optimizer, dataloader=data_loader
+    )
+
+    duras = []
+    duras_wall = []
+    for step, (source, label) in enumerate(data_loader):
+        if step >= iterations:
+            break
+        start, start_wall = time_pair()
+        source = source.to(device)
+        label = label.to(device)
+        loss = model(source, labels=label).loss
+        booster.backward(loss, optimizer)
+        optimizer.step()
+        optimizer.zero_grad(set_to_none=True)
+        torch.cuda.synchronize()
+        end, end_wall = time_pair()
+        print(
+            f"rank {rank} iter {step} time: {end - start:.2f} wall: {end_wall - start_wall:.2f}\n",
+            end="",
+        )
+        duras.append(end - start)
+        duras_wall.append(end_wall - start_wall)
+
+    peak_vram_mib = torch.cuda.max_memory_allocated(device) / (1024 * 1024)
+    if len(duras) > 1:
+        avg_time = sum(duras[1:]) / (len(duras) - 1)
+        avg_wall = sum(duras_wall[1:]) / (len(duras_wall) - 1)
+    elif len(duras) == 1:
+        avg_time = duras[0]
+        avg_wall = duras_wall[0]
+    else:
+        avg_time = 0.0
+        avg_wall = 0.0
+    print(f"Rank {rank} Time: {duras} Avg Time: {avg_time:.2f}\n", end="")
+    print(f"Rank {rank} Peak: {peak_vram_mib:<.2f}MiB\n", end="")
+    print(f"Rank {rank} Wall: {duras_wall} Avg Wall: {avg_wall:.2f}\n", end="")
+
+    if dist.is_initialized():
+        dist.destroy_process_group()
+
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--num_layers", type=int, default=32)
+    parser.add_argument("--hidden_size", type=int, default=4096)
+    parser.add_argument("--ffn_hidden_size", type=int, default=11008)
+    parser.add_argument("--num_attention_heads", type=int, default=32)
+    parser.add_argument("--vocab_size", type=int, default=32000)
+    parser.add_argument("--sequence_length", type=int, default=4096)
+    parser.add_argument("--micro_batch_size", type=int, default=1)
+    parser.add_argument("--iterations", type=int, default=4)
+    args = parser.parse_args()
+    enable_function_tracer()
+    try:
+        main(
+            num_layers=args.num_layers,
+            hidden_size=args.hidden_size,
+            ffn_hidden_size=args.ffn_hidden_size,
+            num_attention_heads=args.num_attention_heads,
+            vocab_size=args.vocab_size,
+            seq_len=args.sequence_length,
+            micro_batch_size=args.micro_batch_size,
+            iterations=args.iterations,
+        )
+    finally:
+        disable_function_tracer()
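
Usage note (not part of the patch): a sketch of how these pieces fit together, assuming the repository root as the working directory and using only the defaults and flags defined in the files above.

    # generate compose.yaml, netconfig.toml, and config.sh (defaults: 4 hosts, 4 GPUs each)
    python3 tests/docker/ColossalAI/config_gen.py --nhost 4 --ngpu 4

    # bring up the simulator and host containers, then launch torchrun on every host;
    # extra arguments are forwarded to test_ColossalAI.py via "$@" in run.sh
    tests/docker/ColossalAI/run.sh --micro_batch_size 1 --iterations 4

    # tear the containers down and remove the phantora sockets
    tests/docker/ColossalAI/stop.sh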