#!/usr/bin/env python
import os
import sys
import re
import string
import hashlib
import argparse
from functools import partial

# borrowed from http://goo.gl/kFJZKb
# which originally borrowed from http://goo.gl/zeJZl
def human2bytes(s):
    """
    >>> human2bytes('1M')
    1048576
    >>> human2bytes('1G')
    1073741824
    """
    symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    letter = s[-1].strip().upper()
    num = s[:-1]
    if letter not in symbols:
        return -1
    try:
        num = float(num)
    except ValueError:
        return -1
    # each symbol is worth 1024x the previous one: K=2**10, M=2**20, ...
    prefix = {symbols[0]: 1}
    for i, sym in enumerate(symbols[1:]):
        prefix[sym] = 1 << (i + 1) * 10
    return int(num * prefix[letter])
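
# For instance, human2bytes('512K') == 512 * 1024 == 524288 (the default
# --blocksize below); malformed inputs such as '1X' or 'abcM' return -1.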

# RawDescriptionHelpFormatter keeps the epilog's newlines intact, and
# ArgumentDefaultsHelpFormatter appends each option's default to its help text.
class HelpFormatterMixin(argparse.RawDescriptionHelpFormatter,
                         argparse.ArgumentDefaultsHelpFormatter):
    """Formatter class to correctly display the example in the epilog."""

def hash_md5(path, name, blocksize):
    """hash_md5(path, name, blocksize) -> <md5digest>

    Return the md5 hex digest of at most the first blocksize bytes
    of the file.
    """
    filename = os.path.join(path, name)
    if not os.path.isfile(filename):
        return 'NotARegularFile'
    # read(None) reads the whole file, so only small files are read whole;
    # anything at least blocksize bytes long is capped at blocksize
    flsize = os.stat(filename).st_size
    readbytes = None if flsize < blocksize else blocksize
    with open(filename, 'rb') as fl:
        return hashlib.md5(fl.read(readbytes)).hexdigest()
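
# A minimal sketch of how hash_md5 is used (the path and file are hypothetical):
#   hash_md5('/tmp', 'movie.mkv', human2bytes('1M'))
# hashes at most the first 1048576 bytes, so two files that share a leading
# block but diverge later still hash alike; the checksum is a heuristic filter,
# not proof of byte-for-byte equality.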

def hash_fuzzy(ignored, name):
    """First *normalize* a filename and then return the md5 digest of the
    normalized name.

    Normalizing means:
      * converting the filename to lowercase and removing the extension
      * removing all spaces and punctuation in the filename
    """
    name, _ = os.path.splitext(name.lower())
    name = name.replace('&', 'and')
    # str.translate(None, chars) is Python 2 only; in Python 3, build a
    # deletion table with str.maketrans instead
    name = name.translate(
        str.maketrans('', '', string.whitespace + string.punctuation))
    return hashlib.md5(name.encode('utf-8')).hexdigest()
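
# For example, 'My File.txt' and 'my-file.mp3' both normalize to 'myfile', so
# hash_fuzzy(None, 'My File.txt') == hash_fuzzy(None, 'my-file.mp3').  The
# unused first argument keeps the (path, name) calling convention shared with
# hash_md5, letting main() invoke either one through the same hash_fn(root, name).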

def main():
    parser = argparse.ArgumentParser(
        usage="%s [OPTIONS] DIRECTORIES ..." % sys.argv[0],
        description='Find duplicate files within a list of directories',
        formatter_class=HelpFormatterMixin,
        epilog=(
            "Example: find all likely duplicate files under the current\n"
            "directory using the md5 checksums of the first 1K bytes of\n"
            "the files to identify duplicates.\n"
            "\t$ %s -m -B 1K ./") % sys.argv[0]
    )
    parser.add_argument('DIRECTORIES', nargs='+', help="directories to search")
    parser.add_argument('-e', '--exclude', default='(?!.*)',
                        help='exclude files whose path matches the provided regex pattern')
    parser.add_argument('-o', '--only', default='.*',
                        help='only consider files whose name matches the provided regex pattern')
    ex_group = parser.add_mutually_exclusive_group()
    ex_group.add_argument('-n', '--name', action="store_true", default=True,
                          help="use exact filenames (fastest)")
    ex_group.add_argument('-f', '--fuzzy', action="store_true",
                          help="use fuzzy matching of file names")
    parser.add_argument('-m', '--md5', action="store_true",
                        help="use md5 checksums (slowest)")
    parser.add_argument('-B', '--blocksize', default='512K',
                        help=("limit md5 checksums to the first BLOCKSIZE bytes. "
                              "Recognizes human readable formats, e.g. 1G, 32M"))
    parser.add_argument('-I', '--inverse', action="store_true", default=False,
                        help=("Invert the report, i.e. report files that "
                              "*do not* have a duplicate copy."))
    args = parser.parse_args()
    blocksize = human2bytes(args.blocksize)
    if blocksize < 0:
        parser.error('unrecognized BLOCKSIZE: %r' % args.blocksize)
    if args.md5:
        hash_fn = partial(hash_md5, blocksize=blocksize)
    elif args.fuzzy:
        hash_fn = hash_fuzzy
    else:  # args.name <- the default
        hash_fn = lambda _, name: name
    path_pattern = re.compile(args.exclude)
    name_pattern = re.compile(args.only)
    # - begin hashing
    file_hash = {}
    nfiles = 0
    for directory in args.DIRECTORIES:
        for root, _subdirs, files in os.walk(directory):
            for name in filter(name_pattern.search, files):
                path = os.path.join(root, name)
                if path_pattern.search(path):
                    continue
                nfiles += 1
                # bucket paths by hash; buckets with >1 entry are duplicate sets
                file_hash.setdefault(hash_fn(root, name), []).append(path)
    if args.inverse:
        report = {k: v for k, v in file_hash.items() if len(v) == 1}
    else:
        report = {k: v for k, v in file_hash.items() if len(v) > 1}
    for k, v in report.items():
        print('%s\n\t%s' % (k, '\n\t'.join(sorted(v))))
    if args.inverse:
        if report:
            print('\nProcessed {} files and found {} files '
                  'without duplicates'.format(nfiles, len(report)))
        else:
            print('\nProcessed {} files and found all files duplicated'.format(nfiles))
    else:
        if report:
            print('\nProcessed {} files and found {} possible duplicates'.format(nfiles, len(report)))
        else:
            print('\nProcessed {} files and found no duplicates'.format(nfiles))

if __name__ == '__main__':
main()
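
# Example invocations (paths are hypothetical):
#   ./finddup.py ~/Downloads              # exact filename matching (default)
#   ./finddup.py -f ~/Music ~/Podcasts    # fuzzy-normalized filename matching
#   ./finddup.py -m -B 1M ~/Photos        # md5 of the first 1 MiB of each file
#   ./finddup.py -I ~/Backups             # report files *without* a duplicate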