-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathauth.py
115 lines (95 loc) · 2.8 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/usr/bin/env python
"""
auth.py
Handles authentication with CAS and user sessions.
Adopted to PostgreSQL (replaced ? with %s in queries).
"""
import re
import ssl
from urllib import parse, request
from flask import Blueprint, session, redirect, render_template, abort
import flask
from database import get_user_db_connection
auth_bp = Blueprint("auth", __name__)
_CAS_URL = "https://fed.princeton.edu/cas/"
# Strip ticket from URL
def strip_ticket(url):
if url is None:
return "something is badly wrong"
url = re.sub(r"ticket=[^&]*&?", "", url)
url = re.sub(r"\?&?$|&$", "", url)
return url
# Create SSL context
def create_ssl_context():
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context
# Validate ticket
def validate(ticket):
val_url = (
_CAS_URL
+ "validate"
+ "?service="
+ parse.quote(strip_ticket(flask.request.url))
+ "&ticket="
+ parse.quote(ticket)
)
context = create_ssl_context()
with request.urlopen(val_url, context=context) as flo:
lines = flo.readlines()
if len(lines) != 2:
return None
first_line = lines[0].decode("utf-8")
second_line = lines[1].decode("utf-8")
if not first_line.startswith("yes"):
return None
return second_line
# Authenticate user
def authenticate():
if "username" in flask.session:
return flask.session.get("username")
ticket = flask.request.args.get("ticket")
if ticket is None:
login_url = (
_CAS_URL + "login?service=" + parse.quote(flask.request.url)
)
abort(redirect(login_url))
username = validate(ticket)
if username is None:
login_url = (
_CAS_URL
+ "login?service="
+ parse.quote(strip_ticket(flask.request.url))
)
abort(redirect(login_url))
username = username.strip()
flask.session["username"] = username
conn = get_user_db_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT user_id FROM users WHERE name = %s", (username,)
)
user = cursor.fetchone()
if user is None:
# Insert user if not exists
cursor.execute(
"INSERT INTO users (user_id, name) VALUES (%s, %s) ON CONFLICT (user_id) DO NOTHING",
(username, username),
)
conn.commit()
user_id = username
conn.close()
flask.session["user_id"] = user_id
return username
# Logout user
@auth_bp.route("/logoutapp", methods=["GET"])
def logoutapp():
session.clear()
return render_template("loggedout.html")
# Logout CAS
@auth_bp.route("/logoutcas", methods=["GET"])
def logoutcas():
session.clear()
logout_url = _CAS_URL + "logout"
return redirect(logout_url)