diff --git a/README.md b/README.md index 019cbd88..b718e9ec 100644 --- a/README.md +++ b/README.md @@ -249,3 +249,4 @@ curl -H "x-retry-test-id: 1d05c20627844214a9ff7cbcf696317d" "http://localhost:91 | return-broken-stream | [HTTP] Testbench will fail after a few downloaded bytes
[GRPC] Testbench will fail with `UNAVAILABLE` after a few downloaded bytes | return-broken-stream-after-YK | [HTTP] Testbench will fail after YKiB of downloaded data
[GRPC] Testbench will fail with `UNAVAILABLE` after YKiB of downloaded data | return-reset-connection | [HTTP] Testbench will fail with a reset connection
[GRPC] Testbench will fail the RPC with `UNAVAILABLE` +| stall-for-Ts-after-YK | [HTTP] Testbench will stall for T second after reading YKiB of downloaded data, e.g. stall-for-10s-after-12K stalls after reading 12KiB of data
[GRPC] Not supported \ No newline at end of file diff --git a/testbench/common.py b/testbench/common.py index b19dfa57..cc080479 100644 --- a/testbench/common.py +++ b/testbench/common.py @@ -23,6 +23,7 @@ import re import socket import struct +import time import types from functools import wraps @@ -46,6 +47,8 @@ retry_return_broken_stream_after_bytes = re.compile( r"return-broken-stream-after-([0-9]+)K$" ) +retry_stall_after_bytes = re.compile(r"stall-for-([0-9]+)s-after-([0-9]+)K$") + content_range_split = re.compile(r"bytes (\*|[0-9]+-[0-9]+|[0-9]+-\*)\/(\*|[0-9]+)") # === STR === # @@ -718,6 +721,29 @@ def streamer(): return response_handler +def __get_stream_and_stall_fn( + database, method, test_id, limit=4, stall_time_sec=10, chunk_size=4 +): + def response_handler(data): + def streamer(): + d = _extract_data(data) + bytes_yield = 0 + instruction_dequed = False + for r in range(0, len(d), chunk_size): + if bytes_yield >= limit and not instruction_dequed: + time.sleep(stall_time_sec) + database.dequeue_next_instruction(test_id, method) + instruction_dequed = True + chunk_end = min(r + chunk_size, len(d)) + chunk_downloaded = chunk_end - r + bytes_yield += chunk_downloaded + yield d[r:chunk_end] + + return flask.Response(streamer(), headers=_extract_headers(data)) + + return response_handler + + def __get_default_response_fn(data): return data @@ -818,6 +844,18 @@ def handle_retry_test_instruction(database, request, socket_closer, method): broken_stream_after_bytes = ( testbench.common.retry_return_broken_stream_after_bytes.match(next_instruction) ) + + retry_stall_after_bytes_matches = testbench.common.retry_stall_after_bytes.match( + next_instruction + ) + if retry_stall_after_bytes_matches: + items = list(retry_stall_after_bytes_matches.groups()) + stall_time = int(items[0]) + after_bytes = int(items[1]) * 1024 + return __get_stream_and_stall_fn( + database, method, test_id, limit=after_bytes, stall_time_sec=stall_time + ) + if broken_stream_after_bytes and method == "storage.objects.get": items = list(broken_stream_after_bytes.groups()) after_bytes = int(items[0]) * 1024 diff --git a/testbench/database.py b/testbench/database.py index 368e0692..f17260dd 100644 --- a/testbench/database.py +++ b/testbench/database.py @@ -438,6 +438,7 @@ def __validate_injected_failure_description(self, failure): testbench.common.retry_return_error_after_bytes, testbench.common.retry_return_short_response, testbench.common.retry_return_broken_stream_after_bytes, + testbench.common.retry_stall_after_bytes, ]: if expr.match(failure) is not None: return diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py index 28dcbb84..a5b5f8a1 100644 --- a/tests/test_testbench_retry.py +++ b/tests/test_testbench_retry.py @@ -19,6 +19,7 @@ import json import os import re +import time import unittest import unittest.mock @@ -439,6 +440,96 @@ def test_retry_test_return_broken_stream_after_bytes(self): _ = len(response.data) self.assertIn("broken stream", ex.exception.msg) + def test_list_retry_stall_test(self): + response = self.client.post( + "/retry_test", + data=json.dumps( + {"instructions": {"storage.buckets.list": ["stall-for-1s-after-0K"]}} + ), + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + + start_time = time.perf_counter() + list_response = self.client.get( + "/storage/v1/b", + query_string={"project": "test-project-unused"}, + headers={"x-retry-test-id": create_rest.get("id")}, + ) + self.assertEqual(len(list_response.get_data()), 40) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertGreater(elapsed_time, 1) + + def test_read_retry_test_stall_after_bytes(self): + response = self.client.post( + "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) + ) + self.assertEqual(response.status_code, 200) + # Use the XML API to inject a larger object and smaller object. + media = self._create_block(UPLOAD_QUANTUM) + blob_larger = self.client.put( + "/bucket-name/256k.txt", + content_type="text/plain", + data=media, + ) + self.assertEqual(blob_larger.status_code, 200) + + media = self._create_block(128) + blob_smaller = self.client.put( + "/bucket-name/128.txt", + content_type="text/plain", + data=media, + ) + self.assertEqual(blob_smaller.status_code, 200) + + # Setup a stall for reading back the object. + response = self.client.post( + "/retry_test", + data=json.dumps( + {"instructions": {"storage.objects.get": ["stall-for-1s-after-128K"]}} + ), + ) + + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + id = create_rest.get("id") + + start_time = time.perf_counter() + # The 128-bytes file is too small to trigger the "stall-for-1s-after-128K" fault injection. + response = self.client.get( + "/storage/v1/b/bucket-name/o/128.txt", + query_string={"alt": "media"}, + headers={"x-retry-test-id": id}, + ) + self.assertEqual(response.status_code, 200, msg=response.data) + self.assertEqual(len(response.get_data()), 128) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + # This will take less the injected delay (1s). + self.assertLess(elapsed_time, 1) + + start_time = time.perf_counter() + # The 256KiB file triggers the "stall-for-1s-after-128K" and will + # take more than injected delay (1s). + response = self.client.get( + "/storage/v1/b/bucket-name/o/256k.txt", + query_string={"alt": "media"}, + headers={"x-retry-test-id": id}, + ) + self.assertEqual(len(response.get_data()), UPLOAD_QUANTUM) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertGreater(elapsed_time, 1) + def test_retry_test_return_error_after_bytes(self): response = self.client.post( "/storage/v1/b", data=json.dumps({"name": "bucket-name"})