-
Notifications
You must be signed in to change notification settings - Fork 4
/
flask_simple_crypt.py
135 lines (107 loc) · 5.19 KB
/
flask_simple_crypt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
flask-simple-crypt
------------------
A Flask extension providing simple, secure encryption and decryption for Python.
Original work and credit goes to Andrew Cooke, https://github.com/andrewcooke/simple-crypt
:copyright: (c) 2022 by Carlos, [email protected]
(c) 2012-2015 Andrew Cooke, [email protected]
(c) 2013 d10n, https://github.com/d10/ & [email protected]
:license: MIT, see LICENSE for more details.
"""
import base64
from Crypto.Cipher import AES
from Crypto.Hash import SHA256, HMAC
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Random.random import getrandbits
from Crypto.Util import Counter
__version__ = "0.3.3"
__author__ = "Carlos"
__license__ = "MIT"
__copyright__ = "(c) 2022 Carlos Rivas\n" \
"(c) 2012-2015 Andrew Cooke\n" \
"(c) 2013 d10n, https://github.com/d10/\n" \
__all__ = ["SimpleCrypt"]
class SimpleCrypt(object):
def __init__(self, app=None):
self.AES_KEY_LEN = 256
self.SALT_LEN = 256
self.HASH = SHA256
self.PREFIX = b"fsc"
self.HEADER = self.PREFIX + b"\x00\x02"
self.HALF_BLOCK = AES.block_size * 8 // 2
self.HEADER_LEN = len(self.HEADER)
self.EXPANSION_COUNT = None
self.FSC_KEY = None
if app is not None:
self.init_app(app)
def init_app(self, app):
self.EXPANSION_COUNT = app.config.get('FSC_EXPANSION_COUNT', 10240)
self.FSC_KEY = app.config.get("SECRET_KEY")
if not self.FSC_KEY:
raise RuntimeError("flask-simple-crypt requires the usage of SECRET_KEY")
def encrypt(self, data):
data = self._str_to_bytes(data)
self._assert_encrypt_length(data)
salt = bytes(self._random_bytes(self.SALT_LEN // 8))
hmac_key, cipher_key = self._expand_keys(self.FSC_KEY, salt, self.EXPANSION_COUNT)
counter = Counter.new(self.HALF_BLOCK, prefix=salt[:self.HALF_BLOCK // 8])
cipher = AES.new(cipher_key, AES.MODE_CTR, counter=counter)
encrypted = cipher.encrypt(data)
hmac = self._hmac(hmac_key, self.HEADER + salt + encrypted)
return base64.b64encode(self.HEADER + salt + encrypted + hmac)
def decrypt(self, payload):
data = base64.b64decode(payload)
self._assert_not_unicode(data)
self._assert_header_prefix(data)
self._assert_decrypt_length(data)
raw = data[self.HEADER_LEN:]
salt = raw[:self.SALT_LEN // 8]
hmac_key, cipher_key = self._expand_keys(self.FSC_KEY, salt, self.EXPANSION_COUNT)
hmac = raw[-self.HASH.digest_size:]
hmac2 = self._hmac(hmac_key, data[:-self.HASH.digest_size])
self._assert_hmac(hmac_key, hmac, hmac2)
counter = Counter.new(self.HALF_BLOCK, prefix=salt[:self.HALF_BLOCK // 8])
cipher = AES.new(cipher_key, AES.MODE_CTR, counter=counter)
return cipher.decrypt(raw[self.SALT_LEN // 8:-self.HASH.digest_size])
def _assert_not_unicode(self, data):
u_type = type(b"".decode("utf8"))
if isinstance(data, u_type):
raise DecryptionException("Data to decrypt must be bytes; " +
"you cannot use a string because " +
"no string encoding will accept all " +
"possible characters.")
def _assert_encrypt_length(self, data):
if len(data) > 2 ** self.HALF_BLOCK:
raise EncryptionException("Message is too long.")
def _assert_decrypt_length(self, data):
if len(data) < self.HEADER_LEN + self.SALT_LEN // 8 + self.HASH.digest_size:
raise DecryptionException("Missing data.")
def _assert_header_prefix(self, data):
if len(data) >= 3 and data[:3] != self.PREFIX:
raise DecryptionException("Data passed to decrypt were not generated by simple-crypt (bad header).")
def _assert_hmac(self, key, hmac, hmac2):
if self._hmac(key, hmac) != self._hmac(key, hmac2):
raise DecryptionException("Bad password or corrupt / modified data.")
def _pbkdf2(self, password, salt, n_bytes, count):
return PBKDF2(password, salt, dkLen=n_bytes,
count=count, prf=lambda p, s: HMAC.new(p, s, self.HASH).digest())
def _expand_keys(self, password, salt, expansion_count):
if not salt: raise ValueError("Missing salt.")
if not password: raise ValueError("Missing password.")
key_len = self.AES_KEY_LEN // 8
keys = self._pbkdf2(self._str_to_bytes(password), salt, 2 * key_len, expansion_count)
return keys[:key_len], keys[key_len:]
def _hide(self, ranbytes):
return bytearray(self._pbkdf2(bytes(ranbytes), b"", len(ranbytes), 1))
def _random_bytes(self, n):
return self._hide(bytearray(getrandbits(8) for _ in range(n)))
def _hmac(self, key, data):
return HMAC.new(key, data, self.HASH).digest()
def _str_to_bytes(self, data):
u_type = type(b"".decode("utf8"))
if isinstance(data, u_type):
return data.encode("utf8")
return data
class DecryptionException(Exception): pass
class EncryptionException(Exception): pass