-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathvm-automation.py
302 lines (264 loc) · 11.9 KB
/
vm-automation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
import hashlib
import logging
import os
import random
import re
import string
import subprocess
import sys
import threading
import time
# =================== Options ===================
# List of VMs and snapshots to use
vms_list = ['w10_1903_x64', 'w10_1903_x86']
snapshots_list = ['live']
# Path to 'vboxmanage' executable
vboxmanage_path = 'vboxmanage'
# 'gui' or 'headless'
vm_gui = 'gui'
# Login and password for guest OS
vm_guest_username = 'user'
vm_guest_password = 'P@ssw0rd'
# Where to put test file
remote_folder = 'C:\\Users\\user\\Desktop\\'
# 'on' to enable; 'off' to disable; anything else to keep original network state
vm_network_state = 'keep'
# Guest screen resolution ('Width Height Depth')
vm_guest_resolution = '1024 768 32'
# Script/applications to run before and after main file execution
# Specify full name for applications ('calc.exe', not 'calc')
preexec = ''
# preexec = 'C:\\preexec.cmd'
postexec = ''
# postexec = 'C:\\postexec.cmd'
# Timeout both for commands and VM
timeout = 60
# Calculate hash of input file
calculate_hash = 1
# Show search links to VirusTotal and Google. This will enable 'calculate_hash' too, if not set
show_links = 1
# Logging options
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.DEBUG)
# ===============================================
logger = logging.getLogger('vm-automation')
# Check for command line args and file
args = ''.join(sys.argv[1:])
if args in ['-h', '--help', '/?'] or not args:
print(f'Usage: {args} path_to_file')
exit()
elif os.path.isfile(args):
local_file = args
local_file_extension = re.findall('\\.\\w+$', local_file)
else:
print(f'File "{args}" does not exist. Exiting.')
exit()
# Wrapper for vboxmanage command
def vboxmanage(cmd):
cmd = f'{vboxmanage_path} {cmd}'.split()
logging.debug(f'Executing command: {cmd}')
try:
result = subprocess.run(cmd, capture_output=True, timeout=timeout, text=True)
return [result.returncode, result.stdout, result.stderr]
except FileNotFoundError:
logging.critical(f'Vboxmanage path is incorrect. Exiting.')
exit()
# VirtualBox version
def vm_version():
result = vboxmanage('--version')
return result[1].rstrip()
# Start virtual machine
def vm_start(vm_name, snapshot_name):
if vm_gui in ['gui', 'sdl', 'headless', 'separate']:
vm_start_type = vm_gui
else:
vm_start_type = 'gui'
logging.info(f'{vm_name}({snapshot_name}): Starting VM')
result = vboxmanage(f'startvm {vm_name} --type {vm_start_type}')
if result[0] == 0:
logging.info(f'{vm_name}({snapshot_name}): VM started')
else:
logging.error(f'{vm_name}({snapshot_name}): Error while starting VM. Code: {result[0]}')
logging.debug(f'{vm_name}({snapshot_name}): stderr: {result[2]}')
exit()
# Stop virtual machine
def vm_stop(vm_name, snapshot_name):
logging.info(f'{vm_name}({snapshot_name}): Stopping VM')
result = vboxmanage(f'controlvm {vm_name} poweroff')
if result[0] == 0:
logging.debug(f'{vm_name}({snapshot_name}): VM stopped')
time.sleep(3)
elif result[0] == 1:
logging.debug(f'{vm_name}({snapshot_name}): VM not running')
else:
logging.error(f'{vm_name}({snapshot_name}): Unknown error: {result[0]}')
logging.debug(f'{vm_name}({snapshot_name}): stderr: {result[2]}')
# Restore snapshot for virtual machine
def vm_restore(vm_name, snapshot_name):
logging.info(f'{vm_name}({snapshot_name}): Restoring snapshot')
result = vboxmanage(f'snapshot {vm_name} restore {snapshot_name}')
if result[0] == 0:
logging.debug(f'{vm_name}({snapshot_name}): Snapshot restored')
time.sleep(3)
else:
logging.error(f'{vm_name}({snapshot_name}): Error while restoring snapshot. Code: {result[0]}')
logging.debug(f'{vm_name}({snapshot_name}): stderr: {result[2]}')
# Change network link state
def vm_network(vm_name, snapshot_name, link_state):
if link_state not in ['on', 'off']:
logging.error(f'{vm_name}({snapshot_name}): link_state should be "on" or "off"')
exit()
logging.info(f'{vm_name}({snapshot_name}): Setting network parameters to {link_state}')
result = vboxmanage(f'controlvm {vm_name} setlinkstate1 off')
if result[0] == 0:
logging.debug(f'{vm_name}({snapshot_name}): NIC state changed')
else:
logging.error(f'{vm_name}({snapshot_name}): Unable to change NIC state. Code: {result[0]}')
logging.debug(f'{vm_name}({snapshot_name}): stderr: {result[2]}')
# Control screen resolution
def vm_setres(vm_name, snapshot_name, screen_resolution):
logging.info(f'{vm_name}({snapshot_name}): Changing screen resolution for VM')
result = vboxmanage(f'controlvm {vm_name} setvideomodehint {screen_resolution}')
if result[0] == 0:
logging.debug(f'{vm_name}({snapshot_name}): Screen resolution changed')
else:
logging.error(f'{vm_name}({snapshot_name}): Unable to change screen resolution. Code: {result[0]}')
logging.debug(f'{vm_name}({snapshot_name}): stderr: {result[2]}')
# Execute file/command on VM
def vm_exec(vm_name, snapshot_name, vm_guest_username, vm_guest_password, remote_file):
logging.info(f'{vm_name}({snapshot_name}): Executing file {remote_file}')
_ = 0
while _ < timeout:
result = vboxmanage(
f'guestcontrol {vm_name} --username {vm_guest_username} --password {vm_guest_password} start {remote_file}')
if result[0] == 0:
logging.debug(f'{vm_name}({snapshot_name}): File executed successfully')
break
else:
# Waiting for VM to start
time.sleep(1)
_ += 1
if _ >= timeout:
logging.error(f'{vm_name}({snapshot_name}): Timeout while executing file on VM')
exit()
# Copy file to VM
def vm_copyto(vm_name, snapshot_name, vm_guest_username, vm_guest_password, local_file, remote_file):
_ = 0
while _ < timeout:
logging.info(f'{vm_name}({snapshot_name}): Uploading file {local_file} as {remote_file} to VM')
result = vboxmanage(
f'guestcontrol {vm_name} --username {vm_guest_username} --password {vm_guest_password} copyto {local_file} {remote_file}')
if result[0] == 0:
logging.debug(f'{vm_name}({snapshot_name}): File uploaded')
break
else:
logging.error(f'{vm_name}({snapshot_name}): Error while uploading file. Code: {result[0]}')
logging.debug(f'{vm_name}({snapshot_name}): stderr: {result[2]}')
time.sleep(1)
_ += 1
if _ >= timeout:
logging.error(f'{vm_name}({snapshot_name}): Timeout while waiting for VM')
exit()
# Copy file from VM
def vm_copyfrom(vm_name, snapshot_name, vm_guest_username, vm_guest_password, local_file, remote_file):
logging.info(f'{vm_name}({snapshot_name}): Downloading file {remote_file} from VM as {local_file}')
result = vboxmanage(
f'guestcontrol {vm_name} --username {vm_guest_username} --password {vm_guest_password} copyfrom {remote_file} {local_file}')
if result[0] == 0:
logging.debug(f'{vm_name}({snapshot_name}): File downloaded')
else:
logging.error(f'{vm_name}({snapshot_name}): Error while downloading file. Code: {result[0]}')
logging.debug(f'{vm_name}({snapshot_name}): stderr: {result[2]}')
# Take screenshot
def vm_screenshot(vm_name, snapshot_name, image_id=1):
screenshot_name = f'{vm_name}_{snapshot_name}_{image_id}.png'
logging.info(f'{vm_name}({snapshot_name}): Taking screenshot {screenshot_name}')
result = vboxmanage(f'controlvm {vm_name} screenshotpng {screenshot_name}')
if result[0] == 0:
logging.debug(f'{vm_name}({snapshot_name}): Screenshot created')
else:
logging.error(f'{vm_name}({snapshot_name}): Unable to take screenshot')
logging.debug(f'{vm_name}({snapshot_name}): stderr: {result[2]}')
image_id += 1
return image_id
# Main routines
def main_routine(vm_name, snapshots_list):
for snapshot_name in snapshots_list:
logging.info(f'{vm_name}({snapshot_name}): Task started')
# Stop VM, if already running, restore snapshot and start VM
vm_stop(vm_name, snapshot_name)
vm_restore(vm_name, snapshot_name)
vm_start(vm_name, snapshot_name)
time.sleep(3) # Just to make sure VM is alive
# Set guest resolution
if vm_guest_resolution:
vm_setres(vm_name, snapshot_name, vm_guest_resolution)
else:
logging.debug(f'{vm_name}({snapshot_name}): vm_guest_resolution not set')
# Set guest network state
if vm_network_state == 'on':
logging.debug(f'{vm_name}({snapshot_name}): Enabling network for guest')
vm_network(vm_name, snapshot_name, 'on')
elif vm_network_state == 'off':
logging.debug(f'{vm_name}({snapshot_name}): Disabling network for guest')
vm_network(vm_name, snapshot_name, 'off')
else:
logging.debug(f'{vm_name}({snapshot_name}): Keeping original network state')
# Generate random file name
if local_file_extension:
logging.debug(f'{vm_name}({snapshot_name}): Extension obtained from original file')
file_extension = local_file_extension
else:
logging.debug(f'{vm_name}({snapshot_name}): Unable to obtain file extension. Assuming *.exe')
file_extension = '.exe'
random_name = ''.join(random.choice(string.ascii_letters) for _ in range(random.randint(4, 20)))
remote_file = remote_folder + random_name + ''.join(file_extension)
# Run preexec script
if preexec:
vm_exec(vm_name, snapshot_name, vm_guest_username, vm_guest_password, preexec)
else:
logging.debug(f'{vm_name}({snapshot_name}): preexec not set')
# Upload file to VM; take screenshot; start file; take screenshot; sleep 2 seconds; take screenshot;
# wait for {timeout/2} seconds; take screenshot; wait for {timeout/2} seconds; take screenshot
vm_copyto(vm_name, snapshot_name, vm_guest_username, vm_guest_password, local_file, remote_file)
screenshot = vm_screenshot(vm_name, snapshot_name)
vm_exec(vm_name, snapshot_name, vm_guest_username, vm_guest_password, remote_file)
screenshot = vm_screenshot(vm_name, snapshot_name, screenshot)
time.sleep(2)
screenshot = vm_screenshot(vm_name, snapshot_name, screenshot)
time.sleep(timeout / 2)
screenshot = vm_screenshot(vm_name, snapshot_name, screenshot)
time.sleep(timeout / 2)
screenshot = vm_screenshot(vm_name, snapshot_name, screenshot)
# Run postexec script
if postexec:
vm_exec(vm_name, snapshot_name, vm_guest_username, vm_guest_password, postexec)
else:
logging.debug(f'{vm_name}({snapshot_name}): postexec not set')
# Stop VM, restore snapshot
vm_stop(vm_name, snapshot_name)
vm_restore(vm_name, snapshot_name)
logging.info(f'{vm_name}({snapshot_name}): Task finished')
# Show general info
logging.info(f'VirtualBox version: {vm_version()}; Script version: 0.3.2\n')
logging.info(f'VMs: {vms_list}')
logging.info(f'Snapshots: {snapshots_list}\n')
logging.info(f'File: "{local_file}"')
if calculate_hash or show_links:
file_hash = hashlib.sha256()
block_size = 65536
with open(local_file, 'rb') as f:
fb = f.read(block_size)
while len(fb) > 0:
file_hash.update(fb)
fb = f.read(block_size)
sha256sum = file_hash.hexdigest()
logging.info(f'sha256: {sha256sum}')
if show_links:
logging.info(f'Search VT: https://www.virustotal.com/gui/file/{sha256sum}/detection')
logging.info(f'Search Google: https://www.google.com/search?q={sha256sum}\n')
time.sleep(1)
# Start threads
for vm_name in vms_list:
t = threading.Thread(target=main_routine, args=(vm_name, snapshots_list))
t.start()
time.sleep(3) # Delay before starting next VM