-
-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathroutes.py
118 lines (100 loc) · 3.84 KB
/
routes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import time
from flask import Blueprint, request, session
from flask import render_template, redirect, jsonify
from werkzeug.security import gen_salt
from authlib.integrations.flask_oauth2 import current_token
from authlib.oauth2 import OAuth2Error
from .models import db, User, OAuth2Client
from .oauth2 import authorization, require_oauth, generate_user_info
# Blueprint on which the route handlers below are registered via @bp.route.
bp = Blueprint('bp', __name__)
def current_user():
    """Return the ``User`` referenced by the session, or ``None``.

    The session keeps the user's primary key under ``'id'``; when that key
    is absent there is no current user and ``None`` is returned.
    """
    if 'id' not in session:
        return None
    return User.query.get(session['id'])
@bp.route('/', methods=('GET', 'POST'))
def home():
    """Show the home page (GET) or sign a user in by username (POST).

    POST: looks up the submitted ``username``, creating the ``User`` row if
    it does not exist yet, stores its id in the session and redirects back
    to ``/``.  A missing or empty username is ignored (no account is
    created and the session is left untouched) and the client is simply
    redirected back to the form.

    GET: renders ``home.html`` with the current user (or ``None``) and the
    OAuth2 clients registered by that user.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        # Guard against a blank submission: without this check a POST with
        # no username would insert and log in a user whose name is None/''.
        if username:
            user = User.query.filter_by(username=username).first()
            if not user:
                user = User(username=username)
                db.session.add(user)
                db.session.commit()
            session['id'] = user.id
        return redirect('/')

    user = current_user()
    if user:
        clients = OAuth2Client.query.filter_by(user_id=user.id).all()
    else:
        clients = []
    return render_template('home.html', user=user, clients=clients)
def split_by_crlf(s):
    """Split *s* on line boundaries and return the non-empty lines.

    Empty lines are dropped; surrounding whitespace inside a line is kept
    untouched.
    """
    lines = s.splitlines()
    return list(filter(None, lines))
@bp.route('/create_client', methods=('GET', 'POST'))
def create_client():
    """Register a new OAuth2 client owned by the current user.

    Anonymous visitors are redirected to ``/``.  GET renders the
    ``create_client.html`` form.  POST builds the client metadata from the
    submitted form, stores the client and redirects to ``/``.

    Every metadata field is read with ``form[...]``, so a POST that omits
    one of them fails on the lookup instead of storing a partial client.
    """
    user = current_user()
    if not user:
        return redirect('/')
    if request.method == 'GET':
        return render_template('create_client.html')

    form = request.form
    client_metadata = {
        "client_name": form["client_name"],
        "client_uri": form["client_uri"],
        "grant_types": split_by_crlf(form["grant_type"]),
        "redirect_uris": split_by_crlf(form["redirect_uri"]),
        "response_types": split_by_crlf(form["response_type"]),
        "scope": form["scope"],
        "token_endpoint_auth_method": form["token_endpoint_auth_method"],
    }

    client = OAuth2Client(client_id=gen_salt(24), user_id=user.id)
    # Mixin doesn't set the issue_at date
    client.client_id_issued_at = int(time.time())
    client.set_client_metadata(client_metadata)

    # Choose the secret from the *submitted* auth method.  This has to
    # happen after the form has been read: inspecting the freshly created
    # client before its metadata is applied cannot reflect the user's
    # choice, so public clients ('none') would still be handed a secret.
    if client_metadata["token_endpoint_auth_method"] == 'none':
        client.client_secret = ''
    else:
        client.client_secret = gen_salt(48)

    db.session.add(client)
    db.session.commit()
    return redirect('/')
@bp.route('/oauth/authorize', methods=['GET', 'POST'])
def authorize():
    """Authorization endpoint: show the consent page, then record the answer.

    GET validates the authorization request and renders ``authorize.html``
    with the current user and the consent grant; an ``OAuth2Error`` raised
    during validation is returned as a JSON body instead.

    POST resolves the resource owner (the session user, or the user named
    by the ``username`` form field when nobody is logged in) and creates
    the authorization response.  The request is granted only when the
    ``confirm`` form field is present and non-empty; otherwise it is
    treated as denied.
    """
    user = current_user()
    # TODO Login is required since we need to know the current resource owner.
    # It can be done with a redirection to the login page, or a login
    # form on this authorization page.
    if request.method == 'GET':
        try:
            grant = authorization.get_consent_grant(end_user=user)
        except OAuth2Error as error:
            return jsonify(dict(error.get_body()))
        # Extra template context can be derived from the grant if the page
        # should show it, e.g.
        #   client = grant.client
        #   scope = client.get_allowed_scope(grant.request.scope)
        #   scopes = describe_scope(scope)  # [{'key': 'email', 'icon': '...'}]
        return render_template('authorize.html', user=user, grant=grant)

    if not user and 'username' in request.form:
        username = request.form.get('username')
        user = User.query.filter_by(username=username).first()

    # Use .get(): a form submitted without 'confirm' (e.g. an unchecked
    # checkbox) means "denied", not a malformed request.
    if request.form.get('confirm'):
        # granted by resource owner
        grant_user = user
    else:
        # denied by resource owner
        grant_user = None
    return authorization.create_authorization_response(grant_user=grant_user)
@bp.route('/oauth/token', methods=['POST'])
def issue_token():
    """Token endpoint: return whatever the authorization server produces."""
    token_response = authorization.create_token_response()
    return token_response
@bp.route('/oauth/userinfo')
@require_oauth('openid profile')
def api_me():
    """Return, as JSON, the user info built from the current token.

    The payload is produced by ``generate_user_info`` from the token's
    user and scope.
    """
    user_info = generate_user_info(current_token.user, current_token.scope)
    return jsonify(user_info)