-
Notifications
You must be signed in to change notification settings - Fork 389
/
__init__.py
403 lines (314 loc) · 13.5 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
"""Stubs for patching HTTP and HTTPS requests"""
import logging
from http.client import HTTPConnection, HTTPResponse, HTTPSConnection
from io import BytesIO
from vcr.errors import CannotOverwriteExistingCassetteException
from vcr.request import Request
from . import compat
log = logging.getLogger(__name__)
class VCRFakeSocket:
    """
    Inert stand-in for a socket.

    Handed out while playing back cassettes, when there is no actual
    open socket to expose.
    """

    def close(self):
        # Nothing was ever opened, so there is nothing to release.
        return None

    def settimeout(self, *args, **kwargs):
        # Any timeout arguments are accepted and ignored.
        return None

    def fileno(self):
        """
        Report a file descriptor for callers that insist on one.

        This is kinda crappy: requests will watch this descriptor and make
        sure it's not closed, so hand back descriptor 0 (stdin).
        """
        stdin_fd = 0  # wonder how bad this is....
        return stdin_fd
def parse_headers(header_list):
    """
    Convert headers from our serialized dict with lists for keys to a
    HTTPMessage

    ``header_list`` is a mapping of header name (str) to an iterable of
    str values. Every (name, value) pair becomes one UTF-8 encoded
    ``name:value\\r\\n`` line, in mapping order, and the concatenated bytes
    are handed to ``compat.get_httpmessage``, whose result is returned.
    """
    # Collect the encoded lines and join them once rather than growing a
    # bytes object with ``+=`` on every header value.
    header_lines = [
        key.encode("utf-8") + b":" + value.encode("utf-8") + b"\r\n"
        for key, values in header_list.items()
        for value in values
    ]
    return compat.get_httpmessage(b"".join(header_lines))
def serialize_headers(response):
    """
    Flatten a response's headers into a ``{name: [values, ...]}`` dict.

    The headers are taken from ``response.msg`` unless that is ``None``, in
    which case ``response.headers`` is used. ``compat.get_headers`` yields
    ``(name, values)`` pairs; values for a repeated name are accumulated
    into the same list.
    """
    source = response.msg
    if source is None:
        source = response.headers

    serialized = {}
    for name, values in compat.get_headers(source):
        serialized.setdefault(name, []).extend(values)
    return serialized
