-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFileHasher.py
More file actions
149 lines (131 loc) · 5.47 KB
/
FileHasher.py
File metadata and controls
149 lines (131 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import hashlib
import multiprocessing
import os
import queue
import sqlite3

from multasker.process import TwoQueue
class FileHasher(TwoQueue):
    """Hash files in worker processes and persist (path, sha256) rows to SQLite.

    Typical flow: ``set_path`` picks the root directory, ``queue_callback``
    walks it and feeds (directory, files) tasks, ``worker`` processes hash
    each file, and a single ``db_writer`` process batches the results into
    the ``paths`` table of an SQLite database.
    """

    # Root directory walked by queue_callback(); configured via set_path().
    path = ''

    @staticmethod
    def hash_file(filepath):
        """Calculate the SHA-256 hash of a file.

        The read chunk size is scaled to the file size (one-shot read for
        small files, 32 MiB chunks for >= 1 GiB, 128 MiB chunks for
        >= 10 GiB) to trade memory for fewer read calls on big files.

        Returns the hex digest string, or None if the file could not be read.
        """
        hash_sha256 = hashlib.sha256()
        try:
            file_size_bytes = os.path.getsize(filepath)
            chunk_size = 4096
            with open(filepath, "rb") as f:
                read_size = chunk_size
                file_size = 'tiny'
                if file_size_bytes < chunk_size:
                    read_size = file_size_bytes
                elif chunk_size < file_size_bytes < (1024 * 1024 * 1024):
                    # Small-to-medium files: read in one pass, capped at 32 MiB.
                    read_size = min(file_size_bytes, 1024 * 1024 * 32)
                    file_size = 'small'
                elif file_size_bytes >= (1024 * 1024 * 1024 * 10):
                    read_size = 1024 * 1024 * 128
                    file_size = 'huge'
                elif file_size_bytes >= (1024 * 1024 * 1024):
                    read_size = 1024 * 1024 * 32
                    file_size = 'large'
                print(f"[INFO] Hashing [{file_size}] {filepath}")
                # read(n) returns b"" at EOF, which is the iter() sentinel.
                for chunk in iter(lambda: f.read(read_size), b""):
                    hash_sha256.update(chunk)
            return hash_sha256.hexdigest()
        except Exception as e:
            # Best-effort: unreadable/vanished files are reported, not fatal.
            print(f"Error hashing file {filepath}: {e}")
            return None

    @staticmethod
    def worker(task_queue, result_queue, existing_paths):
        """Worker process to process files from the task queue.

        Consumes (directory, files) tasks until a None sentinel arrives.
        Paths already present in ``existing_paths`` are skipped; every other
        path is hashed and a (path, hash) pair is pushed to ``result_queue``
        (an empty-string hash marks a file that failed to hash).
        """
        while True:
            task = task_queue.get()
            if task is None:  # Stop condition (sentinel from the producer).
                break
            directory, files = task
            for file in files:
                filepath = os.path.join(directory, file)
                # Skip if file path is already in the existing_paths set.
                if filepath in existing_paths:
                    continue
                # NOTE(review): a cloud-only-file check (is_cloud_file) was
                # stubbed out here in the original; re-add if needed.
                file_hash = FileHasher.hash_file(filepath)
                if file_hash:  # Only record a real hash on success...
                    result_queue.put((filepath, file_hash))
                else:
                    # ...but still record the path so it is not retried.
                    result_queue.put((filepath, ''))

    @staticmethod
    def db_writer(result_queue, db_path="file_hashes.db", batch_size=1000):
        """DB writer process that batches results and writes them to SQLite.

        Creates the ``paths`` table if missing, then loops: collect results
        into a batch, flush every ``batch_size`` rows, and on the None
        sentinel flush any partial batch and exit.
        """
        # Connect to the SQLite database (creates it if it doesn't exist).
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        # Create the table if it doesn't exist.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS paths (
                path TEXT PRIMARY KEY,
                hash TEXT
            )
        """)
        conn.commit()
        batch = []
        while True:
            try:
                result = result_queue.get(timeout=5)  # Wait for a result.
                if result is None:  # Stop condition: flush and exit.
                    if batch:
                        FileHasher.write_to_db(cursor, batch)
                        conn.commit()
                    break
                batch.append(result)
                # If batch size is reached, write to the database.
                if len(batch) >= batch_size:
                    FileHasher.write_to_db(cursor, batch)
                    conn.commit()
                    batch.clear()
            except queue.Empty:
                # FIX: multiprocessing.Queue.get(timeout=...) raises the
                # documented queue.Empty; multiprocessing.queues.Empty is an
                # undocumented internal re-export. Timeout just loops again.
                pass
        conn.close()

    @staticmethod
    def write_to_db(cursor, batch):
        """Write a batch of (path, hash) tuples to the SQLite database.

        Uses INSERT OR REPLACE, so re-hashed paths overwrite their old row.
        The caller is responsible for committing.
        """
        try:
            cursor.executemany("""
                INSERT OR REPLACE INTO paths (path, hash) VALUES (?, ?)
            """, batch)
            print("[INFO] Wrote results to database")
        except sqlite3.Error as e:
            print(f"Error writing to DB: {e}")

    @staticmethod
    def queue_callback(task_queue):
        """Feed (directory, files) tasks into the task queue via os.walk."""
        for root, _, files in os.walk(FileHasher.path):
            if files:  # Only add entries that have files.
                task_queue.put((root, files))

    @staticmethod
    def set_path(pathname=''):
        """Set the class-level root directory used by queue_callback()."""
        FileHasher.path = pathname

    @staticmethod
    def load_existing_paths(db_path="file_hashes.db"):
        """Load existing file paths from the database into a set.

        Returns an empty set (and logs a message) when the database file
        does not exist or cannot be read.
        """
        existing_paths = set()
        if os.path.exists(db_path):
            try:
                conn = sqlite3.connect(db_path)
                cursor = conn.cursor()
                # Get all file paths from the database.
                cursor.execute("SELECT path FROM paths")
                rows = cursor.fetchall()
                # Add each file path to the set.
                existing_paths = {row[0] for row in rows}
                conn.close()
                print(f"Loaded {len(existing_paths)} existing file paths from the database.")
            except sqlite3.Error as e:
                print(f"Error reading from DB: {e}")
        else:
            print("No existing database found; starting fresh.")
        return existing_paths