utils.py
import jwt
import uuid
import warnings
from django.contrib.auth import get_user_model
from calendar import timegm
from datetime import datetime
from rest_framework_jwt.compat import get_username
from rest_framework_jwt.compat import get_username_field
from rest_framework_jwt.settings import api_settings
def jwt_get_secret_key(payload=None):
    """
    Return the key used to sign and verify HMAC tokens.

    With ``JWT_GET_USER_SECRET_KEY`` configured the key is derived per user,
    so a single user's tokens can be invalidated (compromised token, password
    change, ...) by rotating only that user's secret. Otherwise the global
    ``JWT_SECRET_KEY`` is returned and ``payload`` is not read at all.

    Security note: on the decode path the ``payload`` handed in has NOT had
    its signature checked yet, so the username used for the lookup is
    attacker-controlled. A username matching no account presumably makes
    ``get_by_natural_key`` raise ``DoesNotExist`` (not an auth failure), and
    a ``None`` payload raises ``AttributeError``; callers must treat both as
    invalid-token conditions.
    """
    per_user_key = api_settings.JWT_GET_USER_SECRET_KEY
    if not per_user_key:
        return api_settings.JWT_SECRET_KEY

    user_model = get_user_model()
    natural_key = payload.get(get_username_field())
    account = user_model.objects.get_by_natural_key(natural_key)
    return str(per_user_key(account))
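def jwt_payload_handler(user):
    """
    Build the claim set for a freshly issued token for ``user``.

    Claims written: ``user_id`` (``user.pk``, stringified when it is a UUID),
    ``username``, the configured username field, ``exp``; plus ``email`` when
    the user has one, ``orig_iat`` when ``JWT_ALLOW_REFRESH`` is on, and
    ``aud`` / ``iss`` when ``JWT_AUDIENCE`` / ``JWT_ISSUER`` are set.

    Security note: a JWT is signed, not encrypted -- every value placed here,
    including ``email``, is readable by anyone who sees the token. ``email``
    and ``user_id`` are deprecated and will be removed.

    Returns the payload dict; ``jwt_encode_handler`` signs it.
    """
    username_field = get_username_field()
    username = get_username(user)

    # stacklevel=2 attributes the warning to the caller of this handler
    # instead of to this library line, so the deprecation is actionable.
    warnings.warn(
        'The following fields will be removed in the future: '
        '`email` and `user_id`. ',
        DeprecationWarning,
        stacklevel=2,
    )

    payload = {
        'user_id': user.pk,
        'username': username,
        'exp': datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA,
    }

    if hasattr(user, 'email'):
        payload['email'] = user.email
    if isinstance(user.pk, uuid.UUID):
        # A UUID primary key must be serialised as text.
        payload['user_id'] = str(user.pk)

    payload[username_field] = username

    # Record the original issue time on a brand-new token only, so a later
    # refresh can see how old the original issuance is.
    if api_settings.JWT_ALLOW_REFRESH:
        payload['orig_iat'] = timegm(datetime.utcnow().utctimetuple())

    if api_settings.JWT_AUDIENCE is not None:
        payload['aud'] = api_settings.JWT_AUDIENCE
    if api_settings.JWT_ISSUER is not None:
        payload['iss'] = api_settings.JWT_ISSUER

    return payload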
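def jwt_get_user_id_from_payload_handler(payload):
    """
    Return the ``user_id`` claim from a decoded ``payload`` (``None`` if absent).

    Deprecated: override ``JWT_PAYLOAD_GET_USERNAME_HANDLER`` instead. Override
    this only if ``user_id`` is formatted differently in your payload.
    """
    # stacklevel=2 points the warning at the caller, not at this line.
    warnings.warn(
        'The following will be removed in the future. '
        'Use `JWT_PAYLOAD_GET_USERNAME_HANDLER` instead.',
        DeprecationWarning,
        stacklevel=2,
    )
    return payload.get('user_id')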
def jwt_get_username_from_payload_handler(payload):
    """
    Return the ``username`` claim from a decoded ``payload``, or ``None``.

    Override (via ``JWT_PAYLOAD_GET_USERNAME_HANDLER``) if the username is
    stored under a different claim in your tokens.
    """
    claim_name = 'username'
    return payload.get(claim_name)
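def jwt_encode_handler(payload):
    """
    Sign ``payload`` and return the compact JWT as ``str``.

    With ``JWT_PRIVATE_KEY`` set the token is signed with that key; otherwise
    the secret from ``jwt_get_secret_key`` (global or per-user) is used.
    ``JWT_ALGORITHM`` must match the kind of key in use -- an asymmetric
    private key needs an RSA/EC algorithm, a shared secret an HMAC one.
    """
    key = api_settings.JWT_PRIVATE_KEY or jwt_get_secret_key(payload)
    token = jwt.encode(payload, key, api_settings.JWT_ALGORITHM)
    # PyJWT 1.x returns bytes while newer releases return str; normalise so
    # the handler does not break on either (the bare .decode() only worked
    # for the bytes case).
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    return token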
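def jwt_decode_handler(token):
    """
    Verify ``token`` and return its payload.

    Verification uses ``JWT_PUBLIC_KEY`` when set, else the secret from
    ``jwt_get_secret_key``. The accepted algorithm is pinned to
    ``JWT_ALGORITHM`` so a token cannot pick its own ``alg`` (e.g. ``none``
    or an HMAC-with-public-key swap). Expiry is checked only when
    ``JWT_VERIFY_EXPIRATION`` is true, and ``aud`` / ``iss`` only when
    ``JWT_AUDIENCE`` / ``JWT_ISSUER`` are configured.

    Security notes:
    - ``JWT_VERIFY=False`` disables signature and claim verification
      entirely; it must never be set in production.
    - Per-user secrets require the username BEFORE the signature can be
      checked, so in that mode the token is first decoded unverified and the
      username is attacker-controlled; a username matching no account is
      expected to raise from ``jwt_get_secret_key`` rather than return, and
      callers must treat that as an invalid token.

    Raises ``jwt.DecodeError`` / ``jwt.InvalidTokenError`` (and subclasses
    such as ``ExpiredSignatureError``) when the token is malformed, fails
    verification, or has an unacceptable claim.
    """
    options = {
        'verify_exp': api_settings.JWT_VERIFY_EXPIRATION,
    }

    if api_settings.JWT_PUBLIC_KEY:
        # Asymmetric mode: the public key is the only thing needed, so do
        # not run the unverified decode / user lookup on untrusted input.
        key = api_settings.JWT_PUBLIC_KEY
    else:
        # Deliberately unverified: this pass only extracts the username so
        # the per-user secret can be fetched; nothing from it is trusted.
        unverified_payload = jwt.decode(token, None, False)
        key = jwt_get_secret_key(unverified_payload)

    return jwt.decode(
        token,
        key,
        api_settings.JWT_VERIFY,
        options=options,
        leeway=api_settings.JWT_LEEWAY,
        audience=api_settings.JWT_AUDIENCE,
        issuer=api_settings.JWT_ISSUER,
        algorithms=[api_settings.JWT_ALGORITHM]
    )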
def jwt_response_payload_handler(token, user=None, request=None):
    """
    Build the response body for the login and refresh views.

    The default body is just ``{'token': token}``; ``user`` and ``request``
    are accepted but unused here so that an override can include extra data,
    e.g. a serialised user:

        def jwt_response_payload_handler(token, user=None, request=None):
            return {
                'token': token,
                'user': UserSerializer(user, context={'request': request}).data
            }
    """
    body = dict()
    body['token'] = token
    return body