diff --git a/katdal/chunkstore_s3.py b/katdal/chunkstore_s3.py
index adf68614..a99ca981 100644
--- a/katdal/chunkstore_s3.py
+++ b/katdal/chunkstore_s3.py
@@ -20,6 +20,7 @@
 import contextlib
 import copy
 import hashlib
+import http.client
 import json
 import threading
 import time
@@ -30,14 +31,19 @@
 import jwt
 import numpy as np
 import requests
+import urllib3
 try:
     import botocore.auth
     import botocore.credentials
 except ImportError:
     botocore = None
-from urllib3.exceptions import MaxRetryError
-from urllib3.response import HTTPResponse
+try:
+    # XXX A deprecated alias of TimeoutError since Python 3.10
+    from socket import timeout as SocketTimeoutError
+except ImportError:
+    SocketTimeoutError = TimeoutError
+from urllib3.exceptions import MaxRetryError, ReadTimeoutError, ProtocolError, IncompleteRead
 from urllib3.util.retry import Retry
 
 from .chunkstore import (BadChunk, ChunkNotFound, ChunkStore, StoreUnavailable,
@@ -68,11 +74,9 @@
     ]
 }
 
-# Fake HTTP status code associated with reset connections / truncated data
-_TRUNCATED_HTTP_STATUS_CODE = 555
 # These HTTP responses typically indicate temporary S3 server / proxy overload,
 # which will trigger retries terminating in a missing data response if unsuccessful.
-_DEFAULT_SERVER_GLITCHES = (500, 502, 503, 504, _TRUNCATED_HTTP_STATUS_CODE)
+_DEFAULT_SERVER_GLITCHES = (500, 502, 503, 504)
 
 
 class S3ObjectNotFound(ChunkNotFound):
     """S3 chunk store responded with HTTP 404 object not found error."""
 
 
@@ -82,33 +86,50 @@ class S3ObjectNotFound(ChunkNotFound):
 class S3ServerGlitch(ChunkNotFound):
     """S3 chunk store responded with an HTTP error deemed to be temporary."""
 
-    def __init__(self, msg, status_code):
-        super().__init__(msg)
-        self.status_code = status_code
-
-
-class TruncatedRead(ValueError):
-    """HTTP request to S3 chunk store responded with fewer bytes than expected."""
-
 
 class _DetectTruncation:
-    """Raise :exc:`TruncatedRead` if wrapped `readable` runs out of data."""
+    """Raise :exc:`IncompleteRead` if wrapped `readable` runs out of data."""
 
     def __init__(self, readable):
        self._readable = readable
+        if isinstance(readable, http.client.HTTPResponse):
+            # Store the initial length in the httplib case so that we can figure out
+            # bytes_read later (tell() doesn't work because the socket is not seekable).
+            self._content_length = readable.length
+        else:
+            self._content_length = None
 
     def __getattr__(self, name):
         """Proxy all attributes to underlying wrapped object."""
         return getattr(self._readable, name)
 
-    def read(self, size, *args, **kwargs):
+    def _raise_incomplete_read(self, bytes_read, bytes_left):
+        """Figure out more accurate parameters for `IncompleteRead` and raise it."""
+        if isinstance(self._readable, http.client.HTTPResponse):
+            if self._readable.length is not None:
+                bytes_left = self._readable.length
+                bytes_read = self._content_length - bytes_left
+        elif isinstance(self._readable, urllib3.response.HTTPResponse):
+            if self._readable.length_remaining is not None:
+                bytes_left = self._readable.length_remaining
+                bytes_read = self._readable.tell()
+        raise IncompleteRead(bytes_read, bytes_left)
+
+    def read(self, size=None, *args, **kwargs):
         """Overload `read` method to detect truncated data source."""
         data = self._readable.read(size, *args, **kwargs)
-        if data == b'' and size != 0:
-            raise TruncatedRead('Error reading from S3 HTTP response: expected '
-                                f'{size} more byte(s), got EOF')
+        if data == b'' and size is not None and size > 0:
+            self._raise_incomplete_read(0, size)
         return data
 
+    def readinto(self, buffer, *args, **kwargs):
+        """Overload `readinto` method to detect truncated data source."""
+        view = memoryview(buffer)
+        bytes_read = self._readable.readinto(view, *args, **kwargs)
+        if bytes_read != view.nbytes:
+            self._raise_incomplete_read(bytes_read, view.nbytes - bytes_read)
+        return bytes_read
+
 
 def read_array(fp):
     """Read a numpy array in npy format from a file descriptor.
@@ -138,10 +159,7 @@ def read_array(fp):
     # For HTTPResponse it works to just pass in `data` directly, but the
     # wrapping is added for the benefit of any other implementation that
     # isn't expecting a numpy array
-    bytes_read = fp.readinto(memoryview(data.view(np.uint8)))
-    if bytes_read != data.nbytes:
-        raise TruncatedRead('Error reading from S3 HTTP response: '
-                            f'expected {data.nbytes} bytes, got {bytes_read}')
+    fp.readinto(data.view(np.uint8))
     if fortran_order:
         data.shape = shape[::-1]
         data = data.transpose()
@@ -168,6 +186,16 @@ def _read_chunk(response):
     return chunk
 
 
+def _read_object(response):
+    """Read bytes from content of HTTP response and check that it's all there."""
+    data = response.content
+    # Verify that all content has been consumed
+    bytes_left = response.raw.length_remaining
+    if bytes_left is not None and bytes_left > 0:
+        raise IncompleteRead(response.raw.tell(), bytes_left)
+    return data
+
+
 def _bucket_url(url):
     """Turn `url` into associated S3 bucket URL (first path component only)."""
     split_url = urllib.parse.urlsplit(url)
@@ -396,6 +424,66 @@ def len(self):
         return sum(memoryview(item).nbytes for item in self.items)
 
 
+def _raise_for_status(response, chunk_name, ignored_errors):
+    """Turn error responses into appropriate exceptions, like raise_for_status."""
+    status = response.status_code
+    if 400 <= status < 600 and status not in ignored_errors:
+        # Construct error message, including detailed response content if sensible
+        prefix = f'Chunk {chunk_name!r}: ' if chunk_name else ''
+        msg = (f'{prefix}Store responded with HTTP error {status} ({response.reason}) '
+               f'to request: {response.request.method} {response.url}')
+        content_type = response.headers.get('Content-Type')
+        if content_type in ('application/xml', 'text/xml', 'text/plain'):
+            msg += f'\nDetails of server response: {response.text}'
+        # Raise the appropriate exception
+        if status in (401, 403):
+            raise AuthorisationFailed(msg)
+        elif status == 404:
+            raise S3ObjectNotFound(msg)
+        else:
+            raise StoreUnavailable(msg)
+
+
+@contextlib.contextmanager
+def _request(session, method, url, timeout=(None, None), **kwargs):
+    """A beefed-up request that facilitates retries on reading the response.
+
+    This catches socket timeouts and reset connections while the response data
+    is being read and reraises them as appropriate urllib3 exceptions that can
+    be passed to a `Retry` object to trigger a read retry.
+    """
+    try:
+        with session.request(method, url, timeout=timeout, **kwargs) as response:
+            yield response
+    except SocketTimeoutError as error:
+        msg = f'Read timed out - socket idle for {timeout[1]} seconds'
+        raise ReadTimeoutError('Read timeout', url, msg) from error
+    except (
+        # Requests massages ProtocolErrors into ChunkedEncodingErrors, so turn them back
+        requests.exceptions.ChunkedEncodingError,
+        ConnectionResetError,
+        IncompleteRead,
+    ) as error:
+        raise ProtocolError(str(error)) from error
+
+
+def _connect_read_tuple(connect_and_or_read):
+    """Turn connect and/or read retries/timeouts into a (connect, read) tuple."""
+    try:
+        connect, read = connect_and_or_read
+    except TypeError:
+        connect = read = connect_and_or_read
+    return (connect, read)
+
+
+def _retry_object(retries, **defaults):
+    """Turn (connect, read) `retries` into `Retry` object (or keep as is)."""
+    if not isinstance(retries, Retry):
+        connect_retries, read_retries = _connect_read_tuple(retries)
+        retries = Retry(connect=connect_retries, read=read_retries, **defaults)
+    return retries
+
+
 class S3ChunkStore(ChunkStore):
     """A store of chunks (i.e. N-dimensional arrays) based on the Amazon S3 API.
 
@@ -421,9 +509,9 @@ class S3ChunkStore(ChunkStore):
     string type with UTF-8. The URL may also contain a path if this store is
     relative to an existing bucket, in which case the chunk name is a relative
     path (useful for unit tests).
-    timeout : float or tuple of 2 floats, optional
+    timeout : float or None or tuple of 2 floats or None's, optional
         Connect / read timeout, in seconds, either a single value for both or
-        custom values as (connect, read) tuple (set to None to leave unchanged)
+        custom values as (connect, read) tuple. None means "wait forever".
     retries : int or tuple of 2 ints or :class:`urllib3.util.retry.Retry`, optional
         Number of connect / read retries, either a single value for both or
         custom values as (connect, read) tuple, or a `Retry` object for full
@@ -449,35 +537,34 @@
 
     def __init__(self, url, timeout=(30, 300), retries=2, token=None,
                  credentials=None, public_read=False, expiry_days=0, **kwargs):
-        error_map = {requests.exceptions.RequestException: StoreUnavailable}
+        error_map = {
+            # Urllib3 / Requests exceptions raised when read / status retries run out
+            MaxRetryError: S3ServerGlitch,  # too many retries on reading response
+            requests.exceptions.ReadTimeout: S3ServerGlitch,  # read timeouts on header
+            requests.exceptions.RetryError: S3ServerGlitch,  # too many status retries
+            # A generic request error (includes connection failures)
+            requests.exceptions.RequestException: StoreUnavailable,
+        }
         super().__init__(error_map)
         auth = _auth_factory(url, token, credentials)
-        if not isinstance(retries, Retry):
-            try:
-                connect_retries, read_retries = retries
-            except TypeError:
-                connect_retries = read_retries = retries
-            # The backoff factor of 10 provides 5 minutes worth of retries
-            # when the S3 server is strained; with 5 retries you get
-            # (0 + 2 + 4 + 8 + 16) * 10 = 300 seconds on top of read timeouts.
-            retries = Retry(connect=connect_retries, read=read_retries,
-                            status=5, backoff_factor=10.,
-                            status_forcelist=_DEFAULT_SERVER_GLITCHES)
 
         def session_factory():
             session = _CacheSettingsSession(url)
             session.auth = auth
-            # Don't let requests do status retries as we'll be doing it ourselves
-            max_retries = retries.new(status=0, raise_on_status=False)
-            adapter = requests.adapters.HTTPAdapter(max_retries=max_retries)
+            # Don't set `max_retries` yet - it will be done at the request level
+            adapter = requests.adapters.HTTPAdapter()
             session.mount(url, adapter)
             return session
 
         self._session_pool = _Pool(session_factory)
         self._url = to_str(url)
-        self._retries = retries
         self._verified_buckets = set()
-        self.timeout = timeout
+        self.timeout = _connect_read_tuple(timeout)
+        # The backoff factor of 10 provides 5 minutes worth of retries
+        # when the S3 server is strained; with 5 retries you get
+        # (0 + 2 + 4 + 8 + 16) * 10 = 300 seconds on top of read timeouts.
+        self.retries = _retry_object(retries, status=5, backoff_factor=10.,
+                                     status_forcelist=_DEFAULT_SERVER_GLITCHES)
         self.public_read = public_read
         self.expiry_days = int(expiry_days)
 
@@ -485,93 +572,39 @@ def _chunk_url(self, chunk_name, extension='.npy'):
         """Assemble URL corresponding to chunk (or array) name."""
         return urllib.parse.urljoin(self._url, to_str(urllib.parse.quote(chunk_name + extension)))
 
-    @contextlib.contextmanager
-    def request(self, method, url, chunk_name='', ignored_errors=(), timeout=(), **kwargs):
-        """Run a request on a session from the pool and handle error responses.
+    def request(
+        self,
+        method,
+        url,
+        process=lambda response: response,
+        chunk_name='',
+        ignored_errors=(),
+        timeout=(),  # None has a special meaning, so use something else to indicate default
+        retries=None,
+        **kwargs
+    ):
+        """Send HTTP request to S3 server, process response and retry if needed.
 
-        This is a context manager like :meth:`requests.Session.request` that
-        either yields a successful response or raises the appropriate chunk
-        store exception.
+        This retries temporary HTTP errors, including reset connections while
+        processing a successful response.
 
         Parameters
         ----------
         method, url : str
             The standard required parameters of :meth:`requests.Session.request`
+        process : function, signature ``result = process(response)``, optional
+            Function that will process response (just return response by default)
         chunk_name : str, optional
             Name of chunk, used for error reporting only
         ignored_errors : collection of int, optional
             HTTP status codes that are treated like 200 OK, not raising an error
-        timeout : tuple or float, optional
+        timeout : float or None or tuple of 2 floats or None's, optional
             Override timeout for this request (use the store timeout by default)
+        retries : int or tuple of 2 ints or :class:`urllib3.util.retry.Retry`, optional
+            Override retries for this request (use the store retries by default)
         kwargs : optional
             These are passed on to :meth:`requests.Session.request`
 
-        Yields
-        ------
-        response : `requests.Response`
-            HTTP response, considered successful
-
-        Raises
-        ------
-        AuthorisationFailed
-            If the request is not authorised by appropriate token or credentials
-        S3ObjectNotFound
-            If S3 object request fails because it does not exist
-        S3ServerGlitch
-            If HTTP error has a retryable status code (i.e. temporary glitch)
-        StoreUnavailable
-            If a general HTTP error occurred that is not ignored
-        """
-        kwargs['timeout'] = self.timeout if timeout == () else timeout
-        # Use _standard_errors to filter errors emanating from within with-block
-        with self._standard_errors(chunk_name), self._session_pool() as session:
-            with session.request(method, url, **kwargs) as response:
-                # Turn error responses into the appropriate exception, like raise_for_status
-                status = response.status_code
-                if 400 <= status < 600 and status not in ignored_errors:
-                    # Construct error message, including detailed response content if sensible
-                    prefix = f'Chunk {chunk_name!r}: ' if chunk_name else ''
-                    msg = (f'{prefix}Store responded with HTTP error {status} ({response.reason}) '
-                           f'to request: {response.request.method} {response.url}')
-                    content_type = response.headers.get('Content-Type')
-                    if content_type in ('application/xml', 'text/xml', 'text/plain'):
-                        msg += f'\nDetails of server response: {response.text}'
-                    # Raise the appropriate exception
-                    if status in (401, 403):
-                        raise AuthorisationFailed(msg)
-                    elif status == 404:
-                        raise S3ObjectNotFound(msg)
-                    elif self._retries.is_retry(method, status):
-                        raise S3ServerGlitch(msg, status)
-                    else:
-                        raise StoreUnavailable(msg)
-            try:
-                yield response
-            except TruncatedRead as trunc_error:
-                # A truncated read is considered a glitch with custom status
-                prefix = f'Chunk {chunk_name!r}: ' if chunk_name else ''
-                glitch_error = S3ServerGlitch(prefix + str(trunc_error),
-                                              _TRUNCATED_HTTP_STATUS_CODE)
-                raise glitch_error from trunc_error
-
-    def complete_request(self, method, url, chunk_name='',
-                         process=lambda response: None, **kwargs):
-        """Send HTTP request to S3 server, process response and retry if needed.
-
-        This retries temporary HTTP errors, including reset connections while
-        processing a successful response.
-
-        Parameters
-        ----------
-        method, url : str
-            The standard required parameters of :meth:`requests.Session.request`
-        chunk_name : str, optional
-            Name of chunk, used for error reporting only
-        process : function, signature ``result = process(response)``, optional
-            Function that will process response (the default does nothing)
-        kwargs : optional
-            Passed on to :meth:`request` and :meth:`requests.Session.request`
-
         Returns
         -------
         result : object
@@ -588,23 +621,24 @@ def complete_request(self, method, url, chunk_name='',
         StoreUnavailable
             If a general HTTP error occurred that is not ignored
         """
-        retries = self._retries.new()
-        while True:
-            try:
-                with self.request(method, url, chunk_name, **kwargs) as response:
-                    result = process(response)
-            except S3ServerGlitch as e:
-                # Retry based on status of response until we run out of retries
-                response = HTTPResponse(status=e.status_code)
+        timeout = self.timeout if timeout == () else _connect_read_tuple(timeout)
+        retries = self.retries if retries is None else _retry_object(retries)
+        retries = retries.new()
+        # Use _standard_errors to filter errors emanating from within with-block
+        with self._standard_errors(chunk_name), self._session_pool() as session:
+            adapter = session.get_adapter(url)
+            while True:
+                # Initialise and reuse the same Retry object for the entire session
+                adapter.max_retries = retries
                 try:
-                    retries = retries.increment(method, url, response)
-                except MaxRetryError:
-                    # Raise the final straw that broke the retry camel's back
-                    raise e from None
-                else:
-                    retries.sleep(response)
-            else:
-                return result
+                    with _request(session, method, url, timeout, **kwargs) as response:
+                        _raise_for_status(response, chunk_name, ignored_errors)
+                        retries = response.raw.retries.new()
+                        return process(response)
+                # Urllib3 exceptions that can trigger read retries
+                except (ReadTimeoutError, ProtocolError) as error:
+                    retries = retries.increment(method, url, error=error)
+                    retries.sleep()
 
     def _verify_bucket(self, url, chunk_error=None):
         """Check that bucket associated with `url` exists and is not empty."""
@@ -613,8 +647,7 @@ def _verify_bucket(self, url, chunk_error=None):
             return
         try:
             # Speed up the request by only checking that the bucket has at least one key
-            response = self.complete_request('GET', bucket, process=lambda r: r,
-                                             params={'max-keys': 1})
+            response = self.request('GET', bucket, params={'max-keys': 1})
         except S3ObjectNotFound as err:
             # There is no point continuing if the bucket is completely missing
             raise StoreUnavailable(err) from chunk_error
@@ -634,8 +667,8 @@ def get_chunk(self, array_name, slices, dtype):
         # work with non-identity encodings.
         headers = {'Accept-Encoding': 'identity'}
         try:
-            chunk = self.complete_request('GET', url, chunk_name, _read_chunk,
-                                          headers=headers, stream=True)
+            chunk = self.request('GET', url, _read_chunk, chunk_name=chunk_name,
+                                 headers=headers, stream=True)
         except S3ObjectNotFound as err:
             # If the entire bucket is gone, this becomes StoreUnavailable instead
             self._verify_bucket(url, err)
@@ -655,7 +688,7 @@ def create_array(self, array_name):
 
     def _create_bucket(self, url):
         # Make bucket (409 indicates the bucket already exists, which is OK)
-        self.complete_request('PUT', url, ignored_errors=(409,))
+        self.request('PUT', url, ignored_errors=(409,))
 
         if self.public_read:
             policy_url = urllib.parse.urljoin(url, '?policy')
@@ -665,14 +698,14 @@
                 f'arn:aws:s3:::{bucket_name}/*',
                 f'arn:aws:s3:::{bucket_name}'
             ]
-            self.complete_request('PUT', policy_url, data=json.dumps(policy))
+            self.request('PUT', policy_url, data=json.dumps(policy))
 
         if self.expiry_days > 0:
             xml_payload = _BASE_LIFECYCLE_POLICY.format(self.expiry_days)
             b64_md5 = base64.b64encode(hashlib.md5(xml_payload.encode('utf-8')).digest()).decode('utf-8')
             lifecycle_headers = {'Content-Type': 'text/xml', 'Content-MD5': b64_md5}
-            self.complete_request('PUT', url, params='lifecycle',
-                                  data=xml_payload, headers=lifecycle_headers)
+            self.request('PUT', url, params='lifecycle',
+                         data=xml_payload, headers=lifecycle_headers)
 
     def put_chunk(self, array_name, slices, chunk):
         """See the docstring of :meth:`ChunkStore.put_chunk`."""
@@ -686,21 +719,21 @@ def put_chunk(self, array_name, slices, chunk):
         md5 = base64.b64encode(md5_gen.digest())
         headers = {'Content-MD5': md5.decode()}
         data = _Multipart([npy_header, memoryview(chunk)])
-        self.complete_request('PUT', url, chunk_name, headers=headers, data=data)
+        self.request('PUT', url, chunk_name=chunk_name, headers=headers, data=data)
 
     def mark_complete(self, array_name):
         """See the docstring of :meth:`ChunkStore.mark_complete`."""
         self.create_array(array_name)
         obj_name = self.join(array_name, 'complete')
         url = urllib.parse.urljoin(self._url, obj_name)
-        self.complete_request('PUT', url, obj_name, data=b'')
+        self.request('PUT', url, chunk_name=obj_name, data=b'')
 
     def is_complete(self, array_name):
         """See the docstring of :meth:`ChunkStore.is_complete`."""
         obj_name = self.join(array_name, 'complete')
         url = urllib.parse.urljoin(self._url, obj_name)
         try:
-            self.complete_request('GET', url, obj_name)
+            self.request('GET', url, chunk_name=obj_name)
         except ChunkNotFound:
             return False
         return True
diff --git a/katdal/datasources.py b/katdal/datasources.py
index 0c7fc942..9fa9f541 100644
--- a/katdal/datasources.py
+++ b/katdal/datasources.py
@@ -26,7 +26,7 @@
 
 from .chunkstore import ChunkStoreError
 from .chunkstore_npy import NpyFileChunkStore
-from .chunkstore_s3 import S3ChunkStore
+from .chunkstore_s3 import S3ChunkStore, _read_object
 from .dataset import parse_url_or_path
 from .sensordata import TelstateSensorGetter, TelstateToStr
 from .vis_flags_weights import ChunkStoreVisFlagsWeights
@@ -461,8 +461,8 @@ def from_url(cls, url, chunk_store='auto', **kwargs):
         telstate = katsdptelstate.TelescopeState()
         try:
             rdb_store = S3ChunkStore(store_url, **kwargs)
-            with rdb_store.request('GET', rdb_url) as response:
-                telstate.load_from_file(io.BytesIO(response.content))
+            rdb_data = rdb_store.request('GET', rdb_url, process=_read_object)
+            telstate.load_from_file(io.BytesIO(rdb_data))
         except ChunkStoreError as e:
             raise DataSourceNotFound(str(e)) from e
         # If the RDB file is opened via archive URL, use that URL and
diff --git a/katdal/test/test_chunkstore_s3.py b/katdal/test/test_chunkstore_s3.py
index 0897da60..44687d7d 100644
--- a/katdal/test/test_chunkstore_s3.py
+++ b/katdal/test/test_chunkstore_s3.py
@@ -35,6 +35,7 @@
 import re
 import shutil
 import socket
+import struct
 import sys
 import tempfile
 import threading
@@ -49,13 +50,22 @@
 from katsdptelstate.rdb_writer import RDBWriter
 import pytest
 from numpy.testing import assert_array_equal
+from urllib3.exceptions import IncompleteRead
 from urllib3.util.retry import Retry
 
 from katdal.chunkstore import ChunkNotFound, StoreUnavailable
-from katdal.chunkstore_s3 import (_DEFAULT_SERVER_GLITCHES, InvalidToken,
-                                  S3ChunkStore, TruncatedRead, _AWSAuth,
-                                  AuthorisationFailed, decode_jwt, read_array)
-from katdal.datasources import TelstateDataSource
+from katdal.chunkstore_s3 import (
+    _DEFAULT_SERVER_GLITCHES,
+    InvalidToken,
+    S3ChunkStore,
+    S3ServerGlitch,
+    _AWSAuth,
+    AuthorisationFailed,
+    decode_jwt,
+    read_array,
+    _read_object,
+)
+from katdal.datasources import TelstateDataSource, DataSourceNotFound
 from katdal.test.s3_utils import MissingProgram, S3Server, S3User
 from katdal.test.test_chunkstore import ChunkStoreTestBase, generate_arrays
 from katdal.test.test_datasources import (assert_telstate_data_source_equal,
@@ -68,11 +78,11 @@
 # Pick quick but different timeouts and retries for unit tests:
 # - The effective connect timeout is 5.0 (initial) + 5.0 (1 retry) = 10 seconds
 # - The effective read timeout is 2.0 + 3 * 2.0 + 0.1 * (0 + 2 + 4) = 8.6 seconds
-# - The effective status timeout is 0.1 * (0 + 2 + 4) = 0.6 seconds, or
-#   4 * 0.1 + 0.6 = 1.0 second if the suggestions use SUGGESTED_STATUS_DELAY
+# - The effective status timeout is 0.1 * (0 + 2) = 0.2 seconds, or
+#   3 * 0.1 + 0.2 = 0.5 second if the suggestions use SUGGESTED_STATUS_DELAY
 TIMEOUT = (5.0, 2.0)
-RETRY = Retry(connect=1, read=3, status=3, backoff_factor=0.1,
-              raise_on_status=False, status_forcelist=_DEFAULT_SERVER_GLITCHES)
+RETRY = Retry(connect=1, read=3, status=2, backoff_factor=0.1,
+              status_forcelist=_DEFAULT_SERVER_GLITCHES)
 SUGGESTED_STATUS_DELAY = 0.1
 READ_PAUSE = 0.1
 # Dummy private key for ES256 algorithm (taken from PyJWT unit tests)
@@ -103,9 +113,9 @@ def get_free_port(host):
 
 
 class TestReadArray:
-    def _test(self, array):
+    def _test(self, array, version=None):
         fp = io.BytesIO()
-        np.save(fp, array)
+        np.lib.format.write_array(fp, array, version, allow_pickle=False)
         fp.seek(0)
         out = read_array(fp)
         np.testing.assert_equal(array, out)
@@ -122,12 +132,7 @@ def testFortran(self):
         self._test(np.arange(20).reshape(4, 5, 1).T)
 
     def testV2(self):
-        # Make dtype that needs more than 64K to store, forcing .npy version 2.0
-        dtype = np.dtype([('a' * 70000, np.float32), ('b', np.float32)])
-        with warnings.catch_warnings():
-            # Suppress warning that V2 files can only be read by numpy >= 1.9
-            warnings.simplefilter('ignore', category=UserWarning)
-            self._test(np.zeros(100, dtype))
+        self._test(np.zeros(100), version=(2, 0))
 
     def testBadVersion(self):
         data = b'\x93NUMPY\x03\x04'  # Version 3.4
@@ -149,7 +154,7 @@ def _truncate_and_fail_to_read(self, *args):
         fp.seek(*args)
         fp.truncate()
         fp.seek(0)
-        with pytest.raises(TruncatedRead):
+        with pytest.raises(IncompleteRead):
             read_array(fp)
 
     def testShort(self):
@@ -273,6 +278,8 @@ def setup_method(self):
                 shutil.rmtree(entry.path)
         # Also get rid of the cache of verified buckets
         self.store._verified_buckets.clear()
+        self.store.timeout = TIMEOUT
+        self.store.retries = RETRY
         print(f"Chunk store: {self.store_url}, S3 server: {self.s3_url}")
 
     def array_name(self, name):
@@ -326,7 +333,7 @@ def test_token_without_https(self):
 
     def test_mark_complete_top_level(self):
         self._test_mark_complete(PREFIX + '-completetest')
 
-    def test_rdb_support(self):
+    def test_rdb_support(self, suggestion=''):
         telstate = katsdptelstate.TelescopeState()
         view, cbid, sn, _, _ = make_fake_data_source(telstate, self.store, (5, 16, 40), PREFIX)
         telstate['capture_block_id'] = cbid
@@ -339,9 +346,13 @@
         # Read the file back in and upload it to S3
         with open(temp_filename, mode='rb') as rdb_file:
             rdb_data = rdb_file.read()
-        rdb_url = urllib.parse.urljoin(self.store_url, self.store.join(cbid, rdb_filename))
+        if suggestion:
+            rdb_path = self.store.join(suggestion, cbid, rdb_filename)
+        else:
+            rdb_path = self.store.join(cbid, rdb_filename)
+        rdb_url = urllib.parse.urljoin(self.store_url, rdb_path)
         self.store.create_array(cbid)
-        self.store.complete_request('PUT', rdb_url, data=rdb_data)
+        self.store.request('PUT', rdb_url, data=rdb_data)
         # Check that data source can be constructed from URL (with auto chunk store)
         source_from_url = TelstateDataSource.from_url(rdb_url, **self.store_kwargs)
         source_direct = TelstateDataSource(view, cbid, sn, self.store)
@@ -374,6 +385,17 @@ def __getattr__(self, name):
             return self.do_all
         return self.__getattribute__(name)
 
+    def _reset_connection(self):
+        """Send a TCP reset (RST) packet to reset the connection.
+
+        Enable SO_LINGER with a linger interval of 0 s. This drops the
+        connection like a hot potato once closed.
+        """
+        l_onoff = 1  # non-zero value enables linger option in kernel
+        l_linger = 0  # timeout interval in seconds
+        self.connection.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+                                   struct.pack('ii', l_onoff, l_linger))
+
     def do_all(self):
         # See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection
         HOP_HEADERS = {
@@ -385,6 +407,7 @@ def do_all(self):
         data = self.rfile.read(data_len)
         truncate = False
         pause = 0.0
+        reset = False
         glitch_location = 0
 
         # Extract a proxy suggestion prepended to the path
@@ -409,12 +432,16 @@
             self.send_response(status_code, 'Suggested by unit test')
             self.end_headers()
             return
+        if command == 'reset-connection':
+            self._reset_connection()
+            return
         # Truncate or pause transmission of the payload after specified bytes
-        glitch = re.match(r'^(truncate|pause)-read-after-(\d+)-bytes$', command)
+        glitch = re.match(r'^(truncate|pause|reset)-read-after-(\d+)-bytes$', command)
         if glitch:
             flavour = glitch.group(1)
             truncate = (flavour == 'truncate')
             pause = READ_PAUSE if flavour == 'pause' else 0.0
+            reset = (flavour == 'reset')
             glitch_location = int(glitch.group(2))
         else:
             raise ValueError(f"Unknown command '{command}' "
@@ -474,14 +501,19 @@
         # might stop listening if it knows nothing more is coming, like in a PUT response).
         if len(content) == 0:
             return
-        if pause:
-            self.wfile.write(content[:glitch_location])
+        # Write the first half of the data
+        self.wfile.write(content[:glitch_location])
+        if truncate:
+            return
+        elif reset:
+            self._reset_connection()
+            return
+        elif pause:
             # The wfile object should be an unbuffered _SocketWriter but flush anyway
             self.wfile.flush()
             time.sleep(pause)
-            self.wfile.write(content[glitch_location:])
-        else:
-            self.wfile.write(content[:glitch_location] if truncate else content)
+        # Write the rest of the data
+        self.wfile.write(content[glitch_location:])
 
     def log_message(self, format, *args):
         # Get time offset from first of these requests (useful for debugging)
@@ -547,7 +579,8 @@ def prepare_store_args(cls, url, **kwargs):
         elif url != cls.httpd.target:
             raise RuntimeError('Cannot use multiple target URLs with http proxy')
         # The token authorises the standard bucket and anything starting with PREFIX
-        token = encode_jwt({'prefix': [BUCKET, PREFIX]})
+        # (as well as suggestions prepended to the path)
+        token = encode_jwt({'prefix': [BUCKET, PREFIX, 'please']})
         kwargs.setdefault('token', token)
         return super().prepare_store_args(cls.proxy_url, credentials=None, **kwargs)
@@ -570,68 +603,160 @@ def _put_chunk(self, suggestion):
         self.store.put_chunk(array_name, slices, chunk)
         return chunk, slices, self.store.join(array_name, suggestion)
 
-    @pytest.mark.expected_duration(0.9)
+    # 50x STATUSES
+    #
+    # With the RETRY settings of 2 status retries, backoff factor of 0.1 s
+    # and SUGGESTED_STATUS_DELAY of 0.1 s we get the following timeline
+    # (indexed by seconds):
+    #   0.0 - access chunk for the first time
+    #   0.1 - response is 500, immediately try again (retry #1)
+    #   0.2 - response is 500, back off for 2 * 0.1 seconds
+    #   0.4 - retry #2 (the final attempt) - server should now be fixed
+    #   0.4 - success!
+
+    @pytest.mark.expected_duration(0.1)
+    def test_server_error(self):
+        suggestion = 'please-respond-with-500-for-0.2-seconds'
+        chunk, slices, array_name = self._put_chunk(suggestion)
+        chunk_name, _ = self.store.chunk_metadata(array_name, slices, dtype=chunk.dtype)
+        url = self.store._chunk_url(chunk_name)
+        response = self.store.request('GET', url, ignored_errors=(500,), retries=0, stream=True)
+        assert response.status_code == 500
+
+    @pytest.mark.expected_duration(0.4)
     def test_recover_from_server_errors(self):
-        chunk, slices, array_name = self._put_chunk(
-            'please-respond-with-500-for-0.8-seconds')
-        # With the RETRY settings of 3 status retries, backoff factor of 0.1 s
-        # and SUGGESTED_STATUS_DELAY of 0.1 s we get the following timeline
-        # (indexed by seconds):
-        #   0.0 - access chunk for the first time
-        #   0.1 - response is 500, immediately try again (retry #1)
-        #   0.2 - response is 500, back off for 2 * 0.1 seconds
-        #   0.4 - retry #2
-        #   0.5 - response is 500, back off for 4 * 0.1 seconds
-        #   0.9 - retry #3 (the final attempt) - server should now be fixed
-        #   0.9 - success!
-        self.store.get_chunk(array_name, slices, chunk.dtype)
-
-    @pytest.mark.expected_duration(1.0)
+        suggestion = 'please-respond-with-500-for-0.3-seconds'
+        chunk, slices, array_name = self._put_chunk(suggestion)
+        chunk_retrieved = self.store.get_chunk(array_name, slices, chunk.dtype)
+        assert_array_equal(chunk_retrieved, chunk, 'Bad chunk after server error')
+
+    @pytest.mark.expected_duration(0.5)
     def test_persistent_server_errors(self):
-        chunk, slices, array_name = self._put_chunk(
-            'please-respond-with-502-for-1.2-seconds')
-        # After 0.9 seconds the client gives up and returns with failure 0.1 s later
+        suggestion = 'please-respond-with-502-for-0.7-seconds'
+        chunk, slices, array_name = self._put_chunk(suggestion)
+        # After 0.4 seconds the client gives up and returns with failure 0.1 s later
         with pytest.raises(ChunkNotFound):
             self.store.get_chunk(array_name, slices, chunk.dtype)
 
-    @pytest.mark.expected_duration(0.6)
-    def test_recover_from_read_truncated_within_npy_header(self):
-        chunk, slices, array_name = self._put_chunk(
-            'please-truncate-read-after-60-bytes-for-0.4-seconds')
-        # With the RETRY settings of 3 status retries and backoff factor of 0.1 s
-        # we get the following timeline (indexed by seconds):
-        #   0.0 - access chunk for the first time
-        #   0.0 - response is 200 but truncated, immediately try again (retry #1)
-        #   0.0 - response is 200 but truncated, back off for 2 * 0.1 seconds
-        #   0.2 - retry #2, response is 200 but truncated, back off for 4 * 0.1 seconds
-        #   0.6 - retry #3 (the final attempt) - server should now be fixed
-        #   0.6 - success!
-        chunk_retrieved = self.store.get_chunk(array_name, slices, chunk.dtype)
-        assert_array_equal(chunk_retrieved, chunk, 'Truncated read not recovered')
+    # TRUNCATED READS
+    #
+    # With the RETRY settings of 3 read retries and backoff factor of 0.1 s
+    # we get the following timeline (indexed by seconds):
+    #   0.0 - access chunk for the first time
+    #   0.0 - response is 200 but truncated, immediately try again (retry #1)
+    #   0.0 - response is 200 but truncated, back off for 2 * 0.1 seconds
+    #   0.2 - retry #2, response is 200 but truncated, back off for 4 * 0.1 seconds
+    #   0.6 - retry #3 (the final attempt) - server should now be fixed
+    #   0.6 - success!
+
+    # The NPY file has a 128-byte header, followed by the array data itself.
+    # Check both parts, since they are read somewhat differently (read vs readinto).
+    @pytest.mark.parametrize('nbytes', [60, 129])
+    @pytest.mark.expected_duration(0.0)
+    def test_truncated_read(self, nbytes):
+        suggestion = f'please-truncate-read-after-{nbytes}-bytes-for-0.1-seconds'
+        chunk, slices, array_name = self._put_chunk(suggestion)
+        self.store.retries = Retry(0)
+        with pytest.raises(S3ServerGlitch) as excinfo:
+            self.store.get_chunk(array_name, slices, chunk.dtype)
+        bytes_left = 128 + chunk.nbytes - nbytes
+        excinfo.match(fr'IncompleteRead\({nbytes} bytes read, {bytes_left} more expected\)')
+
+    @pytest.mark.parametrize('nbytes', [60, 129])
     @pytest.mark.expected_duration(0.6)
-    def test_recover_from_read_truncated_within_npy_array(self):
-        chunk, slices, array_name = self._put_chunk(
-            'please-truncate-read-after-129-bytes-for-0.4-seconds')
+    def test_recover_from_truncated_read(self, nbytes):
+        suggestion = f'please-truncate-read-after-{nbytes}-bytes-for-0.4-seconds'
+        chunk, slices, array_name = self._put_chunk(suggestion)
         chunk_retrieved = self.store.get_chunk(array_name, slices, chunk.dtype)
-        assert_array_equal(chunk_retrieved, chunk, 'Truncated read not recovered')
+        assert_array_equal(chunk_retrieved, chunk, 'Bad chunk after truncated read')
 
     @pytest.mark.expected_duration(0.6)
     def test_persistent_truncated_reads(self):
-        chunk, slices, array_name = self._put_chunk(
-            'please-truncate-read-after-60-bytes-for-0.8-seconds')
+        suggestion = 'please-truncate-read-after-60-bytes-for-0.8-seconds'
+        chunk, slices, array_name = self._put_chunk(suggestion)
         # After 0.6 seconds the client gives up
         with pytest.raises(ChunkNotFound):
             self.store.get_chunk(array_name, slices, chunk.dtype)
 
-    @pytest.mark.expected_duration(READ_PAUSE)
-    def test_handle_read_paused_within_npy_header(self):
-        chunk, slices, array_name = self._put_chunk('please-pause-read-after-60-bytes')
+    def test_rdb_support_recover_from_truncated_reads(self):
+        super().test_rdb_support('please-truncate-read-after-1000-bytes-for-0.4-seconds')
+
+    def test_rdb_support_persistent_truncated_reads(self):
+        with pytest.raises(DataSourceNotFound) as excinfo:
+            super().test_rdb_support('please-truncate-read-after-1000-bytes-for-0.8-seconds')
+        excinfo.match(r'IncompleteRead\(1000 bytes read')
+
+    @pytest.mark.expected_duration(0.6)
+    def test_recover_from_reset_connections(self):
+        suggestion = 'please-reset-read-after-129-bytes-for-0.4-seconds'
+        chunk, slices, array_name = self._put_chunk(suggestion)
         chunk_retrieved = self.store.get_chunk(array_name, slices, chunk.dtype)
-        assert_array_equal(chunk_retrieved, chunk, 'Paused read failed')
+        assert_array_equal(chunk_retrieved, chunk, 'Bad chunk after reset connection')
+
+    @pytest.mark.expected_duration(0.6)
+    def test_persistent_reset_connections(self):
+        suggestion = 'please-reset-read-after-129-bytes-for-0.8-seconds'
+        chunk, slices, array_name = self._put_chunk(suggestion)
+        with pytest.raises(ChunkNotFound) as excinfo:
+            self.store.get_chunk(array_name, slices, chunk.dtype)
+        assert isinstance(excinfo.value, S3ServerGlitch)
 
+    @pytest.mark.expected_duration(0.6)
+    def test_persistent_early_reset_connections(self):
+        suggestion = 'please-reset-connection-for-0.8-seconds'
+        chunk, slices, array_name = self._put_chunk(suggestion)
+        with pytest.raises(StoreUnavailable):
+            self.store.get_chunk(array_name, slices, chunk.dtype)
+
+    @pytest.mark.parametrize('nbytes', [60, 129])  # check both NPY header and array itself
     @pytest.mark.expected_duration(READ_PAUSE)
-    def test_handle_read_paused_within_npy_array(self):
-        chunk, slices, array_name = self._put_chunk('please-pause-read-after-129-bytes')
+    def test_handle_paused_read(self, nbytes):
+        suggestion = f'please-pause-read-after-{nbytes}-bytes'
+        chunk, slices, array_name = self._put_chunk(suggestion)
+        chunk_retrieved = self.store.get_chunk(array_name, slices, chunk.dtype)
+        assert_array_equal(chunk_retrieved, chunk, 'Bad chunk after paused read')
+
+    # SOCKET TIMEOUTS
+    #
+    # With a read timeout of 0.09 seconds and the RETRY settings of 3 read retries and
+    # backoff factor of 0.1 s, we get the following timeline (indexed by seconds):
+    #   0.00 - access chunk for the first time
+    #   0.09 - response is 200 but socket times out after 0.09 seconds, immediately try again
+    #   0.18 - retry #1, response is 200 but socket stalls, back off for 2 * 0.1 seconds
+    #   0.47 - retry #2, response is 200 but socket stalls, back off for 4 * 0.1 seconds
+    #   0.87 - retry #3 (the final attempt) - server should now be fixed
+    #   0.87 - success!
+
+    @pytest.mark.parametrize('nbytes', [60, 129])  # check both NPY header and array itself
+    @pytest.mark.expected_duration(0.87)
+    def test_recover_from_socket_timeout(self, nbytes):
+        suggestion = f'please-pause-read-after-{nbytes}-bytes-for-0.8-seconds'
+        chunk, slices, array_name = self._put_chunk(suggestion)
+        self.store.timeout = (5.0, 0.09)
         chunk_retrieved = self.store.get_chunk(array_name, slices, chunk.dtype)
-        assert_array_equal(chunk_retrieved, chunk, 'Paused read failed')
+        assert_array_equal(chunk_retrieved, chunk, 'Bad chunk after socket timeout')
+
+    @pytest.mark.expected_duration(0.96)
+    def test_persistent_socket_timeouts(self):
+        suggestion = 'please-pause-read-after-129-bytes-for-1.0-seconds'
+        chunk, slices, array_name = self._put_chunk(suggestion)
+        self.store.timeout = (5.0, 0.09)
+        # The final retry starts at 0.87 seconds and the client gives up 0.09 seconds later
+        with pytest.raises(ChunkNotFound):
+            self.store.get_chunk(array_name, slices, chunk.dtype)
+
+    # XXX Check whether the 'pause' condition (socket timeouts) can be tested
+    # with _read_object, which preloads the content due to stream=False.
+    @pytest.mark.parametrize('condition', ['truncate', 'reset'])
+    @pytest.mark.expected_duration(0.6)
+    def test_non_streaming_request_recovery_from_glitches(self, condition):
+        data = b'x' * 1000
+        cbid = PREFIX
+        path = self.store.join(cbid, f'test_recovery_from_{condition}.bin')
+        url = urllib.parse.urljoin(self.store_url, path)
+        self.store.create_array(cbid)
+        self.store.request('PUT', url, data=data)
+        suggestion = f'please-{condition}-read-after-400-bytes-for-0.52-seconds'
+        url = urllib.parse.urljoin(self.store_url, self.store.join(suggestion, path))
+        retrieved_data = self.store.request('GET', url, process=_read_object)
+        assert retrieved_data == data
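A minimal usage sketch of the reworked retry machinery introduced by this diff, i.e. the single-call request() method that replaces the old request() context manager and complete_request(). The endpoint URL, bucket and object names below are hypothetical and for illustration only.

    from urllib3.util.retry import Retry

    from katdal.chunkstore_s3 import S3ChunkStore, _read_object

    # Hypothetical S3-compatible endpoint; timeout is a (connect, read) tuple
    # where None means "wait forever", and retries=2 applies to connect and read
    store = S3ChunkStore('http://archive.example.com/', timeout=(5, 30), retries=2)

    # Fetch a small object in one shot. _read_object checks that the entire
    # Content-Length was consumed and raises IncompleteRead otherwise, which
    # request() reraises as a urllib3 ProtocolError and turns into a read retry.
    data = store.request('GET', 'http://archive.example.com/bucket/object.bin',
                         process=_read_object)

    # Per-request overrides: retries may be an int, a (connect, read) tuple or
    # a full Retry object, and timeout may be overridden the same way.
    response = store.request('GET', 'http://archive.example.com/bucket/object.bin',
                             retries=Retry(connect=1, read=5, backoff_factor=1.0),
                             timeout=(10, None), stream=True)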