-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerate.py
More file actions
115 lines (104 loc) · 4.49 KB
/
generate.py
File metadata and controls
115 lines (104 loc) · 4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import torch
import torch.nn.functional as F
def top_k_top_p_filtering(logits, temperature=1.0,
                          top_k=0, top_p=0.0):
    """Mask out unlikely tokens using top-k and/or nucleus (top-p) filtering.

    Filtered positions are set to ``-inf`` so that a subsequent softmax
    assigns them zero probability.

    Args:
        logits: Float tensor whose last dimension indexes the vocabulary.
            Any number of leading (batch) dimensions is accepted.
            The tensor is modified in place.
        temperature: Divisor applied to the logits before the softmax that
            is used to measure cumulative probability for top-p. It does
            not rescale the returned logits.
        top_k: If > 0, keep only the ``top_k`` highest-scoring tokens per
            row (ties with the k-th score are kept). Values larger than the
            vocabulary size are clamped.
        top_p: If > 0, keep the highest-probability tokens per row up to
            and including the first token at which the cumulative
            probability exceeds ``top_p``; the most likely token is always
            kept.

    Returns:
        The same ``logits`` tensor, with filtered entries set to ``-inf``.
    """
    neg_inf = -float("inf")

    if top_k > 0:
        # torch.topk raises if k exceeds the size of the last dimension.
        k = min(top_k, logits.size(-1))
        # Keep a trailing singleton dim so the per-row threshold broadcasts
        # against the vocabulary axis rather than the batch axis.
        kth_score = torch.topk(logits, k).values[..., -1, None]
        logits[logits < kth_score] = neg_inf

    if top_p > 0.0:
        sorted_logits, sorted_idx = torch.sort(logits, descending=True, dim=-1)
        cum_probs = torch.cumsum(
            F.softmax(sorted_logits / temperature, dim=-1), dim=-1
        )
        # Mark (in sorted order) every token after the cumulative mass has
        # exceeded top_p ...
        sorted_remove = cum_probs > top_p
        # ... shifted right by one so the token that crosses the threshold
        # is kept, and the most likely token is never removed.
        sorted_remove[..., 1:] = sorted_remove[..., :-1].clone()
        sorted_remove[..., 0] = False
        # Map the mask from sorted order back to vocabulary order.
        remove = torch.zeros_like(sorted_remove).scatter_(
            -1, sorted_idx, sorted_remove
        )
        logits[remove] = neg_inf

    return logits
class Generater:
    """Sample text continuations from a model, one token at a time.

    The wrapped ``model`` is called with keyword arguments ``input_ids``,
    ``attention_mask``, ``past_caches``, ``use_cache`` and ``timestep`` and
    must return a ``(logits, past_caches)`` pair. The ``tokenizer`` must be
    callable on a list of strings (with ``return_tensors='pt'``) and expose
    ``decode`` and ``eos_token_id``.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def get_device(self):
        """Return the device of the model's first parameter.

        Falls back to the CPU device when the model has no parameters.
        """
        first_param = next(self.model.parameters(), None)
        if first_param is None:
            return torch.device("cpu")
        return first_param.device

    def generate(self, text_list, **kwargs):
        """Tokenize ``text_list`` and return one generated string per entry.

        Args:
            text_list: Input texts passed to the tokenizer.
            **kwargs: Forwarded to ``_generate`` (``debug``,
                ``max_new_tokens``, ``top_k``, ``top_p``, ``temperature``).

        Returns:
            A list of decoded continuations (prompt tokens excluded).
        """
        tokens = self.tokenizer(text_list, return_tensors='pt')
        # Rebind rather than rely on ``.to`` mutating the object in place.
        tokens = tokens.to(self.get_device())
        with torch.no_grad():
            return self._generate(tokens, **kwargs)

    def _generate(self, inputs, debug=False,
                  max_new_tokens=128, top_k=40, top_p=0.9, temperature=0.9,
                  **kwargs):
        """Run the sampling loop over already-tokenized inputs.

        Args:
            inputs: Mapping with ``"input_ids"`` and ``"attention_mask"``
                tensors; the first dimension is the batch.
            debug: When truthy, print the token sampled at every step.
            max_new_tokens: Maximum number of tokens kept per sequence.
            top_k: Forwarded to ``top_k_top_p_filtering``.
            top_p: Forwarded to ``top_k_top_p_filtering``.
            temperature: Softmax temperature used for filtering and sampling.
            **kwargs: Accepted and ignored.

        Returns:
            A list with one decoded string per batch row, containing the
            tokens generated before EOS was sampled or the limit was hit.
        """
        # One extra step: a row's result is captured *before* the token
        # sampled at that step is appended, so step ``max_new_tokens`` is
        # what yields ``max_new_tokens`` kept tokens (or detects a late EOS).
        max_new_tokens += 1

        input_ids = inputs["input_ids"]
        attention_mask = inputs["attention_mask"]

        batch_size = input_ids.size(0)
        start_idx = input_ids.size(-1)  # generated tokens start here
        eos_token_id = self.tokenizer.eos_token_id

        past_caches = None  # past key/value caches
        done = [False] * batch_size
        # Empty lists (not None) so decoding still works if no step runs.
        results = [[] for _ in range(batch_size)]

        for i in range(max_new_tokens):
            # First step feeds the whole prompt; later steps feed only the
            # newest token and rely on the cache. The attention mask always
            # covers the full sequence.
            step_ids = input_ids if i == 0 else input_ids[:, -1:]
            logits, past_caches = self.model(
                input_ids=step_ids,
                attention_mask=attention_mask,
                past_caches=past_caches,
                use_cache=True,
                timestep=i,
            )

            # Keep only the logits of the last position.
            logits = logits[:, -1, :]
            logits = top_k_top_p_filtering(
                logits, temperature=temperature, top_k=top_k, top_p=top_p)

            # Sample the next token from the filtered distribution.
            probs = F.softmax(logits / temperature, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            if debug:
                print(f'Generating token#{i}: ',
                      list(map(self.tokenizer.decode, next_token)))

            # Check termination per row. Finished rows keep receiving
            # sampled tokens, but their result was captured when they
            # finished and is not overwritten.
            is_last_step = i == max_new_tokens - 1
            for b, token_id in enumerate(next_token.view(-1).tolist()):
                if not done[b] and (token_id == eos_token_id or is_last_step):
                    done[b] = True
                    results[b] = input_ids[b, start_idx:].tolist()
            if all(done):
                break

            # Extend the sequence and its mask by one position (torch.cat
            # allocates new tensors; the caller's inputs are not modified).
            input_ids = torch.cat([input_ids, next_token], dim=-1)
            attention_mask = torch.cat(
                [
                    attention_mask,
                    # Match the incoming mask's dtype and device.
                    attention_mask.new_ones((batch_size, 1)),
                ],
                dim=-1,
            )

        return list(map(self.tokenizer.decode, results))