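"""Pipeline-parallel training entry point.

Builds an SFLLM model, splits it into a DeepSpeed PipelineModule spread over
``world_size`` stages, and trains it with ``SFTrainerForPipeline``.

A typical launch uses the DeepSpeed launcher (illustrative; the exact CLI
flags are defined by ``utils.get_args`` and are not reproduced here):

    deepspeed --num_gpus=2 train_pipeline.py <flags consumed by get_args>
"""
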
import random

import deepspeed
import numpy as np
import torch
from datasets import load_from_disk
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

from data_obj import ModelArgs, ProgramArgs, TrainArgs
from sf_trainer import SFTrainer
from utils import (build_logger, convert_batch_to_ids, count_parameters,
                   get_args, prepare_tokenizer)


def set_random_seed(seed):
    """Seed every RNG source used here for reproducibility."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)


seed = 168
set_random_seed(seed)


def fix_grad(model, logger):
    """Freeze everything except the last two transformer blocks and the final
    layer norm, then log the requires_grad state of every parameter."""
    for param in model.parameters():
        param.requires_grad = False
    for i in range(-2, 0):
        for param in model.blocks[i].parameters():
            param.requires_grad = True
    for param in model.ln.parameters():
        param.requires_grad = True

    param_grad_info = ['']
    for name, module in model.named_modules():
        for param_name, param in module.named_parameters():
            param_grad_info.append(
                f"Layer: {name} Parameter: {param_name} requires_grad: {param.requires_grad}")
    logger.info('\n'.join(param_grad_info))


class SFTrainerForPipeline(SFTrainer):
    def validate_batch(self, batch):
        """Run the model's forward pass on one batch and return the scalar result."""
        return self.model.forward(*batch).item()


def main(
    prog_args: ProgramArgs,
    model_args: ModelArgs,
    train_args: TrainArgs
):
    logger = build_logger(
        train_args.deepspeed_ckpt_tag,
        prog_args.log_path,
        local_rank=train_args.local_rank,
    )
    tkn, VOCAB_SIZE = prepare_tokenizer(prog_args.tokenizer_path)

    # Build the plain (not yet partitioned) model.
    from models import SFLLM
    base_model = SFLLM(
        vocab_size=VOCAB_SIZE,
        pad_token_id=tkn.pad_token_id,
        args=model_args,
    )
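    # A plain torch checkpoint, if present, is loaded into the unpartitioned
    # model here; the DeepSpeed checkpoint further down is only used as a
    # fallback when no torch checkpoint was found.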
    use_torch_ckpt = SFTrainer.validate_ckpt(
        train_args.torch_ckpt_home,
        train_args.torch_ckpt_tag
    )
    if use_torch_ckpt:
        SFTrainer.load_ckpt(
            train_args,
            base_model,
            None,
            logger
        )
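    # Turn the model into a flat list of pipeline layers plus a loss function,
    # then let DeepSpeed partition those layers into `world_size` stages
    # (one stage per rank, i.e. pure pipeline parallelism without data
    # parallelism).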
    pipe_model, loss_fn = base_model.pipeline_and_loss_fn()
    param_num = count_parameters(pipe_model) * 1e-9
    logger.info('Model parameters: %f B', param_num)
    pipe_model = deepspeed.PipelineModule(
        layers=pipe_model,
        num_stages=train_args.world_size,
        loss_fn=loss_fn,
    )
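    # `prog_args.deepspeed_cfg` holds the DeepSpeed configuration (a JSON path
    # or dict accepted by deepspeed.initialize). A minimal sketch of what it is
    # expected to contain (illustrative values only; the real file is not part
    # of this script):
    #
    #     {
    #       "train_batch_size": 32,
    #       "train_micro_batch_size_per_gpu": 4,
    #       "gradient_accumulation_steps": 8,
    #       "optimizer": {"type": "Adam", "params": {"lr": 1e-4}},
    #       "fp16": {"enabled": true}
    #     }
    #
    # With num_stages == world_size the data-parallel degree is 1, so
    # train_batch_size == micro_batch_size * gradient_accumulation_steps.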
    model_engine, opt = deepspeed.initialize(
        model=pipe_model,
        config=prog_args.deepspeed_cfg,
        model_parameters=[p for p in pipe_model.parameters()
                          if p.requires_grad],
    )[:2]
    use_ds_ckpt = SFTrainer.validate_ckpt(
        train_args.deepspeed_ckpt_home,
        train_args.deepspeed_ckpt_tag
    )
    if not use_torch_ckpt and use_ds_ckpt:
        SFTrainer.load_ckpt(
            train_args,
            model_engine,
            None,
            logger
        )

    _, get_micro_batch_size, get_grad_accum_steps = model_engine.get_batch_info()
    train_args.batch_size = get_micro_batch_size()
    train_args.grad_accum_period = get_grad_accum_steps()
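    # The DataLoader batch size is taken from the engine's micro-batch size so
    # each batch matches what the pipeline schedule expects; collate_fn
    # tokenizes the raw text and moves the tensors onto the engine's device.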
    def collate_fn(batch):
        return convert_batch_to_ids(
            tkn,
            [line['text'] for line in batch],
            max_len=model_args.max_len,
            ext_factor=model_args.ext_factor,
            device=model_engine.device,
        )

    def build_data_loader(dataset):
        return DataLoader(
            dataset=dataset,
            batch_size=train_args.batch_size,
            collate_fn=collate_fn,
        )

    train_loader = build_data_loader(load_from_disk(prog_args.train_path))
    validate_loader = build_data_loader(
        load_from_disk(prog_args.validate_path).shuffle(
            seed=train_args.start_batch))
    tb_writer = SummaryWriter(log_dir=prog_args.tensorboard_path)
    # This entry point only supports pipeline training.
    assert train_args.pipeline
    trainer = SFTrainerForPipeline(
        train_args=train_args,
        model=model_engine,
        opt=opt,
        train_loader=train_loader,
        validate_loader=validate_loader,
        logger=logger,
        tb_writer=tb_writer,
    )
    trainer.train()


if __name__ == '__main__':
    prog_args, model_args, train_args = get_args()
    deepspeed.init_distributed(dist_backend='nccl')
    main(
        prog_args=prog_args,
        model_args=model_args,
        train_args=train_args,
    )