-
Notifications
You must be signed in to change notification settings - Fork 1
/
api.py
154 lines (129 loc) · 5.16 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
from flask import Blueprint, abort, current_app, jsonify, request, json as flask_json
import jwt
import requests
import json
import re
from jwt_proxy.audit import audit_HAPI_change
blueprint = Blueprint('auth', __name__)
SUPPORTED_METHODS = ('GET', 'POST', 'PUT', 'DELETE', 'OPTIONS')
# TODO: to be pulled into its own module and loaded per config
def scope_filter(req, token):
# Check path
resource_pattern = rf"(Patient|DocumentReference)$"
if re.search(resource_pattern, req.path) is None:
return False
user_id = token.get("sub")
identifier_pattern = rf"(https(:|%3[Aa])(\/|%2[Ff]){2}keycloak\.ltt\.cirg\.uw\.edu(%7[Cc]|\|))?{user_id}"
# Search params for keycloak id
params = req.args
id_param_value = params.get("identifier", params.get("_identifier", params.get("subject.identifier")))
if id_param_value is not None and re.search(identifier_pattern, id_param_value):
return True
# Search body for keycloak id
if req.is_json:
try:
body = req.get_json()
except (ValueError, TypeError):
return False
resource_type = body.get("resourceType")
if resource_type == "DocumentReference":
reference_string = body.get("subject", {}).get("reference")
if reference_string is not None and re.search(identifier_pattern, reference_string):
return True
return False
def proxy_request(req, upstream_url, user_info=None):
"""Forward request to given url"""
response = requests.request(
method=req.method,
url=upstream_url,
headers=req.headers,
params=req.args,
json=req.json,
data=req.data,
)
try:
result = response.json()
except json.decoder.JSONDecodeError:
return response.text
# Capture all changes
try:
if req.method in ("POST", "PUT", "DELETE"):
audit_HAPI_change(
user_info=user_info,
method=req.method,
params=req.args,
url=upstream_url,
)
except Exception as e:
from flask import current_app
current_app.logger.exception(e)
return result
@blueprint.route("/", defaults={"relative_path": ""}, methods=SUPPORTED_METHODS)
@blueprint.route("/<path:relative_path>", methods=SUPPORTED_METHODS)
def validate_jwt(relative_path):
"""Validate JWT and pass to upstream server"""
if f"/{relative_path}" in current_app.config["PATH_WHITELIST"]:
response_content = proxy_request(
req=request,
upstream_url=f"{current_app.config['UPSTREAM_SERVER']}/{relative_path}",
)
return response_content
token = request.headers.get("authorization", "").split("Bearer ")[-1]
if not token:
return jsonify(message="token missing"), 400
jwks_client = jwt.PyJWKClient(current_app.config["JWKS_URL"])
signing_key = jwks_client.get_signing_key_from_jwt(token)
try:
decoded_token = jwt.decode(
jwt=token,
# TODO cache public key in redis
key=signing_key.key,
algorithms=("RS256"),
audience=("account"),
)
except jwt.exceptions.ExpiredSignatureError:
return jsonify(message="token expired"), 401
# TODO: call new function here to dynamically load a filter call dependent on config; hardwired for now
if scope_filter(request, decoded_token):
response_content = proxy_request(
req=request,
upstream_url=f"{current_app.config['UPSTREAM_SERVER']}/{relative_path}",
user_info=decoded_token.get("email") or decoded_token.get("preferred_username"),
)
return response_content
return jsonify(message="invalid request"), 400
@blueprint.route("/fhir/.well-known/smart-configuration")
def smart_configuration():
"""Non-secret application settings"""
results = {
"authorization_endpoint": current_app.config.get("OIDC_AUTHORIZE_URL"),
"token_endpoint": current_app.config.get("OIDC_TOKEN_URI"),
"introspection_endpoint": current_app.config.get(
"OIDC_TOKEN_INTROSPECTION_URI"
),
}
return jsonify(results)
@blueprint.route("/settings", defaults={"config_key": None})
@blueprint.route("/settings/<string:config_key>")
def config_settings(config_key):
"""Non-secret application settings"""
# workaround no JSON representation for datetime.timedelta
class CustomJSONEncoder(flask_json.JSONEncoder):
def default(self, obj):
return str(obj)
current_app.json_encoder = CustomJSONEncoder
# return selective keys - not all can be be viewed by users, e.g.secret key
blacklist = ("SECRET", "KEY")
if config_key:
key = config_key.upper()
for pattern in blacklist:
if pattern in key:
abort(400, f"Configuration key {key} not available")
return jsonify({key: current_app.config.get(key)})
results = {}
for key in current_app.config:
matches = any(pattern for pattern in blacklist if pattern in key)
if matches:
continue
results[key] = current_app.config.get(key)
return jsonify(results)