-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodels.py
More file actions
118 lines (109 loc) · 4.18 KB
/
models.py
File metadata and controls
118 lines (109 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import torch
import time
import requests
import torch.nn.functional as F
import lightgbm as lgb
import numpy as np
import subprocess
import json
from ember import predict_sample
from MalConv import MalConv
import sys
class MalConvModel(object):
    """Wrapper around a MalConv network that scores files from their raw bytes.

    Attributes set by the constructor:
        model:    the MalConv network with weights loaded from ``model_path``.
        thresh:   scores strictly below this value count as evasive.
        __name__: label for this model instance.
    """

    def __init__(self, model_path, thresh=0.5, name='malconv'):
        """Build the network and load its weights onto the CPU.

        Args:
            model_path: path to a checkpoint readable by ``torch.load`` that
                contains a ``'model_state_dict'`` entry.
            thresh: decision threshold used by ``is_evasive``.
            name: label stored in ``self.__name__``.
        """
        # NOTE(review): the network is left in training mode (``.train()``)
        # although it is only used for inference here -- confirm this is
        # intentional before switching to ``.eval()``.
        self.model = MalConv(channels=256, window_size=512, embd_size=8).train()
        checkpoint = torch.load(model_path, map_location='cpu')
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.thresh = thresh
        self.__name__ = name

    def get_score(self, file_path):
        """Return the model score for the file at ``file_path``.

        Only the first 2000000 bytes of the file are fed to the network. The
        score is element ``[0, 1]`` of the softmax output (presumably the
        probability of the malicious class -- verify against the training
        labels).

        Any exception (unreadable file, model failure, ...) is printed and
        turned into a score of ``0.0``. With a positive threshold such a
        failure is reported as evasive by ``is_evasive``.
        """
        try:
            with open(file_path, 'rb') as handle:
                raw = handle.read(2000000)  # read the first 2000000 bytes
            # Add a leading axis so the network receives a batch of one.
            byte_row = np.frombuffer(raw, dtype=np.uint8)[np.newaxis, :]
            batch = torch.from_numpy(byte_row)
            with torch.no_grad():
                logits = self.model(batch)
                probs = F.softmax(logits, dim=-1)
            return probs.detach().numpy()[0, 1]
        except Exception as err:
            print(err)
        return 0.0

    def is_evasive(self, file_path):
        """Return whether the file's score is strictly below ``self.thresh``."""
        return self.get_score(file_path) < self.thresh
#class EmberModel_gym(object): # model in gym-malware
# # ember_threshold = 0.8336 # resulting in 1% FPR
# def __init__(self, model_path, thresh=0.9, name='ember'): # 0.9 or 0.8336
# # load lightgbm model
# self.local_model = joblib.load(model_path)
# self.thresh = thresh
# self.__name__ = 'ember'
#
# def get_score(self, file_path):
# with open(file_path, 'rb') as fp:
# bytez = fp.read()
# #return predict_sample(self.model, bytez) > self.thresh
# features = feature_extractor.extract( bytez )
# score = local_model.predict_proba( features.reshape(1,-1) )[0,-1]
# return score
#
# def is_evasive(self, file_path):
# score = self.get_score(file_path)
# return score < self.thresh
class EmberModel_2019(object):  # model in MLSEC 2019
    """Wrapper around a LightGBM EMBER model that scores files from raw bytes.

    Attributes set by the constructor:
        model:    ``lgb.Booster`` loaded from ``model_path``.
        thresh:   scores strictly below this value count as evasive.
        __name__: label for this model instance.
    """

    def __init__(self, model_path, thresh=0.8336, name='ember'):
        """Load the LightGBM booster.

        Args:
            model_path: path to a LightGBM model file.
            thresh: decision threshold used by ``is_evasive``.
            name: label stored in ``self.__name__``. Previously this argument
                was accepted but ignored (the label was always ``'ember'``);
                it is now honoured, and the default keeps the old value.
        """
        # load lightgbm model
        self.model = lgb.Booster(model_file=model_path)
        self.thresh = thresh
        self.__name__ = name

    def get_score(self, file_path):
        """Return ``predict_sample``'s result for the whole file's bytes.

        The file is read in full. I/O errors and errors from
        ``predict_sample`` are not caught here and propagate to the caller.
        """
        with open(file_path, 'rb') as fp:
            bytez = fp.read()
        return predict_sample(self.model, bytez)

    def is_evasive(self, file_path):
        """Return whether the file's score is strictly below ``self.thresh``."""
        return self.get_score(file_path) < self.thresh
#class EmberModel_2020(object): # model in MLSEC 2020
# '''Implements predict(self, bytez)'''
# def __init__(self,
# name: str = 'ember_MLSEC202H0',
# thresh=0.8336):
# self.thresh = thresh
# self.__name__ = name
#
# def get_score(self, file_path):
# with open(file_path, 'rb') as fp:
# bytez = fp.read()
# url = 'http://127.0.0.1:8080/'
# timeout = 5
# error_msg = None
# res = None
# start = time.time()
# try:
# res = self.get_raw_result(bytez, url, timeout)
# score = res.json()['score']
# except (requests.RequestException, KeyError, json.decoder.JSONDecodeError) as e:
# score = 1.0 # timeout or other error results in malicious
# error_msg = str(e)
# if res:
# error_msg += f'-{res.text()}'
# return score
#
# def is_evasive(self, file_path):
# score = self.get_score(file_path)
# return score < self.thresh
#
# def get_raw_result(self, bytez, url, timeout):
# return requests.post(url, data=bytez, headers={'Content-Type': 'application/octet-stream'}, timeout=timeout)
class ClamAV(object):
    """Evasion check backed by the ``clamdscan`` command-line client."""

    @staticmethod
    def _verdict_from_output(stdout):
        """Interpret the captured stdout of a ``clamdscan`` run.

        Only the status token at the END of a line is considered, so a scanned
        path that merely contains "FOUND" or "OK" (e.g. ``BOOK.exe``) cannot
        be mistaken for a verdict.

        Args:
            stdout: captured output, as ``bytes`` (or ``str``).

        Returns:
            ``False`` if some line ends in `` FOUND`` (detected),
            ``True`` if none does and some line ends in `` OK`` (clean),
            ``None`` if neither status appears (scanner error).
        """
        if isinstance(stdout, bytes):
            text = stdout.decode('utf-8', errors='replace')
        else:
            text = str(stdout)
        lines = [line.rstrip() for line in text.splitlines()]
        # A detection takes priority over a clean result, as before.
        if any(line.endswith(' FOUND') for line in lines):
            return False
        if any(line.endswith(' OK') for line in lines):
            return True
        return None

    def is_evasive(self, file_path):
        """Scan ``file_path`` and return whether it went undetected.

        Returns:
            ``False`` when the scanner reports a detection, ``True`` when it
            reports the file as clean.

        Raises:
            SystemExit: when the output contains neither verdict; the message
                ``clamav error`` is printed first.
        """
        res = subprocess.run(['clamdscan', '--fdpass', file_path], stdout=subprocess.PIPE)
        verdict = self._verdict_from_output(res.stdout)
        if verdict is None:
            print('clamav error')
            # sys.exit() instead of the site-provided exit(), which is not
            # guaranteed to exist (e.g. under ``python -S``).
            sys.exit()
        return verdict