-
Notifications
You must be signed in to change notification settings - Fork 2
/
app.py
145 lines (117 loc) · 4.64 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from uuid import uuid4
from datetime import datetime, timedelta
import json
from flask import Flask, jsonify, request
import jwt
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from flask_basicauth import BasicAuth
from scope import Scope
app = Flask(__name__)
app.debug = True
basic_auth = BasicAuth(app)
TOKEN_VALID_FOR_SECONDS = 3600
app.config['SERVICE'] = 'kdreyer-registry' # eg. "registry.ceph.com"
app.config['ISSUER'] = 'kdreyer-registry' # eg. "registry-auth.ceph.com"
app.config['BASIC_AUTH_USERNAME'] = 'admin'
app.config['BASIC_AUTH_PASSWORD'] = 'pass'
app.config['BASIC_AUTH_REALM'] = app.config['ISSUER']
# To generate a cert and key:
# openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes -subj '/CN=kdreyer-registry'
def get_certificate():
"""Return a cryptography.x509.Certificate object"""
with open('cert.pem', 'rb') as cert_file:
return load_pem_x509_certificate(cert_file.read(),
default_backend())
def get_private_key():
with open('key.pem', 'rb') as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=None,
backend=default_backend()
)
return private_key
def get_token_payload(service, issuer, scopes):
now = datetime.utcnow()
token_payload = {
'iss' : issuer,
'sub' : 'test', # XXX?
'aud' : service,
'exp' : now + timedelta(seconds=TOKEN_VALID_FOR_SECONDS),
'nbf' : now,
'iat' : now,
'jti' : uuid4().get_hex(),
'access' : [
{
'type' : scope.type,
'name' : scope.name,
'actions' : scope.actions
}
for scope in scopes
]
}
# app.logger.debug(('token', token_payload))
return token_payload
def construct_token_response(service, issuer, scopes):
token_payload = get_token_payload(service, issuer, scopes)
cert = get_certificate()
x5c = cert.public_bytes(serialization.Encoding.PEM)
x5c = x5c.replace('\n', '')
x5c = x5c.replace('-----BEGIN CERTIFICATE-----', '')
x5c = x5c.replace('-----END CERTIFICATE-----', '')
response_payload = {
'token' : jwt.encode(token_payload,
get_private_key(),
headers = {
'x5c':[x5c]
},
algorithm='RS256'),
'expires_in' : 3600,
'issued_at' : datetime.utcnow().isoformat() + 'Z'
}
#app.logger.debug(('response', json.dumps(response_payload)))
decoded = jwt.decode(response_payload['token'],
get_private_key(),
verify=False)
# app.logger.debug(('decoded', json.dumps(decoded)))
return response_payload
def get_scopes():
scopes = set()
for s in request.args.getlist('scope'):
scopes.add(Scope.parse(s))
return scopes
def requires_auth(scopes):
""" Determine if this HTTP request requires authentication """
# Any action other than pulling requires authentication.
for scope in scopes:
if scope.actions != ['pull']:
app.logger.debug('scope %s requires auth' % scope.name)
return True
# A "docker login" command request an offline token, and we should check
# the auth attempt.
if request.args.get('offline_token', '') == 'true':
return True
return False
@app.route('/token')
def token():
"""
See https://github.com/docker/distribution/blob/master/docs/spec/auth/token.md#requesting-a-token
"""
try:
service = request.args['service']
except KeyError:
response = jsonify({'error': 'service parameter required'})
response.status_code = 400
return response
if service != app.config['SERVICE']:
response = jsonify({'error': 'service parameter incorrect'})
response.status_code = 400
return response
scopes = get_scopes()
if requires_auth(scopes) and not basic_auth.authenticate():
return basic_auth.challenge()
payload = construct_token_response(service, app.config['ISSUER'], scopes)
return jsonify(**payload)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001)