class VCRHTTPResponse(HTTPResponse):
    """
    Stub response class that gets returned instead of a HTTPResponse

    The body is served from an in-memory ``BytesIO`` built from the
    recorded response; ``HTTPResponse.__init__`` is not called, so no
    socket is involved.
    """

    def __init__(self, recorded_response):
        """
        Build the stub from a serialized response dict.

        ``recorded_response`` must provide ``["status"]["code"]``,
        ``["status"]["message"]``, ``["headers"]`` (name -> list of values)
        and ``["body"]["string"]`` (bytes).

        Note: Transfer-Encoding entries are deleted from
        ``recorded_response["headers"]`` in place.
        """
        self.fp = None
        self.recorded_response = recorded_response
        self.reason = recorded_response["status"]["message"]
        self.status = self.code = recorded_response["status"]["code"]
        self.version = None
        self._content = BytesIO(self.recorded_response["body"]["string"])
        self._closed = False
        self._original_response = self  # for requests.session.Session cookie extraction

        headers = self.recorded_response["headers"]
        # Since we are loading a response that has already been serialized, our
        # response is no longer chunked. That means we don't want any
        # libraries trying to process a chunked response. By removing the
        # transfer-encoding: chunked header, this should cause the downstream
        # libraries to process this as a non-chunked response.
        # Every case-variant of the key is removed, not just the first match.
        te_keys = [h for h in headers if h.upper() == "TRANSFER-ENCODING"]
        for te_key in te_keys:
            del headers[te_key]

        self.headers = self.msg = parse_headers(headers)
        self.length = compat.get_header(self.msg, "content-length") or None

    @property
    def closed(self):
        # in python3, I can't change the value of self.closed. So I'm
        # twiddling self._closed and using this property to shadow the real
        # self.closed from the superclass
        return self._closed

    def read(self, *args, **kwargs):
        return self._content.read(*args, **kwargs)

    def read1(self, *args, **kwargs):
        return self._content.read1(*args, **kwargs)

    def readall(self):
        """Return all remaining body bytes."""
        # BytesIO is a buffered stream and has no readall() (that method
        # belongs to RawIOBase); read() with no size returns everything
        # from the current position to the end.
        return self._content.read()

    def readinto(self, *args, **kwargs):
        return self._content.readinto(*args, **kwargs)

    def readline(self, *args, **kwargs):
        return self._content.readline(*args, **kwargs)

    def readlines(self, *args, **kwargs):
        return self._content.readlines(*args, **kwargs)

    def seekable(self):
        return self._content.seekable()

    def tell(self):
        return self._content.tell()

    def isatty(self):
        return self._content.isatty()

    def seek(self, *args, **kwargs):
        return self._content.seek(*args, **kwargs)

    def close(self):
        """Mark the response as closed; always returns True."""
        self._closed = True
        return True

    def getcode(self):
        return self.status

    def isclosed(self):
        return self.closed

    def info(self):
        """Return the recorded headers, re-parsed on every call."""
        return parse_headers(self.recorded_response["headers"])

    def getheaders(self):
        """Return the recorded headers as a list of header items."""
        message = parse_headers(self.recorded_response["headers"])
        return list(compat.get_header_items(message))

    def getheader(self, header, default=None):
        """
        Return the values of ``header`` (matched case-insensitively) joined
        with ", ", or ``default`` when the header is absent.
        """
        wanted = header.lower()
        values = [v for (k, v) in self.getheaders() if k.lower() == wanted]
        if values:
            return ", ".join(values)
        return default

    def readable(self):
        return self._content.readable()

    @property
    def length_remaining(self):
        # Total buffer size minus the current read position.
        return self._content.getbuffer().nbytes - self._content.tell()

    def get_redirect_location(self):
        """
        Returns (a) redirect location string if we got a redirect
        status code and valid location, (b) None if redirect status and
        no location, (c) False if not a redirect status code.
        See https://urllib3.readthedocs.io/en/stable/reference/urllib3.response.html .
        """
        if not (300 <= self.status <= 399):
            return False
        return self.getheader("Location")

    @property
    def data(self):
        # The whole body, regardless of the current read position.
        return self._content.getbuffer().tobytes()

    def drain_conn(self):
        pass

    def stream(self, amt=65536, decode_content=None):
        """
        Yield the remaining body in chunks of at most ``amt`` bytes.

        The final item yielded is always an empty bytes object, which
        marks the end of the body. ``decode_content`` is accepted and
        ignored.
        """
        while True:
            b = self._content.read(amt)
            yield b
            if not b:
                break
