forked from ThePalaceProject/library-registry
-
Notifications
You must be signed in to change notification settings - Fork 0
/
adobe_vendor_id.py
399 lines (309 loc) · 13.6 KB
/
adobe_vendor_id.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
import logging
import re
from xml.sax.saxutils import escape

import requests
from flask import Response, request

import adobe_xml_templates as t
from model import ShortClientTokenDecoder
from util.string_helpers import base64
from util.xmlparser import XMLParser
class AdobeVendorIDController:
    """Flask controllers for the Adobe Vendor ID protocol.

    Implements the Account Service and Authorization Service portions of
    the protocol by wiring an AdobeVendorIDRequestHandler (XML in/out) to an
    AdobeVendorIDModel (credential lookups).
    """

    def __init__(self, _db, vendor_id, node_value, delegates=None):
        """Constructor.

        :param _db: Database handle; this class only calls ``begin_nested()``
            on it (presumably a SQLAlchemy session -- TODO confirm).
        :param vendor_id: Passed to AdobeVendorIDRequestHandler, which embeds
            it in error documents.
        :param node_value: Passed through to AdobeVendorIDModel.
        :param delegates: A list of URLs or AdobeVendorIDClient objects. If
            this Vendor ID server cannot validate an incoming login, it will
            delegate to each of these other servers in turn.
        """
        self._db = _db
        self.request_handler = AdobeVendorIDRequestHandler(vendor_id)
        # A missing/empty delegate list is normalised to a fresh empty list.
        self.model = AdobeVendorIDModel(self._db, node_value, delegates or [])

    def signin_handler(self):
        """Process an incoming signInRequest document.

        The lookup runs inside a nested transaction. If the handler raises,
        the nested transaction is rolled back before the exception
        propagates, so it is never left open.

        :return: A 200 ``application/xml`` Response carrying the handler's output.
        """
        savepoint = self._db.begin_nested()
        try:
            output = self.request_handler.handle_signin_request(
                request.data.decode("utf8"),
                self.model.standard_lookup,
                self.model.authdata_lookup,
            )
        except Exception:
            # Undo whatever the lookups did before letting the error surface.
            savepoint.rollback()
            raise
        savepoint.commit()
        return Response(output, 200, {"Content-Type": "application/xml"})

    def userinfo_handler(self):
        """Process an incoming userInfoRequest document.

        :return: A 200 ``application/xml`` Response carrying the handler's output.
        """
        output = self.request_handler.handle_accountinfo_request(
            request.data.decode("utf8"), self.model.urn_to_label
        )
        return Response(output, 200, {"Content-Type": "application/xml"})

    def status_handler(self):
        """Return a plain-text ``UP`` response with status 200."""
        return Response("UP", 200, {"Content-Type": "text/plain"})
# Shared base for the Adobe Vendor ID request parsers in this file.
# It relies on XMLParser for process_all() and _xpath1(), and on each subclass
# to provide REQUEST_XPATH (not defined here) and process_one().
class AdobeRequestParser(XMLParser):
# Prefix-to-URI map handed to the XPath lookups below.
NAMESPACES = {"adept": "http://ns.adobe.com/adept"}
# Run process_all() over `data` for REQUEST_XPATH and return the first result,
# or None when nothing was produced.
def process(self, data):
# NOTE: the local name `requests` shadows the module-level `requests` import
# for the duration of this method.
requests = list(self.process_all(data, self.REQUEST_XPATH, self.NAMESPACES))
if not requests:
return None
return requests[
0
] # Return only the first request tag, even if there are multiple
# Look up the child element "adept:<key>" under `tag`, take its text, strip
# surrounding whitespace, optionally pass it through `transform`, and store
# the result in `d[key]`.
# NOTE(review): indentation was lost in this view, so it is not possible to
# tell whether the `callable(transform)` check and `d[key] = v` sit inside
# the `is not None` checks or after them -- confirm against the real file
# before relying on whether a missing element leaves `key` absent from `d`.
def _add(self, d, tag, key, namespaces, transform=None):
v = self._xpath1(tag, "adept:" + key, namespaces)
if v is not None:
v = v.text
if v is not None:
v = v.strip()
if callable(transform):
v = transform(v)
d[key] = v
class AdobeSignInRequestParser(AdobeRequestParser):
    """Parser for ``signInRequest`` documents.

    Produces a dict holding the sign-in ``method`` plus the credential
    fields that belong to that method.
    """

    REQUEST_XPATH = "/adept:signInRequest"
    STANDARD = "standard"
    AUTH_DATA = "authData"

    def process_one(self, tag, namespaces):
        """Extract the sign-in method and its credentials from one request tag.

        :raises ValueError: if the ``method`` attribute is missing/empty or
            is not one of the supported methods.
        """
        signin_method = tag.attrib.get("method")
        if not signin_method:
            raise ValueError("No signin method specified")
        if signin_method not in (self.STANDARD, self.AUTH_DATA):
            raise ValueError(f"Unknown signin method: {signin_method}")

        parsed = {"method": signin_method}
        if signin_method == self.STANDARD:
            for field in ("username", "password"):
                self._add(parsed, tag, field, namespaces)
        else:
            # authData is run through base64.b64decode by _add().
            self._add(parsed, tag, self.AUTH_DATA, namespaces, base64.b64decode)
        return parsed
