-
Notifications
You must be signed in to change notification settings - Fork 12
/
inference.py
124 lines (104 loc) · 3.58 KB
/
inference.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: utf-8 -*-
import os
import nltk
import paddle
import paddle.nn.functional as F
from paddlenlp.data import Tuple, Pad
from paddlenlp.transformers import AutoModelForSequenceClassification, AutoTokenizer
# --- Module-level setup: device, model, tokenizer, labels (runs on import) ---

# Prefer GPU when this paddle build was compiled with CUDA support.
if paddle.device.is_compiled_with_cuda():
    paddle.set_device('gpu')
else:
    paddle.set_device('cpu')

# Resolve all paths relative to this file so the script works from any CWD.
base_dir = os.path.dirname(os.path.abspath(__file__))
model_dir = os.path.join(base_dir, 'pretrained_model')
label_file = os.path.join(base_dir, 'data/label.tsv')

# Fix: use a context manager so the label file handle is closed, and pin
# the encoding instead of relying on the platform default. One label per line.
with open(label_file, encoding='utf-8') as f:
    label_list = [line.strip() for line in f]

# Fine-tuned multi-label sequence classifier plus its matching tokenizer.
model = AutoModelForSequenceClassification.from_pretrained(model_dir)
tokenizer = AutoTokenizer.from_pretrained(model_dir)
max_seq_length = 512  # tokenizer truncation length (tokens)
batch_size = 32       # sentences per inference batch

# Point nltk at the bundled data directory (sentence-tokenizer models).
nltk_data_path = os.path.join(base_dir, 'nltk_data')
nltk.data.path.append(nltk_data_path)

# Maps ATT&CK tactic slugs (as they appear in label.tsv) to display names.
ATTACK_TACTICS_MAP = {
    'reconnaissance': 'Reconnaissance',
    'resource-development': 'Resource Development',
    'initial-access': 'Initial Access',
    'execution': 'Execution',
    'persistence': 'Persistence',
    'privilege-escalation': 'Privilege Escalation',
    'defense-evasion': 'Defense Evasion',
    'credential-access': 'Credential Access',
    'discovery': 'Discovery',
    'lateral-movement': 'Lateral Movement',
    'collection': 'Collection',
    'command-and-control': 'Command and Control',
    'exfiltration': 'Exfiltration',
    'impact': 'Impact'
}
@paddle.no_grad()
def predict_text(text):
    """Infer ATT&CK tactics/techniques (TTPs) for each sentence of ``text``.

    Args:
        text (str): text to infer TTPs from.

    Returns:
        tuple: ``(text, results)``. ``results`` is a list with one dict per
        sentence: ``{"sent": <sentence>, "tts": [<label dict>, ...]}`` where
        each label dict holds tactic/technique names, ids and the sigmoid
        score (plain ``float``). For non-string input, ``results`` is None.
    """
    if not isinstance(text, str):
        return text, None
    if not text:
        # Keep the output shape stable for empty input.
        return text, [{'sent': text, 'tts': []}]

    # Split the document into sentences; each sentence is classified alone.
    sentences = nltk.sent_tokenize(text)
    examples = []
    for sent in sentences:
        encoded = tokenizer(text=sent, max_seq_len=max_seq_length)
        examples.append((encoded['input_ids'], encoded['token_type_ids']))

    # Group the encoded sentences into fixed-size batches.
    batches = [
        examples[i:i + batch_size]
        for i in range(0, len(examples), batch_size)
    ]
    # Pads the input ids / segment ids of a batch to a common length.
    batchify_fn = lambda samples, fn=Tuple(
        Pad(axis=0, pad_val=tokenizer.pad_token_id),       # input ids
        Pad(axis=0, pad_val=tokenizer.pad_token_type_id),  # segment ids
    ): fn(samples)

    preds = []
    model.eval()
    for batch in batches:
        input_ids, token_type_ids = batchify_fn(batch)
        input_ids = paddle.to_tensor(input_ids)
        token_type_ids = paddle.to_tensor(token_type_ids)
        logits = model(input_ids, token_type_ids)
        # Multi-label task: an independent sigmoid score per technique label.
        probs = F.sigmoid(logits).numpy()
        for prob in probs:
            # 0.5 acceptance threshold; can be tuned per technique.
            # Fix: removed unused `confidence` local; cast scores to plain
            # float so the result is JSON-serializable.
            preds.append(
                [[i, float(p)] for i, p in enumerate(prob) if p > 0.5]
            )

    res = []
    for idx, sent in enumerate(sentences):
        labels = []
        for pred, score in preds[idx]:
            tt = label_list[pred]
            # Label format: <tactic-slug>_<tactic-id>_<technique-name>_<technique-id>
            ta_n, ta_id, te_name, te_id = tt.split('_')
            ta_n = ATTACK_TACTICS_MAP[ta_n]
            if len(te_id) > 6:  # sub-technique ids: TxxxxYYY -> Txxxx.YYY
                te_id = te_id[:-3] + "." + te_id[-3:]
            labels.append({
                "tactic_name": ta_n,
                "tactic_id": ta_id,
                "technique_name": te_name,
                "technique_id": te_id,
                "score": score
            })
        res.append({
            "sent": sent,
            "tts": labels
        })
    return text, res