-
Notifications
You must be signed in to change notification settings - Fork 21
/
filecheck.py
873 lines (780 loc) · 38.3 KB
/
filecheck.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import mimetypes
import shlex
import subprocess
import zipfile
import argparse
import random
import shutil
import time
import hashlib
from pathlib import Path
from typing import Dict, List, Tuple, Callable, Optional, Union
import oletools.oleid # type: ignore
import olefile # type: ignore
import officedissector # type: ignore
import warnings
import exifread # type: ignore
from PIL import Image # type: ignore
from pdfid import PDFiD, cPDFiD # type: ignore
from kittengroomer import FileBase, KittenGroomerBase, Logging
class Config:
"""Configuration information for filecheck.py."""
# MIMES
# Application subtypes (mimetype: 'application/<subtype>')
mimes_ooxml: Tuple[str, ...] = ('vnd.openxmlformats-officedocument.',)
mimes_office: Tuple[str, ...] = ('msword', 'vnd.ms-',)
mimes_libreoffice: Tuple[str, ...] = ('vnd.oasis.opendocument',)
mimes_rtf: Tuple[str, ...] = ('rtf', 'richtext',)
mimes_pdf: Tuple[str, ...] = ('pdf', 'postscript',)
mimes_xml: Tuple[str, ...] = ('xml',)
mimes_csv: Tuple[str, ...] = ('csv', 'text/csv')
mimes_ms: Tuple[str, ...] = ('dosexec',)
mimes_compressed: Tuple[str, ...] = ('zip', 'rar', 'x-rar', 'bzip2', 'lzip', 'lzma', 'lzop',
'xz', 'compress', 'gzip', 'tar',)
mimes_data: Tuple[str, ...] = ('octet-stream',)
mimes_audio: Tuple[str, ...] = ('ogg',)
# Image subtypes
mimes_exif: Tuple[str, ...] = ('image/jpeg', 'image/tiff',)
mimes_png: Tuple[str, ...] = ('image/png',)
# Mimetypes with metadata
mimes_metadata: Tuple[str, ...] = ('image/jpeg', 'image/tiff', 'image/png',)
# Mimetype aliases
aliases: Dict[str, Union[str, List[str]]] = {
# Win executables
'application/x-msdos-program': 'application/x-dosexec',
'application/x-dosexec': 'application/x-msdos-program',
# Other apps with confusing mimetypes
'application/rtf': 'text/rtf',
'application/rar': 'application/x-rar',
'application/vnd.rar': 'application/x-rar',
'application/ogg': 'audio/ogg',
'audio/ogg': 'application/ogg'
}
# Mime Type / Extension fix. TODO: Doesn't quite work....????
mimetypes.add_type('text/plain', '.csv', False)
mimetypes.add_type('text/csv', '.csv', False)
mimetypes.add_type('application/vnd.apple.numbers', '.numbers', True)
mimetypes.add_type('application/vnd.apple.pages', '.pages', False)
mimetypes.add_type('application/vnd.apple.keynote', '.keynote', False)
# EXTS
# Commonly used malicious extensions
# Sources: http://www.howtogeek.com/137270/50-file-extensions-that-are-potentially-dangerous-on-windows/
# https://github.com/wiregit/wirecode/blob/master/components/core-settings/src/main/java/org/limewire/core/settings/FilterSettings.java
malicious_exts: Tuple[str, ...] = (
# Applications
".exe", ".pif", ".application", ".gadget", ".msi", ".msp", ".com", ".scr",
".hta", ".cpl", ".msc", ".jar",
# Scripts
".bat", ".cmd", ".vb", ".vbs", ".vbe", ".js", ".jse", ".ws", ".wsf",
".wsc", ".wsh", ".ps1", ".ps1xml", ".ps2", ".ps2xml", ".psc1", ".psc2",
".msh", ".msh1", ".msh2", ".mshxml", ".msh1xml", ".msh2xml",
# Shortcuts
".scf", ".lnk", ".inf",
# Other
".reg", ".dll",
# Office macro (OOXML with macro enabled)
".docm", ".dotm", ".xlsm", ".xltm", ".xlam", ".pptm", ".potm", ".ppam",
".ppsm", ".sldm",
# banned from wirecode
".asf", ".asx", ".au", ".htm", ".html", ".mht", ".vbs",
".wax", ".wm", ".wma", ".wmd", ".wmv", ".wmx", ".wmz", ".wvx",
# Google chrome malicious extensions
".ad", ".ade", ".adp", ".ah", ".apk", ".app", ".application", ".asp",
".asx", ".bas", ".bash", ".bat", ".cfg", ".chi", ".chm", ".class",
".cmd", ".com", ".command", ".crt", ".crx", ".csh", ".deb", ".dex",
".dll", ".drv", ".exe", ".fxp", ".grp", ".hlp", ".hta", ".htm", ".html",
".htt", ".inf", ".ini", ".ins", ".isp", ".jar", ".jnlp", ".user.js",
".js", ".jse", ".ksh", ".lnk", ".local", ".mad", ".maf", ".mag", ".mam",
".manifest", ".maq", ".mar", ".mas", ".mat", ".mau", ".mav", ".maw",
".mda", ".mdb", ".mde", ".mdt", ".mdw", ".mdz", ".mht", ".mhtml", ".mmc",
".mof", ".msc", ".msh", ".mshxml", ".msi", ".msp", ".mst", ".ocx", ".ops",
".pcd", ".pif", ".pkg", ".pl", ".plg", ".prf", ".prg", ".pst", ".py",
".pyc", ".pyw", ".rb", ".reg", ".rpm", ".scf", ".scr", ".sct", ".sh",
".shar", ".shb", ".shs", ".shtm", ".shtml", ".spl", ".svg", ".swf", ".sys",
".tcsh", ".url", ".vb", ".vbe", ".vbs", ".vsd", ".vsmacros", ".vss",
".vst", ".vsw", ".ws", ".wsc", ".wsf", ".wsh", ".xbap", ".xht", ".xhtm",
".xhtml", ".xml", ".xsl", ".xslt", ".website", ".msh1", ".msh2", ".msh1xml",
".msh2xml", ".ps1", ".ps1xml", ".ps2", ".ps2xml", ".psc1", ".psc2", ".xnk",
".appref-ms", ".gadget", ".efi", ".fon", ".partial", ".svg", ".xml",
".xrm_ms", ".xsl", ".action", ".bin", ".inx", ".ipa", ".isu", ".job",
".out", ".pad", ".paf", ".rgs", ".u3p", ".vbscript", ".workflow", ".001",
".ace", ".arc", ".arj", ".b64", ".balz", ".bhx", ".cab", ".cpio", ".fat",
".hfs", ".hqx", ".iso", ".lha", ".lpaq1", ".lpaq5", ".lpaq8", ".lzh",
".mim", ".ntfs", ".paq8f", ".paq8jd", ".paq8l", ".paq8o", ".pea", ".quad",
".r00", ".r01", ".r02", ".r03", ".r04", ".r05", ".r06", ".r07", ".r08",
".r09", ".r10", ".r11", ".r12", ".r13", ".r14", ".r15", ".r16", ".r17",
".r18", ".r19", ".r20", ".r21", ".r22", ".r23", ".r24", ".r25", ".r26",
".r27", ".r28", ".r29", ".squashfs", ".swm", ".tpz", ".txz", ".tz", ".udf",
".uu", ".uue", ".vhd", ".vmdk", ".wim", ".wrc", ".xar", ".xxe", ".z",
".zipx", ".zpaq", ".cdr", ".dart", ".dc42", ".diskcopy42", ".dmg",
".dmgpart", ".dvdr", ".img", ".imgpart", ".ndif", ".smi", ".sparsebundle",
".sparseimage", ".toast", ".udif",
)
# Sometimes, mimetypes.guess_type gives unexpected results, such as for .tar.gz files:
# In [12]: mimetypes.guess_type('toot.tar.gz', strict=False)
# Out[12]: ('application/x-tar', 'gzip')
# It works as expected if you do mimetypes.guess_type('application/gzip', strict=False)
override_ext: Dict[str, str] = {'.gz': 'application/gzip',
'.csv': 'text/csv', # ,'text/plain' )
'.numbers': 'application/vnd.apple.numbers', # ,'application/zip')
'.pages': 'application/vnd.apple.pages', # ,'application/zip')
'.keynote': 'application/vnd.apple.keynote' # ,'application/zip')
}
SEVENZ_PATH = '/usr/bin/7z'
class File(FileBase):
"""
Main file object
Created for each file that is processed by KittenGroomer. Contains all
filetype-specific processing methods.
"""
def __init__(self, src_path: Path, dst_path: Path):
super(File, self).__init__(src_path, dst_path)
self.is_archive: bool = False
self.tempdir_path: Path = Path(str(self.dst_path) + '_temp')
subtypes_apps: Tuple[Tuple[Tuple[str, ...], Callable], ...] = (
(Config.mimes_office, self._winoffice),
(Config.mimes_ooxml, self._ooxml),
(Config.mimes_rtf, self.text),
(Config.mimes_libreoffice, self._libreoffice),
(Config.mimes_pdf, self._pdf),
(Config.mimes_xml, self.text),
(Config.mimes_csv, self.text),
(Config.mimes_ms, self._executables),
(Config.mimes_compressed, self._archive),
(Config.mimes_data, self._binary_app),
(Config.mimes_audio, self.audio)
)
self.app_subtype_methods: Dict[str, Callable] = self._make_method_dict(subtypes_apps)
types_metadata: Tuple[Tuple[Tuple[str, ...], Callable], ...] = (
(Config.mimes_exif, self._metadata_exif),
(Config.mimes_png, self._metadata_png),
)
self.metadata_mimetype_methods: Dict[str, Callable] = self._make_method_dict(types_metadata)
self.mime_processing_options: Dict[str, Callable] = {
'text': self.text,
'audio': self.audio,
'image': self.image,
'video': self.video,
'application': self.application,
'example': self.example,
'message': self.message,
'model': self.model,
'multipart': self.multipart,
'inode': self.inode,
}
def __repr__(self):
return "<filecheck.File object: {{{}}}>".format(self.filename)
def _check_extension(self):
"""
Guess the file's mimetype based on its extension.
If the file's mimetype (as determined by libmagic) is contained in
the `mimetype` module's list of valid mimetypes and the expected
mimetype based on its extension differs from the mimetype determined
by libmagic, then mark the file as dangerous.
"""
if not self.has_extension:
self.make_dangerous('File has no extension')
else:
if self.extension in Config.override_ext:
expected_mimetypes = Config.override_ext[self.extension]
encoding = None
self.mimetype = expected_mimetypes
else:
expected_mimetype, encoding = mimetypes.guess_type(str(self.src_path),
strict=False)
expected_mimetypes = [expected_mimetype]
if expected_mimetype in Config.aliases:
if isinstance(Config.aliases[expected_mimetype], list):
expected_mimetypes += Config.aliases[expected_mimetype]
else:
expected_mimetypes.append(Config.aliases[expected_mimetype])
if (encoding is None) and (os.path.getsize(self.src_path) == 0):
is_empty_file = True
else:
is_empty_file = False
is_known_extension = self.extension in mimetypes.types_map.keys()
if is_known_extension and self.mimetype not in expected_mimetypes and not is_empty_file:
self.make_dangerous(f'Mimetype does not match expected mimetypes ({expected_mimetypes}) for this extension')
def _check_mimetype(self):
"""
Compare mimetype (as determined by libmagic) to extension.
Determine whether the extension that are normally associated with
the mimetype include the file's actual extension.
"""
if not self.has_mimetype:
self.make_dangerous('File has no mimetype')
else:
if self.mimetype in Config.aliases:
mimetype = Config.aliases[self.mimetype]
else:
mimetype = self.mimetype
expected_extensions = mimetypes.guess_all_extensions(mimetype,
strict=False)
if mimetype in Config.aliases:
expected_extensions += mimetypes.guess_all_extensions(Config.aliases[mimetype], strict=False)
if expected_extensions:
if self.has_extension and self.extension not in expected_extensions:
self.make_dangerous(f'Extension does not match expected extensions ({expected_extensions}) for this mimetype')
def _check_filename(self):
"""
Verify the filename
If the filename contains any dangerous or specific characters, handle
them appropriately.
"""
if self.filename.startswith('.'):
macos_hidden_files = set(
'.Trashes', '._.Trashes', '.DS_Store', '.fseventsd', '.Spotlight-V100'
)
if self.filename in macos_hidden_files:
self.add_description('MacOS metadata file, added by MacOS to USB drives and some directories')
self.should_copy = False
right_to_left_override = u"\u202E"
if right_to_left_override in self.filename:
self.make_dangerous('Filename contains dangerous character')
new_filename = self.filename.replace(right_to_left_override, '')
self.set_property('filename', new_filename)
def _check_malicious_exts(self):
"""Check that the file's extension isn't contained in a blacklist"""
if self.extension in Config.malicious_exts:
self.make_dangerous('Extension identifies file as potentially dangerous')
def _compute_random_hashes(self):
"""Compute a random amount of hashes at random positions in the file to ensure integrity after the copy (mitigate TOCTOU attacks)"""
if not os.path.exists(self.src_path) or os.path.isdir(self.src_path) or self.maintype == 'image':
# Images are converted, no need to compute the hashes
return
self.random_hashes = []
if self.size < 64:
# hash the whole file
self.block_length = self.size
else:
if self.size < 128:
# Get a random length between 16 and the size of the file
self.block_length = random.randint(16, self.size)
else:
# Get a random length between 16 and 128
self.block_length = random.randint(16, 128)
for i in range(random.randint(3, 6)): # Do a random amound of read on the file (between 5 and 10)
start_pos = random.randint(0, self.size - self.block_length) # Pick a random length for the hash to compute
with open(self.src_path, 'rb') as f:
f.seek(start_pos)
hashed = hashlib.sha256(f.read(self.block_length)).hexdigest()
self.random_hashes.append((start_pos, hashed))
time.sleep(random.uniform(0.1, 0.5)) # Add a random sleep length
def _validate_random_hashes(self) -> bool:
"""Validate hashes computed by _compute_random_hashes"""
if not os.path.exists(self.src_path) or os.path.isdir(self.src_path) or self.maintype == 'image':
# Images are converted, we don't have to fear TOCTOU
return True
for start_pos, hashed_src in self.random_hashes:
with open(self.dst_path, 'rb') as f:
f.seek(start_pos)
hashed = hashlib.sha256(f.read(self.block_length)).hexdigest()
if hashed != hashed_src:
# Something fucked up happened
return False
return True
def check(self):
"""
Main file processing method.
First, checks for basic properties that might indicate a dangerous file.
If the file isn't dangerous, then delegates to various helper methods
for filetype-specific checks based on the file's mimetype.
"""
# Any of these methods can call make_dangerous():
self._check_malicious_exts()
self._check_mimetype()
self._check_extension()
self._check_filename() # can mutate self.filename
self._compute_random_hashes()
if not self.is_dangerous:
self.mime_processing_options.get(self.maintype, self.unknown)()
# ##### Helper functions #####
def _make_method_dict(self, list_of_tuples: Tuple) -> Dict[str, Callable]:
"""Returns a dictionary with mimetype: method pairs."""
dict_to_return = {}
for list_of_subtypes, method in list_of_tuples:
for subtype in list_of_subtypes:
dict_to_return[subtype] = method
return dict_to_return
@property
def has_metadata(self) -> bool:
"""True if filetype typically contains metadata, else False."""
if self.mimetype in Config.mimes_metadata:
return True
return False
def make_tempdir(self) -> Path:
"""Make a temporary directory at self.tempdir_path."""
if not self.tempdir_path.exists():
self.tempdir_path.mkdir(parents=True)
return self.tempdir_path
#######################
# ##### Discarded mimetypes, reason in the docstring ######
def inode(self):
"""Empty file or symlink."""
if self.is_symlink:
symlink_path = self.get_property('symlink')
self.add_description('File is a symlink to {}'.format(symlink_path))
else:
self.add_description('File is an inode (empty file)')
self.should_copy = False
def unknown(self):
"""Main type should never be unknown."""
self.add_description('Unknown mimetype')
self.should_copy = False
def example(self):
"""Used in examples, should never be returned by libmagic."""
self.add_description('Example file')
self.should_copy = False
def multipart(self):
"""Used in web apps, should never be returned by libmagic"""
self.add_description('Multipart file - usually found in web apps')
self.should_copy = False
# ##### Treated as malicious, no reason to have it on a USB key ######
def message(self):
"""Process a message file."""
self.make_dangerous('Message file - should not be found on USB key')
def model(self):
"""Process a model file."""
self.make_dangerous('Model file - should not be found on USB key')
# ##### Files that will be converted ######
def text(self):
"""Process an rtf, ooxml, or plaintext file."""
for mt in Config.mimes_rtf:
if mt in self.subtype:
self.add_description('Rich Text (rtf) file')
self.force_ext('.txt')
return
for mt in Config.mimes_ooxml:
if mt in self.subtype:
self._ooxml()
return
for mt in Config.mimes_csv:
if mt in self.subtype:
self.add_description('CSV file')
return
self.add_description('Plain text file')
self.force_ext('.txt')
def application(self):
"""Process an application specific file according to its subtype."""
for subtype, method in self.app_subtype_methods.items():
if subtype in self.subtype: # checking for partial matches
method()
return
self._unknown_app() # if none of the methods match
def _executables(self):
"""Process an executable file."""
self.make_dangerous('Executable file')
def _winoffice(self):
"""Process a winoffice file using olefile/oletools."""
oid = oletools.oleid.OleID(self.src_path) # First assume a valid file
if not olefile.isOleFile(self.src_path):
# Manual processing, may already count as suspicious
try:
ole = olefile.OleFileIO(self.src_path, raise_defects=olefile.DEFECT_INCORRECT)
except Exception:
self.make_dangerous('Unparsable WinOffice file')
if ole.parsing_issues:
self.make_dangerous('Parsing issues with WinOffice file')
else:
if ole.exists('macros/vba') or ole.exists('Macros') \
or ole.exists('_VBA_PROJECT_CUR') or ole.exists('VBA'):
self.make_dangerous('WinOffice file containing a macro')
else:
indicators = oid.check()
for i in indicators:
if i.id == 'ObjectPool' and i.value:
self.make_dangerous('WinOffice file containing an object pool')
elif i.id == 'flash' and i.value:
self.make_dangerous('WinOffice file with embedded flash')
elif i.id == 'encrypted' and i.value:
self.make_dangerous('Encrypted WinOffice file')
elif i.id == 'vba_macros' and i.value:
self.make_dangerous('WinOffice file containing a macro')
self.add_description('WinOffice file')
def _ooxml(self):
"""Process an ooxml file."""
self.add_description('OOXML (openoffice) file')
try:
doc = officedissector.doc.Document(self.src_path)
except Exception:
self.make_dangerous('Invalid ooxml file')
return
# There are probably other potentially malicious features:
# fonts, custom props, custom XML
if doc.is_macro_enabled or len(doc.features.macros) > 0:
self.make_dangerous('Ooxml file containing macro')
if len(doc.features.embedded_controls) > 0:
self.make_dangerous('Ooxml file with activex')
if len(doc.features.embedded_objects) > 0:
# Exploited by CVE-2014-4114 (OLE)
self.make_dangerous('Ooxml file with embedded objects')
if len(doc.features.embedded_packages) > 0:
self.make_dangerous('Ooxml file with embedded packages')
def _libreoffice(self):
"""Process a libreoffice file."""
# As long as there is no way to do a sanity check on the files => dangerous
try:
lodoc = zipfile.ZipFile(self.src_path, 'r')
except Exception:
# TODO: are there specific exceptions we should catch here? Or should it be everything
self.make_dangerous('Invalid libreoffice file')
for f in lodoc.infolist():
fname = f.filename.lower()
if fname.startswith('script') or fname.startswith('basic') or \
fname.startswith('object') or fname.endswith('.bin'):
self.make_dangerous('Libreoffice file containing executable code')
if not self.is_dangerous:
self.add_description('Libreoffice file')
def _pdf(self):
"""Process a PDF file."""
xmlDoc = PDFiD(str(self.src_path))
oPDFiD = cPDFiD(xmlDoc, True)
if oPDFiD.encrypt.count > 0:
self.make_dangerous('Encrypted pdf')
if oPDFiD.js.count > 0 or oPDFiD.javascript.count > 0:
self.make_dangerous('Pdf with embedded javascript')
if oPDFiD.aa.count > 0 or oPDFiD.openaction.count > 0:
self.make_dangerous('Pdf with openaction(s)')
if oPDFiD.richmedia.count > 0:
self.make_dangerous('Pdf containing flash')
if oPDFiD.launch.count > 0:
self.make_dangerous('Pdf with launch action(s)')
if oPDFiD.xfa.count > 0:
self.make_dangerous('Pdf with XFA structures')
if oPDFiD.objstm.count > 0:
self.make_dangerous('Pdf with ObjectStream structures')
if not self.is_dangerous:
self.add_description('Pdf file')
def _archive(self):
"""
Process an archive using 7zip.
The archive is extracted to a temporary directory and self.process_dir
is called on that directory. The recursive archive depth is increased
to protect against archive bombs.
"""
# TODO: change this to something archive type specific instead of generic 'Archive'
self.add_description('Archive')
self.should_copy = False
self.is_archive = True
def _unknown_app(self):
"""Process an unknown file."""
self.make_dangerous('Unknown application file')
def _binary_app(self):
"""Process an unknown binary file."""
self.make_dangerous('Unknown binary file')
#######################
# Metadata extractors
def _metadata_exif(self, metadata_file_path) -> bool:
"""Read exif metadata from a jpg or tiff file using exifread."""
# TODO: can we shorten this method somehow?
with open(self.src_path, 'rb') as img:
tags = None
try:
tags = exifread.process_file(img, debug=True)
except Exception as e:
self.add_error(e, "Error while trying to grab full metadata for file {}; retrying for partial data.".format(self.src_path))
if tags is None:
try:
tags = exifread.process_file(img, debug=True)
except Exception as e:
self.add_error(e, "Failed to get any metadata for file {}.".format(self.src_path))
return False
for tag in sorted(tags.keys()):
# These tags are long and obnoxious/binary so we don't add them
if tag not in ('JPEGThumbnail', 'TIFFThumbnail'):
tag_string = str(tags[tag])
# Exifreader truncates data.
if len(tag_string) > 25 and tag_string.endswith(", ... ]"):
tag_value = tags[tag].values
tag_string = str(tag_value)
with open(metadata_file_path, 'w+') as metadata_file:
metadata_file.write("Key: {}\tValue: {}\n".format(tag, tag_string))
# TODO: how do we want to log metadata?
self.set_property('metadata', 'exif')
return True
def _metadata_png(self, metadata_file_path) -> bool:
"""Extract metadata from a png file using PIL/Pillow."""
warnings.simplefilter('error', Image.DecompressionBombWarning)
try:
with Image.open(self.src_path) as img:
for tag in sorted(img.info.keys()):
# These are long and obnoxious/binary
if tag not in ('icc_profile'):
with open(metadata_file_path, 'w+') as metadata_file:
metadata_file.write("Key: {}\tValue: {}\n".format(tag, img.info[tag]))
# LOG: handle metadata
self.set_property('metadata', 'png')
return True
except Exception as e: # Catch decompression bombs
# TODO: only catch DecompressionBombWarnings here?
self.add_error(e, "Caught exception processing metadata for {}".format(self.src_path))
self.make_dangerous('exception processing metadata')
return False
def extract_metadata(self):
"""Create metadata file and call correct metadata extraction method."""
metadata_file_path = self.create_metadata_file(".metadata.txt")
mt = self.mimetype
metadata_processing_method = self.metadata_mimetype_methods.get(mt)
if metadata_processing_method:
# TODO: should we return metadata and write it here instead of in processing method?
metadata_processing_method(metadata_file_path)
#######################
# ##### Media - audio and video aren't converted ######
def audio(self):
"""Process an audio file."""
self.add_description('Audio file')
self._media_processing()
def video(self):
"""Process a video."""
self.add_description('Video file')
self._media_processing()
def _media_processing(self):
"""Generic way to process all media files."""
self.add_description('Media file')
def image(self):
"""
Process an image.
Extracts metadata to dest key using self.extract_metada() if metadata
is present. Creates a temporary directory on dest key, opens the image
using PIL.Image, saves it to the temporary directory, and copies it to
the destination.
"""
if self.has_metadata:
self.extract_metadata()
tempdir_path = self.make_tempdir()
tempfile_path = tempdir_path / self.filename
warnings.simplefilter('error', Image.DecompressionBombWarning)
try: # Do image conversions
with Image.open(self.src_path) as img_in:
with Image.frombytes(img_in.mode, img_in.size, img_in.tobytes()) as img_out:
img_out.save(tempfile_path)
self.src_path = tempfile_path
except Exception as e: # Catch decompression bombs
# TODO: change this from all Exceptions to specific DecompressionBombWarning
self.add_error(e, "Caught exception (possible decompression bomb?) while translating file {}.".format(self.src_path))
self.make_dangerous('Image file containing decompression bomb')
if not self.is_dangerous:
self.add_description('Image file')
class GroomerLogger(object):
"""Groomer logging interface."""
def __init__(self, src_root_path: Path, dst_root_path: Path, debug: bool=False):
self._src_root_path: Path = src_root_path
self._dst_root_path: Path = dst_root_path
self._log_dir_path: Path = self._make_log_dir(dst_root_path)
self.log_path: Path = self._log_dir_path / 'circlean_log.txt'
self._add_root_dir(src_root_path)
if debug:
self.log_debug_err: Path = self._log_dir_path / 'debug_stderr.log'
self.log_debug_out: Path = self._log_dir_path / 'debug_stdout.log'
else:
self.log_debug_err = Path(os.devnull)
self.log_debug_out = Path(os.devnull)
def _make_log_dir(self, root_dir_path: Path) -> Path:
"""Create the directory in the dest dir that will hold the logs"""
log_dir_path = root_dir_path / 'logs'
if os.path.exists(log_dir_path):
shutil.rmtree(log_dir_path)
os.makedirs(log_dir_path)
return log_dir_path
def _add_root_dir(self, root_path: Path):
"""Add the root directory to the log"""
dirname = os.path.split(root_path)[1] + '/'
with open(self.log_path, mode='ab') as lf:
lf.write(bytes(dirname, 'utf-8'))
lf.write(b'\n')
def add_file(self, file_path: Path, file_props: dict, in_tempdir: bool=False):
"""Add a file to the log. Takes a path and a dict of file properties."""
depth = self._get_path_depth(str(file_path))
try:
file_hash = Logging.computehash(file_path)[:6]
except IsADirectoryError:
file_hash = 'directory'
except FileNotFoundError:
file_hash = '------'
if file_props['is_symlink']:
symlink_template = "+- NOT COPIED: symbolic link to {name} ({sha_hash})"
log_string = symlink_template.format(
name=file_props['symlink_path'],
sha_hash=file_hash
)
else:
if file_props['is_dangerous']:
category = "Dangerous"
else:
category = "Normal"
size_string = self._format_file_size(file_props['file_size'])
if not file_props['copied']:
copied_string = 'NOT COPIED: '
else:
copied_string = ''
file_template = "+- {copied}{name} ({sha_hash}): {size}, type: {mt}/{st}. {cat}: {desc_str}"
log_string = file_template.format(
copied=copied_string,
name=file_props['filename'],
sha_hash=file_hash,
size=size_string,
mt=file_props['maintype'],
st=file_props['subtype'],
cat=category,
desc_str=file_props['description_string'],
)
if file_props['errors']:
error_string = ', '.join([str(key) for key in file_props['errors']])
log_string += (' Errors: ' + error_string)
if in_tempdir:
depth -= 1
self._write_line_to_log(log_string, depth)
def add_dir(self, dir_path: Path):
"""Add a directory to the log"""
path_depth = self._get_path_depth(str(dir_path))
dirname = os.path.split(str(dir_path))[1] + '/'
log_line = '+- ' + dirname
self._write_line_to_log(log_line, path_depth)
def _format_file_size(self, size: int) -> str:
"""Returns a string with the file size and appropriate unit"""
file_size = size
for unit in ('B', 'KB', 'MB', 'GB'):
if file_size < 1024:
return str(int(file_size)) + unit
else:
file_size = int(file_size / 1024)
return str(int(file_size)) + 'GB'
def _get_path_depth(self, path: str) -> int:
"""Returns the relative path depth compared to root directory"""
if str(self._dst_root_path) in path:
base_path = str(self._dst_root_path)
elif str(self._src_root_path) in path:
base_path = str(self._src_root_path)
relpath = os.path.relpath(path, base_path)
path_depth = relpath.count(os.path.sep)
return path_depth
def _write_line_to_log(self, line: str, indentation_depth: int):
"""
Write a line to the log
Pad the line according to the `indentation_depth`.
"""
padding = b' '
padding += b'| ' * indentation_depth
line_bytes = os.fsencode(line)
with open(self.log_path, mode='ab') as lf:
lf.write(padding)
lf.write(line_bytes)
lf.write(b'\n')
class KittenGroomerFileCheck(KittenGroomerBase):
def __init__(self, root_src: str, root_dst: str, max_recursive_depth: int=2, debug: bool=False):
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst)
self.recursive_archive_depth = 0
self.max_recursive_depth = max_recursive_depth
self.logger = GroomerLogger(self.src_root_path, self.dst_root_path, debug)
def __repr__(self):
return "filecheck.KittenGroomerFileCheck object: {{{}}}".format(
os.path.basename(self.src_root_path)
)
def process_dir(self, src_dir: Path, dst_dir: Optional[Path] = None):
"""Process a directory on the source key."""
for srcpath in self.list_files_dirs(src_dir):
if not srcpath.is_symlink() and srcpath.is_dir():
self.logger.add_dir(srcpath)
else:
if dst_dir:
dstpath = dst_dir
else:
dstpath = Path(str(srcpath).replace(str(self.src_root_path), str(self.dst_root_path)))
cur_file = File(srcpath, dstpath)
self.process_file(cur_file)
def process_file(self, file: File):
"""
Process an individual file.
Check the file, handle archives using self.process_archive, copy
the file to the destionation key, and clean up temporary directory.
"""
file.check()
if file.is_archive:
self.process_archive(file)
else:
if file.should_copy:
if file.safe_copy():
file.set_property('copied', True)
if not file._validate_random_hashes():
# Something's fucked up.
file.make_dangerous('The copied file is different from the one checked, removing.')
file.dst_path.unlink()
else:
file.set_property('copied', False)
self.write_file_to_log(file)
# TODO: Can probably handle cleaning up the tempdir better
if hasattr(file, 'tempdir_path'):
self.safe_rmtree(file.tempdir_path)
def process_archive(self, file: File):
"""
Unpack an archive using 7zip and process contents using process_dir.
Should be given a Kittengroomer file object whose src_path points
to an archive.
"""
self.recursive_archive_depth += 1
if self.recursive_archive_depth >= self.max_recursive_depth:
file.make_dangerous('Archive bomb')
else:
tempdir_path = file.make_tempdir()
command_str = '{} -p1 x "{}" -o"{}" -bd -aoa'
# -p1=password, x=extract, -o=output location, -bd=no % indicator, -aoa=overwrite existing files
unpack_command = command_str.format(SEVENZ_PATH,
file.src_path, tempdir_path)
self._run_process(unpack_command)
self.write_file_to_log(file)
self.process_dir(tempdir_path, file.dst_path / file.filename)
self.safe_rmtree(tempdir_path)
self.recursive_archive_depth -= 1
def _run_process(self, command_string: str, timeout: Optional[int]=None) -> bool:
"""Run command_string in a subprocess, wait until it finishes."""
args = shlex.split(command_string)
with open(self.logger.log_debug_err, 'ab') as stderr, open(self.logger.log_debug_out, 'ab') as stdout:
try:
subprocess.check_call(args, stdout=stdout, stderr=stderr, timeout=timeout)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError):
return False
return True
def write_file_to_log(self, file: File):
"""Pass information about `file` to self.logger."""
props = file.get_all_props()
if not file.is_archive:
# FIXME: in_tempdir is a hack to make image files appear at the correct tree depth in log
in_tempdir = file.tempdir_path.exists()
self.logger.add_file(file.src_path, props, in_tempdir)
def list_files_dirs(self, root_dir_path: Path) -> List[Path]:
"""
Returns a list of all files and directories
Performs a depth-first traversal of the file tree.
"""
skipped_files = ('.Trashes', '._.Trashes', '.DS_Store', '.fseventsd', '.Spotlight-V100', 'System Volume Information')
queue = []
for path in sorted(os.listdir(root_dir_path), key=lambda x: str.lower(x)):
full_path = root_dir_path / path
filename = full_path.name
if filename not in skipped_files and not filename.startswith('._'):
# check for symlinks first to prevent getting trapped in infinite symlink recursion
if full_path.is_symlink():
queue.append(full_path)
elif full_path.is_dir():
# Skip hidden and special directories.
queue.append(full_path)
queue += self.list_files_dirs(full_path)
elif full_path.is_file():
queue.append(full_path)
else:
print(f"SKIPPING: {filename}")
return queue
def run(self):
self.process_dir(self.src_root_path)
def main(kg_implementation, description: str):
parser = argparse.ArgumentParser(prog='KittenGroomer', description=description)
parser.add_argument('-s', '--source', type=str, help='Source directory')
parser.add_argument('-d', '--destination', type=str, help='Destination directory')
args = parser.parse_args()
kg = kg_implementation(args.source, args.destination)
kg.run()
if __name__ == '__main__':
main(KittenGroomerFileCheck, 'File sanitizer used in CIRCLean. Renames potentially dangerous files.')