class AdobeAccountInfoRequestParser(AdobeRequestParser):
    """Parser for ``accountInfoRequest`` documents."""

    REQUEST_XPATH = "/adept:accountInfoRequest"

    def process_one(self, tag, namespaces):
        """Return a dict with the tag's ``method`` attribute and its ``user`` field."""
        parsed = {"method": tag.attrib.get("method")}
        self._add(parsed, tag, "user", namespaces)
        return parsed
class AdobeVendorIDRequestHandler:
    """Standalone class that can be tested without bringing in Flask or the database schema.

    Turns raw request documents into response documents, using lookup
    callables supplied by the caller to resolve credentials and identifiers.
    """

    AUTH_ERROR_TYPE = "AUTH"
    ACCOUNT_INFO_ERROR_TYPE = "ACCOUNT_INFO"
    TOKEN_FAILURE = "Incorrect token."
    AUTHENTICATION_FAILURE = "Incorrect barcode or PIN."
    URN_LOOKUP_FAILURE = "Could not identify patron from '%s'."
    SIGN_IN_RESPONSE_TEMPLATE = t.SIGN_IN_RESPONSE_TEMPLATE
    ACCOUNT_INFO_RESPONSE_TEMPLATE = t.ACCOUNT_INFO_RESPONSE_TEMPLATE
    ERROR_RESPONSE_TEMPLATE = t.ERROR_RESPONSE_TEMPLATE

    def __init__(self, vendor_id):
        """:param vendor_id: Value substituted into every error document."""
        self.vendor_id = vendor_id

    def handle_signin_request(self, data, standard_lookup, authdata_lookup):
        """Answer a signInRequest document.

        :param data: The request document as text.
        :param standard_lookup: Callable taking the parsed request dict and
            returning ``(user_id, label)``.
        :param authdata_lookup: Callable taking the decoded authdata and
            returning ``(user_id, label)``.
        :return: A sign-in response document on success, otherwise an error
            document of type ``AUTH``.
        """
        parser = AdobeSignInRequestParser()
        try:
            data = parser.process(data)
        except Exception as e:
            return self.error_document(self.AUTH_ERROR_TYPE, str(e))

        if not data:
            return self.error_document(
                self.AUTH_ERROR_TYPE, "Request document in wrong format."
            )
        if "method" not in data:
            return self.error_document(self.AUTH_ERROR_TYPE, "No method specified")

        user_id = label = None
        method = data["method"]
        if method == parser.STANDARD:
            (user_id, label) = standard_lookup(data)
            failure = self.AUTHENTICATION_FAILURE
        elif method == parser.AUTH_DATA:
            failure = self.TOKEN_FAILURE
            # A request without an authData entry cannot be looked up; report
            # it as a token failure instead of raising KeyError.
            if parser.AUTH_DATA in data:
                (user_id, label) = authdata_lookup(data[parser.AUTH_DATA])
        else:
            # The parser normally rejects unknown methods itself; this keeps
            # `failure` from being referenced while unbound if one slips through.
            return self.error_document(
                self.AUTH_ERROR_TYPE, f"Unknown signin method: {method}"
            )

        if user_id is None:
            return self.error_document(self.AUTH_ERROR_TYPE, failure)
        return self.SIGN_IN_RESPONSE_TEMPLATE % {"user": user_id, "label": label}

    def handle_accountinfo_request(self, data, urn_to_label):
        """Answer an accountInfoRequest document.

        :param data: The request document as text.
        :param urn_to_label: Callable mapping a user identifier to a label.
        :return: An account-info response document when a label is found,
            otherwise an error document of type ``ACCOUNT_INFO``.
        """
        parser = AdobeAccountInfoRequestParser()
        label = None
        try:
            data = parser.process(data)
            if not data:
                return self.error_document(
                    self.ACCOUNT_INFO_ERROR_TYPE, "Request document in wrong format."
                )
            if "user" not in data:
                return self.error_document(
                    self.ACCOUNT_INFO_ERROR_TYPE,
                    "Could not find user identifer in request document.",
                )
            label = urn_to_label(data["user"])
        except Exception as e:
            return self.error_document(self.ACCOUNT_INFO_ERROR_TYPE, str(e))

        if label:
            return self.ACCOUNT_INFO_RESPONSE_TEMPLATE % dict(label=label)
        return self.error_document(
            self.ACCOUNT_INFO_ERROR_TYPE, self.URN_LOOKUP_FAILURE % data["user"]
        )

    def error_document(self, type, message):
        """Fill the error template with this handler's vendor ID, `type` and `message`.

        NOTE(review): `message` may contain request-supplied text and is
        interpolated as-is; whether it needs XML escaping depends on the
        template, which is defined in another module -- confirm.
        """
        return self.ERROR_RESPONSE_TEMPLATE % {
            "vendor_id": self.vendor_id,
            "type": type,
            "message": message,
        }
