4 changes: 4 additions & 0 deletions Dockerfile
@@ -52,6 +52,10 @@ RUN curl -Lo torchtitan-requirements.txt https://raw.githubusercontent.com/pytor
python3 -m pip install --no-cache-dir -r torchtitan-requirements.txt
RUN python3 -m pip install --no-cache-dir megatron-core==0.13.1 transformers==4.41.2 deepspeed==0.17.5 torchtitan==0.1.0

RUN python3 -m pip install --no-cache-dir --no-deps colossalai==0.5.0
RUN python3 -m pip install --no-cache-dir peft==0.10.0
RUN python3 -m pip install --no-cache-dir galore-torch==1.0

# DeepSpeed needs passwordless ssh
COPY config/sshconfig /root/.ssh/config
COPY config/id_ed25519 /root/.ssh/id_ed25519
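Note: colossalai is installed with --no-deps, presumably so pip does not pull in a conflicting torch build already present in the image (an assumption; the diff does not state the intent). A quick sanity check one might run against the built image, using the import names of the pinned packages:

    docker run --rm phantora:latest python3 -c "import colossalai, peft, galore_torch"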
3 changes: 3 additions & 0 deletions tests/docker/ColossalAI/.gitignore
@@ -0,0 +1,3 @@
/compose.yaml
/netconfig.toml
/config.sh
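These three files are generated per machine by config_gen.py below, so they are ignored rather than committed.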
113 changes: 113 additions & 0 deletions tests/docker/ColossalAI/config_gen.py
@@ -0,0 +1,113 @@
#!/usr/bin/env python3

SIMULATOR_TEMPLATE = r"""
  simulator:
    image: "phantora:latest"
    volumes:
      - /run/phantora:/run/phantora
      - ./netconfig.toml:/netconfig.toml:ro
    pid: host
    ipc: host
    environment:
      - PHANTORA_LOG=${{PHANTORA_LOG:-info}}
      - PHANTORA_SOCKET_PREFIX=/run/phantora/phantora
    command: /phantora/dist/phantora_server --netconfig /netconfig.toml
    cpuset: '{cpuset}'
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              device_ids: ['0']
              capabilities: [gpu]
"""

HOST_TEMPLATE = r"""
  host-{host_id}:
    image: "phantora:latest"
    volumes:
      - /run/phantora:/run/phantora
      - ../..:/phantora/tests:ro
    pid: host
    ipc: host
    environment:
      - CUDA_DEVICE_MAX_CONNECTIONS=1
      - PHANTORA_NGPU={ngpu}
      - PHANTORA_VRAM_MIB={vram_mib}
      - PHANTORA_IGNORE_CPU_TIME=1
      - PHANTORA_SOCKET_PREFIX=/run/phantora/phantora
    hostname: host-{host_id}
    command: sleep infinity
    cpuset: '{cpuset}'
    depends_on:
      - simulator
"""

NETCONFIG_TEMPLATE = r"""
host_mapping = {host_list}

[simulator]
loopback_speed = 2880
fairness = "PerFlowMaxMin"

[topology]
type = "TwoLayerMultiPath"

[topology.args]
nspines = 2
nracks = {nracks}
rack_size = 2
host_bw = 800
rack_uplink_port_bw = 800
load_balancer_type = "EcmpEverything"
"""

if __name__ == '__main__':
    import argparse
    from os.path import dirname, realpath, join
    from multiprocessing import cpu_count
    script_dir = dirname(realpath(__file__))

    # Default pinning: the simulator gets core nproc//2 and the host
    # containers share the cores above it, so they do not contend.
    nproc = cpu_count()
    if nproc <= 2:
        default_sim_core = str(nproc - 1)
        default_host_cpuset = str(nproc - 1)
    else:
        default_sim_core = str(nproc // 2)
        default_host_cpuset = f"{nproc // 2 + 1}-{nproc - 1}"

    parser = argparse.ArgumentParser()
    parser.add_argument("--nhost", type=int, default=4)
    parser.add_argument("--ngpu", type=int, default=4)
    parser.add_argument("--vram_mib", type=int, default=143771)
    parser.add_argument("--cpuset_sim", type=str, default=default_sim_core)
    parser.add_argument("--cpuset_host", type=str, default=default_host_cpuset)
    args = parser.parse_args()

    nhosts = args.nhost
    ngpu = args.ngpu

    with open(join(script_dir, "compose.yaml"), "w") as f:
        f.write("services:")
        f.write(SIMULATOR_TEMPLATE.format(cpuset=args.cpuset_sim))
        for i in range(1, nhosts + 1):
            f.write(
                HOST_TEMPLATE.format(
                    host_id=i,
                    ngpu=ngpu,
                    vram_mib=args.vram_mib,
                    cpuset=args.cpuset_host,
                )
            )

    with open(join(script_dir, "netconfig.toml"), "w") as f:
        host_list = str([f"host-{i}" for i in range(1, nhosts + 1)])
        f.write(
            NETCONFIG_TEMPLATE.format(
                host_list=host_list, nracks=(nhosts + 1) // 2
            )
        )

    with open(join(script_dir, "config.sh"), "w") as f:
        f.write(f"EVAL_NHOST={nhosts}\n")
        f.write(f"EVAL_NGPU={ngpu}\n")
17 changes: 17 additions & 0 deletions tests/docker/ColossalAI/run.sh
@@ -0,0 +1,17 @@
#!/usr/bin/env bash

WORKDIR=$(dirname "$(realpath "$0")")
source "$WORKDIR/config.sh"

compose_file="$WORKDIR/compose.yaml"

docker compose -f "$compose_file" down --remove-orphans
docker compose -f "$compose_file" up -d
sleep 1

cmd="/phantora/dist/phantora_run torchrun --nproc_per_node $EVAL_NGPU --nnodes $EVAL_NHOST --rdzv_backend c10d --rdzv_endpoint=\"host-1:12345\" /phantora/tests/test_ColossalAI.py $*"

# Launch host-2..host-N detached, then run host-1 in the foreground so the
# script blocks until rank 0 finishes.
for w in $(seq 2 "$EVAL_NHOST"); do
    docker compose -f "$compose_file" exec -it -d "host-$w" bash -c "$cmd"
done
docker compose -f "$compose_file" exec -it host-1 bash -c "$cmd"
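run.sh forwards any extra arguments into the torchrun command line, so benchmark parameters can be passed straight through, e.g. (flag names defined in test_ColossalAI.py below):

    ./tests/docker/ColossalAI/run.sh --num_layers 8 --sequence_length 2048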
6 changes: 6 additions & 0 deletions tests/docker/ColossalAI/stop.sh
@@ -0,0 +1,6 @@
#!/usr/bin/env bash

WORKDIR=$(dirname "$(realpath "$0")")
compose_file="$WORKDIR/compose.yaml"
docker compose -f "$compose_file" down --remove-orphans
sudo rm -f /run/phantora/phantora*
2 changes: 2 additions & 0 deletions tests/phantora_utils.py
@@ -22,6 +22,7 @@ def time_pair() -> float:
LIB.get_time_double.restype = ctypes.c_double
_get_time = LIB.get_time_double
_perf_counter = _time.perf_counter
_wall_time = _time.time

def time() -> float:
    _read_timer()
@@ -34,6 +35,7 @@ def time_pair() -> float:
    return t, t_wall

_time.perf_counter = time
#_time.time = time

# it seems we cannot patch `assert_ints_same_as_other_ranks`,
# maybe due to a decorator capturing the original; could not reproduce in a mini example
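A minimal sketch of the patching semantics at play here (an illustration, not a diagnosis of the failing case): assigning `_time.perf_counter = time` only affects callers that look the function up through the module at call time; anything that captured a direct reference earlier, e.g. via `from time import perf_counter` or inside a decorator at import time, keeps the unpatched callable:

    import time

    def fake() -> float:
        return 0.0

    captured = time.perf_counter   # direct reference taken before patching
    time.perf_counter = fake
    print(time.perf_counter())     # 0.0 -- attribute lookup sees the patch
    print(captured())              # real clock -- the patch is invisible here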
152 changes: 152 additions & 0 deletions tests/test_ColossalAI.py
@@ -0,0 +1,152 @@
from phantora_utils import (
    time_pair,
    enable_function_tracer,
    disable_function_tracer,
    RandomTokens,
)
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from transformers import LlamaConfig, LlamaForCausalLM


def build_model(
    device,
    num_layers,
    hidden_size,
    ffn_hidden_size,
    num_attention_heads,
    vocab_size,
    seq_len,
):
    config = LlamaConfig(
        vocab_size=vocab_size,
        hidden_size=hidden_size,
        intermediate_size=ffn_hidden_size,
        num_hidden_layers=num_layers,
        num_attention_heads=num_attention_heads,
        max_position_embeddings=seq_len,
    )
    config._attn_implementation = "flash_attention_2"

    dtype_orig = torch.get_default_dtype()
    torch.set_default_dtype(torch.bfloat16)
    # Build the model on the meta device, then allocate uninitialized
    # storage on the target device; weights are never initialized, which
    # is fine for a throughput benchmark.
    with torch.device("meta"):
        model = LlamaForCausalLM(config)
    model = model.to_empty(device=device)
    torch.set_default_dtype(dtype_orig)
    return model


def main(
    num_layers,
    hidden_size,
    ffn_hidden_size,
    num_attention_heads,
    vocab_size,
    seq_len,
    micro_batch_size,
    iterations,
):
    from colossalai import launch_from_torch
    from colossalai.booster import Booster
    from colossalai.booster.plugin import TorchDDPPlugin

    launch_from_torch()

    rank = int(os.environ["RANK"])
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank)
    torch.cuda.memory.reset_peak_memory_stats(device)

    model = build_model(
        device=device,
        num_layers=num_layers,
        hidden_size=hidden_size,
        ffn_hidden_size=ffn_hidden_size,
        num_attention_heads=num_attention_heads,
        vocab_size=vocab_size,
        seq_len=seq_len,
    )
    model.train()

    if rank == 0:
        print(f"Model size: {sum(p.numel() for p in model.parameters())}")

    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
    dataset = RandomTokens(vocab_size, seq_len, iterations * micro_batch_size)
    data_loader = DataLoader(dataset, batch_size=micro_batch_size)

    # Wrap model, optimizer, and dataloader with ColossalAI's DDP plugin.
    booster = Booster(plugin=TorchDDPPlugin())
    model, optimizer, _, data_loader, _ = booster.boost(
        model, optimizer, dataloader=data_loader
    )

    duras = []
    duras_wall = []
    for step, (source, label) in enumerate(data_loader):
        if step >= iterations:
            break
        start, start_wall = time_pair()
        source = source.to(device)
        label = label.to(device)
        loss = model(source, labels=label).loss
        booster.backward(loss, optimizer)
        optimizer.step()
        optimizer.zero_grad(set_to_none=True)
        torch.cuda.synchronize()
        end, end_wall = time_pair()
        print(
            f"rank {rank} iter {step} time: {end - start:.2f} wall: {end_wall - start_wall:.2f}\n",
            end="",
        )
        duras.append(end - start)
        duras_wall.append(end_wall - start_wall)

    peak_vram_mib = torch.cuda.max_memory_allocated(device) / (1024 * 1024)
    # Exclude the first iteration from the averages: it carries warmup cost.
    if len(duras) > 1:
        avg_time = sum(duras[1:]) / (len(duras) - 1)
        avg_wall = sum(duras_wall[1:]) / (len(duras_wall) - 1)
    elif len(duras) == 1:
        avg_time = duras[0]
        avg_wall = duras_wall[0]
    else:
        avg_time = 0.0
        avg_wall = 0.0
    print(f"Rank {rank} Time: {duras} Avg Time: {avg_time:.2f}\n", end="")
    print(f"Rank {rank} Peak: {peak_vram_mib:<.2f}MiB\n", end="")
    print(f"Rank {rank} Wall: {duras_wall} Avg Wall: {avg_wall:.2f}\n", end="")

    if dist.is_initialized():
        dist.destroy_process_group()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--num_layers", type=int, default=32)
    parser.add_argument("--hidden_size", type=int, default=4096)
    parser.add_argument("--ffn_hidden_size", type=int, default=11008)
    parser.add_argument("--num_attention_heads", type=int, default=32)
    parser.add_argument("--vocab_size", type=int, default=32000)
    parser.add_argument("--sequence_length", type=int, default=4096)
    parser.add_argument("--micro_batch_size", type=int, default=1)
    parser.add_argument("--iterations", type=int, default=4)
    args = parser.parse_args()
    enable_function_tracer()
    try:
        main(
            num_layers=args.num_layers,
            hidden_size=args.hidden_size,
            ffn_hidden_size=args.ffn_hidden_size,
            num_attention_heads=args.num_attention_heads,
            vocab_size=args.vocab_size,
            seq_len=args.sequence_length,
            micro_batch_size=args.micro_batch_size,
            iterations=args.iterations,
        )
    finally:
        disable_function_tracer()
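For reference, the defaults above describe a Llama-7B-scale model, so the "Model size" printed on rank 0 should come out around 6.7e9. A rough back-of-the-envelope count, ignoring the small norm weights and assuming the lm_head is untied from the embedding (the LlamaForCausalLM default):

    embed = 32000 * 4096                   # token embedding, ~131M; the untied lm_head matches it
    attn  = 4 * 4096 * 4096                # q/k/v/o projections per layer
    mlp   = 3 * 4096 * 11008               # gate/up/down projections per layer
    total = 2 * embed + 32 * (attn + mlp)  # ~6.74e9 parameters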