class VCRConnection:
    """
    Connection stub that records requests and answers them from a cassette.

    A real connection (an instance of the subclass-provided ``_baseclass``)
    is kept in ``self.real_connection`` and is only used when the cassette
    cannot play back a response. Subclasses must define ``_baseclass`` and
    ``_protocol`` ("http" or "https").
    """

    # A reference to the cassette that's currently being patched in
    cassette = None

    def _port_postfix(self):
        """
        Returns empty string for the default port and ':port' otherwise
        """
        port = self.real_connection.port
        default_port = {"https": 443, "http": 80}[self._protocol]
        return f":{port}" if port != default_port else ""

    def _uri(self, url):
        """Returns request absolute URI"""
        if url and not url.startswith("/"):
            # Then this must be a proxy request.
            return url
        uri = f"{self._protocol}://{self.real_connection.host}{self._port_postfix()}{url}"
        log.debug("Absolute URI: %s", uri)
        return uri

    def _url(self, uri):
        """
        Returns request selector url from absolute URI

        Only a leading ``protocol://host[:port]`` prefix is stripped. A URI
        that does not start with this connection's prefix (e.g. a proxy
        request) is returned unchanged, even if the prefix text appears
        later inside it (for instance in a query string).
        """
        prefix = f"{self._protocol}://{self.real_connection.host}{self._port_postfix()}"
        if uri.startswith(prefix):
            return uri[len(prefix):]
        return uri

    def request(self, method, url, body=None, headers=None, *args, **kwargs):
        """
        Persist the request metadata in self._vcr_request

        Extra positional and keyword arguments are accepted and ignored.
        """
        self._vcr_request = Request(method=method, uri=self._uri(url), body=body, headers=headers or {})
        log.debug("Got %s", self._vcr_request)

        # Note: The request may not actually be finished at this point, so
        # I'm not sending the actual request until getresponse(). This
        # allows me to compare the entire length of the response to see if it
        # exists in the cassette.

        self._sock = VCRFakeSocket()

    def putrequest(self, method, url, *args, **kwargs):
        """
        httplib gives you more than one way to do it. This is a way
        to start building up a request. Usually followed by a bunch
        of putheader() calls.
        """
        self._vcr_request = Request(method=method, uri=self._uri(url), body="", headers={})
        log.debug("Got %s", self._vcr_request)

    def putheader(self, header, *values):
        """Store ``values`` (as a tuple) under ``header`` on the pending request."""
        self._vcr_request.headers[header] = values

    def send(self, data):
        """
        This method is called after request(), to add additional data to the
        body of the request. So if that happens, let's just append the data
        onto the most recent request in the cassette.
        """
        self._vcr_request.body = self._vcr_request.body + data if self._vcr_request.body else data

    def close(self):
        # Note: the real connection will only close if it's open, so
        # no need to check that here.
        self.real_connection.close()

    def endheaders(self, message_body=None):
        """
        Normally, this would actually send the request to the server.
        We are not sending the request until getting the response,
        so bypass this part and just append the message body, if any.
        """
        if message_body is not None:
            self._vcr_request.body = message_body

    def getresponse(self, _=False, **kwargs):
        """
        Retrieve the response

        Plays the response back from the cassette when possible. Otherwise
        the request is sent through the real connection, the response is
        appended to the cassette, and a stub built from it is returned.

        Raises CannotOverwriteExistingCassetteException when the cassette
        is write-protected and ``cassette.filter_request`` returns a truthy
        value for the request.
        """
        # Check to see if the cassette has a response for this request. If so,
        # then return it
        if self.cassette.can_play_response_for(self._vcr_request):
            log.info("Playing response for %s from cassette", self._vcr_request)
            response = self.cassette.play_response(self._vcr_request)
            return VCRHTTPResponse(response)

        if self.cassette.write_protected and self.cassette.filter_request(self._vcr_request):
            raise CannotOverwriteExistingCassetteException(
                cassette=self.cassette,
                failed_request=self._vcr_request,
            )

        # Otherwise, we should send the request, then get the response
        # and return it.

        log.info("%s not in cassette, sending to real server", self._vcr_request)
        # This is imported here to avoid circular import.
        # TODO(@IvanMalison): Refactor to allow normal import.
        from vcr.patch import force_reset

        with force_reset():
            self.real_connection.request(
                method=self._vcr_request.method,
                url=self._url(self._vcr_request.uri),
                body=self._vcr_request.body,
                headers=self._vcr_request.headers,
            )

        # get the response
        response = self.real_connection.getresponse()
        # Prefer a ``data`` attribute when the response object has one;
        # otherwise read the body from the response.
        response_data = response.data if hasattr(response, "data") else response.read()

        # put the response into the cassette
        response = {
            "status": {"code": response.status, "message": response.reason},
            "headers": serialize_headers(response),
            "body": {"string": response_data},
        }
        self.cassette.append(self._vcr_request, response)
        return VCRHTTPResponse(response)

    def set_debuglevel(self, *args, **kwargs):
        self.real_connection.set_debuglevel(*args, **kwargs)

    def connect(self, *args, **kwargs):
        """
        httplib2 uses this. Connects to the server I'm assuming.

        Only pass to the baseclass if we don't have a recorded response
        and are not write-protected.
        """

        if hasattr(self, "_vcr_request") and self.cassette.can_play_response_for(self._vcr_request):
            # We already have a response we are going to play, don't
            # actually connect
            return

        if self.cassette.write_protected:
            # Cassette is write-protected, don't actually connect
            return

        from vcr.patch import force_reset

        # The statement that used to follow this return could never run
        # and has been removed.
        with force_reset():
            return self.real_connection.connect(*args, **kwargs)

    @property
    def sock(self):
        # Prefer the real connection's socket when it has one; otherwise
        # fall back to our own placeholder (None or a VCRFakeSocket).
        if self.real_connection.sock:
            return self.real_connection.sock
        return self._sock

    @sock.setter
    def sock(self, value):
        # Only replaces an existing real socket; otherwise a no-op.
        if self.real_connection.sock:
            self.real_connection.sock = value

    def __init__(self, *args, **kwargs):
        kwargs.pop("strict", None)  # apparently this is gone in py3

        # need to temporarily reset here because the real connection
        # inherits from the thing that we are mocking out.  Take out
        # the reset if you want to see what I mean :)
        from vcr.patch import force_reset

        with force_reset():
            self.real_connection = self._baseclass(*args, **kwargs)

        self._sock = None

    def __setattr__(self, name, value):
        """
        We need to define this because any attributes that are set on the
        VCRConnection need to be propagated to the real connection.

        For example, urllib3 will set certain attributes on the connection,
        such as 'ssl_version'. These attributes need to get set on the real
        connection to have the correct and expected behavior.

        TODO: Separately setting the attribute on the two instances is not
        ideal. We should switch to a proxying implementation.
        """
        try:
            setattr(self.real_connection, name, value)
        except AttributeError:
            # raised if real_connection has not been set yet, such as when
            # we're setting the real_connection itself for the first time
            pass

        super().__setattr__(name, value)

    def __getattr__(self, name):
        """
        Send requests for weird attributes up to the real connection
        (counterpart to __setattr above)

        Raises AttributeError when the real connection has not been set
        yet or does not have the attribute either.
        """
        real_connection = self.__dict__.get("real_connection")
        if real_connection:
            # check in case real_connection has not been set yet, such as when
            # we're setting the real_connection itself for the first time
            return getattr(real_connection, name)

        # object defines no __getattr__ to delegate to, so raise the
        # AttributeError directly and name the attribute that is missing.
        raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}")
# Mirror every staticmethod defined directly on HTTPConnection onto
# VCRConnection, so they can be looked up on the stub class as well.
for k, v in vars(HTTPConnection).items():
    if not isinstance(v, staticmethod):
        continue
    setattr(VCRConnection, k, v)
class VCRHTTPConnection(VCRConnection):
    """Mocked connection class used for plain HTTP requests"""

    # Scheme used when building absolute URIs, and the real connection
    # class that is instantiated behind the stub.
    _protocol = "http"
    _baseclass = HTTPConnection

    # Class-level values copied from the real connection class.
    debuglevel = HTTPConnection.debuglevel
    _http_vsn = HTTPConnection._http_vsn
class VCRHTTPSConnection(VCRConnection):
    """Mocked connection class used for HTTPS requests"""

    # Scheme used when building absolute URIs, and the real connection
    # class that is instantiated behind the stub.
    _protocol = "https"
    _baseclass = HTTPSConnection

    is_verified = True

    # Class-level values copied from the real connection class.
    debuglevel = HTTPSConnection.debuglevel
    _http_vsn = HTTPSConnection._http_vsn