class AdobeVendorIDModel:
    """Implement Adobe Vendor ID within the library registry's database model."""

    def __init__(self, _db, node_value, delegates):
        """Constructor.

        :param _db: Database handle passed to the token decoder on each lookup.
        :param node_value: Passed through to ShortClientTokenDecoder.
        :param delegates: Iterable of URL strings and/or client objects;
            strings are wrapped in AdobeVendorIDClient, anything else is
            used as given.
        """
        self._db = _db
        delegate_objs = [
            AdobeVendorIDClient(delegate) if isinstance(delegate, str) else delegate
            for delegate in delegates
        ]
        self.short_client_token_decoder = ShortClientTokenDecoder(
            node_value, delegate_objs
        )

    def standard_lookup(self, authorization_data):
        """
        Treat an incoming username and password as the two parts of a short client token.
        Return an Adobe Account ID and a human-readable label. Create a DelegatedPatronIdentifier
        to hold the Adobe Account ID if necessary.

        :param authorization_data: Mapping with optional ``username`` and ``password`` keys.
        :return: ``(account_id, label)``, or ``(None, None)`` if neither this
            server nor any delegate accepted the credentials.
        """
        username = authorization_data.get("username")
        password = authorization_data.get("password")
        try:
            delegated_patron_identifier = (
                self.short_client_token_decoder.decode_two_part(
                    self._db, username, password
                )
            )
        except ValueError:
            delegated_patron_identifier = None
        if delegated_patron_identifier:
            return self.account_id_and_label(delegated_patron_identifier)
        return self._delegate_lookup("sign_in_standard", username, password)

    def authdata_lookup(self, authdata):
        """
        Treat an authdata string as a short client token. Return an Adobe Account ID and a
        human-readable label. Create a DelegatedPatronIdentifier to hold the Adobe Account ID
        if necessary.

        :return: ``(account_id, label)``, or ``(None, None)`` if neither this
            server nor any delegate accepted the authdata.
        """
        try:
            delegated_patron_identifier = self.short_client_token_decoder.decode(
                self._db, authdata
            )
        except ValueError:
            delegated_patron_identifier = None
        if delegated_patron_identifier:
            return self.account_id_and_label(delegated_patron_identifier)
        return self._delegate_lookup("sign_in_authdata", authdata)

    def _delegate_lookup(self, method_name, *credentials):
        """Try each delegate's `method_name` in turn; return the first success.

        Delegation is best-effort: any exception from a delegate means
        "this delegate couldn't help" and the next one is tried. The failure
        is logged at debug level (without the credentials) so it leaves a trace.
        """
        for delegate in self.short_client_token_decoder.delegates:
            try:
                account_id, label, _ = getattr(delegate, method_name)(*credentials)
            except Exception:
                logging.getLogger(__name__).debug(
                    "Vendor ID delegate %r failed during %s",
                    delegate,
                    method_name,
                    exc_info=True,
                )
                continue
            return account_id, label
        # Neither this server nor the delegates were able to do anything.
        return (None, None)

    def account_id_and_label(self, delegated_patron_identifier):
        """Turn a DelegatedPatronIdentifier into a 2-tuple of (account id, label)"""
        if not delegated_patron_identifier:
            return (None, None)
        urn = delegated_patron_identifier.delegated_identifier
        return (urn, self.urn_to_label(urn))

    def urn_to_label(self, urn):
        """We have no information about patrons, so labels are sparse."""
        return f"Delegated account ID {urn}"
class VendorIDAuthenticationError(Exception):
    """Signals that a functioning Vendor ID service replied with an error."""
class VendorIDServerException(Exception):
    """Signals that the Vendor ID service itself is malfunctioning."""
