diff --git a/README.md b/README.md
index 019cbd88..b718e9ec 100644
--- a/README.md
+++ b/README.md
@@ -249,3 +249,4 @@ curl -H "x-retry-test-id: 1d05c20627844214a9ff7cbcf696317d" "http://localhost:91
| return-broken-stream | [HTTP] Testbench will fail after a few downloaded bytes
[GRPC] Testbench will fail with `UNAVAILABLE` after a few downloaded bytes
| return-broken-stream-after-YK | [HTTP] Testbench will fail after YKiB of downloaded data
[GRPC] Testbench will fail with `UNAVAILABLE` after YKiB of downloaded data
| return-reset-connection | [HTTP] Testbench will fail with a reset connection
[GRPC] Testbench will fail the RPC with `UNAVAILABLE`
+| stall-for-Ts-after-YK | [HTTP] Testbench will stall for T second after reading YKiB of downloaded data, e.g. stall-for-10s-after-12K stalls after reading 12KiB of data
[GRPC] Not supported
\ No newline at end of file
diff --git a/testbench/common.py b/testbench/common.py
index b19dfa57..cc080479 100644
--- a/testbench/common.py
+++ b/testbench/common.py
@@ -23,6 +23,7 @@
import re
import socket
import struct
+import time
import types
from functools import wraps
@@ -46,6 +47,8 @@
retry_return_broken_stream_after_bytes = re.compile(
r"return-broken-stream-after-([0-9]+)K$"
)
+retry_stall_after_bytes = re.compile(r"stall-for-([0-9]+)s-after-([0-9]+)K$")
+
content_range_split = re.compile(r"bytes (\*|[0-9]+-[0-9]+|[0-9]+-\*)\/(\*|[0-9]+)")
# === STR === #
@@ -718,6 +721,29 @@ def streamer():
return response_handler
+def __get_stream_and_stall_fn(
+ database, method, test_id, limit=4, stall_time_sec=10, chunk_size=4
+):
+ def response_handler(data):
+ def streamer():
+ d = _extract_data(data)
+ bytes_yield = 0
+ instruction_dequed = False
+ for r in range(0, len(d), chunk_size):
+ if bytes_yield >= limit and not instruction_dequed:
+ time.sleep(stall_time_sec)
+ database.dequeue_next_instruction(test_id, method)
+ instruction_dequed = True
+ chunk_end = min(r + chunk_size, len(d))
+ chunk_downloaded = chunk_end - r
+ bytes_yield += chunk_downloaded
+ yield d[r:chunk_end]
+
+ return flask.Response(streamer(), headers=_extract_headers(data))
+
+ return response_handler
+
+
def __get_default_response_fn(data):
return data
@@ -818,6 +844,18 @@ def handle_retry_test_instruction(database, request, socket_closer, method):
broken_stream_after_bytes = (
testbench.common.retry_return_broken_stream_after_bytes.match(next_instruction)
)
+
+ retry_stall_after_bytes_matches = testbench.common.retry_stall_after_bytes.match(
+ next_instruction
+ )
+ if retry_stall_after_bytes_matches:
+ items = list(retry_stall_after_bytes_matches.groups())
+ stall_time = int(items[0])
+ after_bytes = int(items[1]) * 1024
+ return __get_stream_and_stall_fn(
+ database, method, test_id, limit=after_bytes, stall_time_sec=stall_time
+ )
+
if broken_stream_after_bytes and method == "storage.objects.get":
items = list(broken_stream_after_bytes.groups())
after_bytes = int(items[0]) * 1024
diff --git a/testbench/database.py b/testbench/database.py
index 368e0692..f17260dd 100644
--- a/testbench/database.py
+++ b/testbench/database.py
@@ -438,6 +438,7 @@ def __validate_injected_failure_description(self, failure):
testbench.common.retry_return_error_after_bytes,
testbench.common.retry_return_short_response,
testbench.common.retry_return_broken_stream_after_bytes,
+ testbench.common.retry_stall_after_bytes,
]:
if expr.match(failure) is not None:
return
diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py
index 28dcbb84..a5b5f8a1 100644
--- a/tests/test_testbench_retry.py
+++ b/tests/test_testbench_retry.py
@@ -19,6 +19,7 @@
import json
import os
import re
+import time
import unittest
import unittest.mock
@@ -439,6 +440,96 @@ def test_retry_test_return_broken_stream_after_bytes(self):
_ = len(response.data)
self.assertIn("broken stream", ex.exception.msg)
+ def test_list_retry_stall_test(self):
+ response = self.client.post(
+ "/retry_test",
+ data=json.dumps(
+ {"instructions": {"storage.buckets.list": ["stall-for-1s-after-0K"]}}
+ ),
+ )
+ self.assertEqual(response.status_code, 200)
+ self.assertTrue(
+ response.headers.get("content-type").startswith("application/json")
+ )
+ create_rest = json.loads(response.data)
+ self.assertIn("id", create_rest)
+
+ start_time = time.perf_counter()
+ list_response = self.client.get(
+ "/storage/v1/b",
+ query_string={"project": "test-project-unused"},
+ headers={"x-retry-test-id": create_rest.get("id")},
+ )
+ self.assertEqual(len(list_response.get_data()), 40)
+ end_time = time.perf_counter()
+ elapsed_time = end_time - start_time
+ self.assertGreater(elapsed_time, 1)
+
+ def test_read_retry_test_stall_after_bytes(self):
+ response = self.client.post(
+ "/storage/v1/b", data=json.dumps({"name": "bucket-name"})
+ )
+ self.assertEqual(response.status_code, 200)
+ # Use the XML API to inject a larger object and smaller object.
+ media = self._create_block(UPLOAD_QUANTUM)
+ blob_larger = self.client.put(
+ "/bucket-name/256k.txt",
+ content_type="text/plain",
+ data=media,
+ )
+ self.assertEqual(blob_larger.status_code, 200)
+
+ media = self._create_block(128)
+ blob_smaller = self.client.put(
+ "/bucket-name/128.txt",
+ content_type="text/plain",
+ data=media,
+ )
+ self.assertEqual(blob_smaller.status_code, 200)
+
+ # Setup a stall for reading back the object.
+ response = self.client.post(
+ "/retry_test",
+ data=json.dumps(
+ {"instructions": {"storage.objects.get": ["stall-for-1s-after-128K"]}}
+ ),
+ )
+
+ self.assertEqual(response.status_code, 200)
+ self.assertTrue(
+ response.headers.get("content-type").startswith("application/json")
+ )
+ create_rest = json.loads(response.data)
+ self.assertIn("id", create_rest)
+ id = create_rest.get("id")
+
+ start_time = time.perf_counter()
+ # The 128-bytes file is too small to trigger the "stall-for-1s-after-128K" fault injection.
+ response = self.client.get(
+ "/storage/v1/b/bucket-name/o/128.txt",
+ query_string={"alt": "media"},
+ headers={"x-retry-test-id": id},
+ )
+ self.assertEqual(response.status_code, 200, msg=response.data)
+ self.assertEqual(len(response.get_data()), 128)
+ end_time = time.perf_counter()
+ elapsed_time = end_time - start_time
+ # This will take less the injected delay (1s).
+ self.assertLess(elapsed_time, 1)
+
+ start_time = time.perf_counter()
+ # The 256KiB file triggers the "stall-for-1s-after-128K" and will
+ # take more than injected delay (1s).
+ response = self.client.get(
+ "/storage/v1/b/bucket-name/o/256k.txt",
+ query_string={"alt": "media"},
+ headers={"x-retry-test-id": id},
+ )
+ self.assertEqual(len(response.get_data()), UPLOAD_QUANTUM)
+ end_time = time.perf_counter()
+ elapsed_time = end_time - start_time
+ self.assertGreater(elapsed_time, 1)
+
def test_retry_test_return_error_after_bytes(self):
response = self.client.post(
"/storage/v1/b", data=json.dumps({"name": "bucket-name"})