-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsymmetric_handler.py
173 lines (142 loc) · 5.26 KB
/
symmetric_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import os
from dotenv import load_dotenv
from cryptography.fernet import Fernet
from python_datalogger import DataLogger
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Util.Padding import pad, unpad
from Crypto.Random import get_random_bytes
class SymmetricHandler:
load_dotenv()
__temp_key = (os.getenv('TEMP_KEY')).encode('utf-8')
def __init__(self, password: str):
self.__password = password
self.__token_dc = None
self.__token_ec = None
@property
def password(self) -> str:
return self.__password
def get_token(self, byte: bool = True) -> any:
if byte:
return self.__token_ec
else:
return self.__token_ec.decode('utf-8')
def encrypt(self) -> None:
if not SymmetricHandler.__type_check(self.password):
self.__token_ec = Fernet(self.__temp_key).encrypt(self.__password.encode('utf-8'))
else:
raise Exception("Expected String !")
def decrypt(self) -> None:
if SymmetricHandler.__type_check(self.__token_ec):
self.__token_dc = Fernet(self.__temp_key).decrypt(self.__token_ec)
else:
raise Exception("Expected bytes !")
@staticmethod
def __type_check(value: any) -> bool:
if isinstance(value, bytes):
return True
else:
return False
class AESCipherHandler:
"""
Handles all the processes related to cryptography.
"""
def __init__(self, availability: bool = False, password: str = "root", key: bytes = None,
logger_name: str = "CryptoHandleLogger"):
self.__availability = availability
self.__password = password
self.salt = None
self.key = key
self.cipher = None
self.iv = None
self.logger = DataLogger(name=logger_name, propagate=False)
@property
def password(self) -> str:
"""makes the password read only"""
return self.__password
@property
def availability(self) -> bool:
"""makes availability option read only"""
return self.__availability
def assign_salt(self, salt=None) -> None:
"""
assigns salt if availability is False and param salt is None
:param salt: byte array of length 32
"""
if not self.__availability:
self.salt = get_random_bytes(32)
else:
self.salt = salt
def get_salt(self, path: str):
"""
reads a given bin file and assign salt
:param path: name/path of the salt bin file
"""
with open(path, 'rb') as file:
self.salt = file.read()
def generate_key(self) -> None:
"""Generates a unique 32byte key using salt and password"""
if self.key is None and self.salt is not None:
self.key = PBKDF2(self.__password, self.salt, dkLen=32)
self.logger.log_info("Key generated")
def get_encrypting_cipher(self):
"""assigns the cipher for encryption"""
self.cipher = AES.new(self.key, AES.MODE_CBC)
def get_decrypting_cipher(self):
"""assigns the cipher for decryption"""
self.cipher = AES.new(self.key, AES.MODE_CBC, iv=self.iv)
def get_iv(self):
"""assigns iv prior to decryption"""
self.iv = self.cipher.iv
def encrypt(self, data: str) -> bytes:
"""
encrypts the given set of data.
:param data: A string of information that needs to be encrypted
:return: encrypted bytes
"""
self.get_encrypting_cipher()
if not isinstance(data, bytes):
data = data.encode()
self.get_iv()
return self.cipher.encrypt(pad(data, AES.block_size))
def decrypt(self, data: bytes) -> bytes:
"""
decrypts a given set of data.
:param data: A bytearray that needs to be decrypted
:return: decrypted bytes
"""
try:
self.get_decrypting_cipher()
if isinstance(data, str):
data = data.encode()
return unpad(self.cipher.decrypt(data), AES.block_size)
except Exception as error:
self.logger.log_error(f"{error}")
def salt_to_bin(self, path) -> None:
"""creates a bin file containing the salt bytes"""
with open(path, 'wb') as file:
file.write(self.salt)
self.logger.log_info("salt bin created.")
def write_to_bin(self, file_name: str, data: bytes):
"""
creates or overwrites a bin file with iv and encrypted bytes.
:param file_name: name/path of the bin file
:param data: encrypted bytes
"""
if isinstance(data, str):
data = data.encode()
with open(file_name, 'wb') as file:
file.write(self.iv)
file.write(data)
self.logger.log_info("Data bin created.")
def read_from_bin(self, file_name: str) -> bytes:
"""
retrieves iv and encrypted byte data from a bin file.
:param file_name: name/path of the bin file
:return: bytes read from the bin file
"""
with open(file_name, 'rb') as file:
self.iv = file.read(16)
data = file.read()
self.logger.log_info("Data retrieved from bin.")
return data