class AdobeVendorIDClient:
    """
    A client library for the Adobe Vendor ID protocol.
    Used by the AdobeVendorIDAcceptanceTestScript to verify the compliance of the library registry.
    It may also be used during a transition period where you are moving from another Vendor ID
    implementation to a library registry. You can delegate to another Vendor ID implementation the
    validation of any credentials that cannot be validated through the library registry.
    """

    SIGNIN_AUTHDATA_BODY = """<signInRequest method="authData" xmlns="http://ns.adobe.com/adept">
<authData>%s</authData>
</signInRequest>"""
    SIGNIN_STANDARD_BODY = """<signInRequest method="standard" xmlns="http://ns.adobe.com/adept">
<username>%s</username>
<password>%s</password>
</signInRequest>"""
    USER_INFO_BODY = """<accountInfoRequest method="standard" xmlns="http://ns.adobe.com/adept">
<user>%s</user>
</accountInfoRequest>"""
    USER_IDENTIFIER_RE = re.compile("<user>([^<]+)</user>")
    LABEL_RE = re.compile("<label>([^<]+)</label>")
    ERROR_RE = re.compile('<error [^<]+ data="([^<]+)"')

    def __init__(self, base_url):
        """:param base_url: Prefix to which ``SignIn``, ``AccountInfo`` and
        ``Status`` are appended verbatim (no separator is inserted).
        """
        self.base_url = base_url
        self.signin_url = base_url + "SignIn"
        self.accountinfo_url = base_url + "AccountInfo"
        self.status_url = base_url + "Status"

    @staticmethod
    def _render(template, *values):
        """Fill a request template, escaping each value as XML text.

        Values are stringified first, then ``&``, ``<`` and ``>`` are escaped
        so that credentials containing those characters still produce a
        well-formed document.
        """
        return template % tuple(escape(str(value)) for value in values)

    def status(self):
        """Is the server up and running?

        :return: True when the status endpoint answers ``UP``.
        :raises VendorIDServerException: on a non-200 status or any other body.
        :raises VendorIDAuthenticationError: if the body contains an error element.
        """
        response = requests.get(self.status_url)
        content = response.text
        self.handle_error(response.status_code, content)
        if content == "UP":
            return True
        raise VendorIDServerException("Unexpected response: %s" % content)

    def sign_in_authdata(self, authdata):
        """Attempt to sign in using authdata.

        :return: If signin is successful, a 3-tuple
            (account identifier, label, raw response content).
        """
        # NOTE(review): `base64` here is the project's util.string_helpers
        # wrapper, not the stdlib module -- confirm it provides encodestring().
        body = self._render(self.SIGNIN_AUTHDATA_BODY, base64.encodestring(authdata))
        response = requests.post(self.signin_url, data=body)
        return self._process_sign_in_result(response)

    def sign_in_standard(self, username, password):
        """Attempt to sign in using username and password.

        :return: If signin is successful, a 3-tuple
            (account identifier, label, raw response content).
        """
        body = self._render(self.SIGNIN_STANDARD_BODY, username, password)
        response = requests.post(self.signin_url, data=body)
        return self._process_sign_in_result(response)

    def user_info(self, urn):
        """Turn a user identifier into a label.

        :return: A 2-tuple (label, raw response content).
        :raises VendorIDServerException: if the response carries no label.
        """
        body = self._render(self.USER_INFO_BODY, urn)
        response = requests.post(self.accountinfo_url, data=body)
        content = response.text
        self.handle_error(response.status_code, content)
        label = self.extract_label(content)
        if not label:
            raise VendorIDServerException("Unexpected response: %s" % content)
        return label, content

    def extract_user_identifier(self, content):
        """Return the text of the first ``<user>`` element in `content`, or None."""
        return self._extract_by_re(content, self.USER_IDENTIFIER_RE)

    def extract_label(self, content):
        """Return the text of the first ``<label>`` element in `content`, or None."""
        return self._extract_by_re(content, self.LABEL_RE)

    def handle_error(self, status_code, content):
        """Raise if the response indicates a failure; otherwise return None.

        :raises VendorIDServerException: if `status_code` is not 200.
        :raises VendorIDAuthenticationError: if `content` contains an error
            element; the exception message is that element's ``data`` value.
        """
        if status_code != 200:
            raise VendorIDServerException(f"Unexpected status code: {status_code}")
        error = self._extract_by_re(content, self.ERROR_RE)
        if error:
            raise VendorIDAuthenticationError(error)

    def _extract_by_re(self, content, re):
        """Return the first capture group of compiled pattern `re` in `content`, or None.

        The parameter name shadows the ``re`` module inside this method; it
        is kept for compatibility with existing callers.
        """
        match = re.search(content)
        if not match:
            return None
        return match.group(1)

    def _process_sign_in_result(self, response):
        """Validate a sign-in response and pull out its identifier and label.

        :return: A 3-tuple (identifier, label, raw response content).
        :raises VendorIDServerException: if either value is missing.
        """
        content = response.text
        self.handle_error(response.status_code, content)
        identifier = self.extract_user_identifier(content)
        label = self.extract_label(content)
        if not identifier or not label:
            raise VendorIDServerException("Unexpected response: %s" % content)
        return identifier, label, content