Skip to content

Commit

Permalink
feat: support retriable stall with controllable delay (#666)
Browse files Browse the repository at this point in the history
* Quick and dirty changes

* review comments

* Fixing minor comments

* Formatting changes

* minor formatting changes

* minor formatting change
  • Loading branch information
raj-prince authored Aug 20, 2024
1 parent 98343f2 commit c4a53b4
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,4 @@ curl -H "x-retry-test-id: 1d05c20627844214a9ff7cbcf696317d" "http://localhost:91
| return-broken-stream | [HTTP] Testbench will fail after a few downloaded bytes <br> [GRPC] Testbench will fail with `UNAVAILABLE` after a few downloaded bytes
| return-broken-stream-after-YK | [HTTP] Testbench will fail after YKiB of downloaded data <br> [GRPC] Testbench will fail with `UNAVAILABLE` after YKiB of downloaded data
| return-reset-connection | [HTTP] Testbench will fail with a reset connection <br> [GRPC] Testbench will fail the RPC with `UNAVAILABLE`
| stall-for-Ts-after-YK | [HTTP] Testbench will stall for T seconds after YKiB of downloaded data, e.g. stall-for-10s-after-12K stalls for 10 seconds after 12KiB of data <br> [GRPC] Not supported
38 changes: 38 additions & 0 deletions testbench/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import re
import socket
import struct
import time
import types
from functools import wraps

Expand All @@ -46,6 +47,8 @@
# Retry-test instruction "return-broken-stream-after-<N>K".
# Group 1 is N; the instruction handler multiplies it by 1024 to get bytes.
retry_return_broken_stream_after_bytes = re.compile(
r"return-broken-stream-after-([0-9]+)K$"
)
# Retry-test instruction "stall-for-<T>s-after-<N>K".
# Group 1 is the stall duration T in seconds, group 2 is N (multiplied by
# 1024 by the instruction handler to obtain the byte threshold).
retry_stall_after_bytes = re.compile(r"stall-for-([0-9]+)s-after-([0-9]+)K$")

# Splits a "bytes <range>/<size>" value: group 1 is `*`, `A-B` or `A-*`,
# group 2 is `*` or a decimal number.
content_range_split = re.compile(r"bytes (\*|[0-9]+-[0-9]+|[0-9]+-\*)\/(\*|[0-9]+)")

# === STR === #
Expand Down Expand Up @@ -718,6 +721,29 @@ def streamer():
return response_handler


def __get_stream_and_stall_fn(
    database, method, test_id, limit=4, stall_time_sec=10, chunk_size=4
):
    """Build a response handler that streams a payload and stalls once.

    The returned handler wraps the payload of ``data`` in a streaming
    ``flask.Response`` that emits ``chunk_size``-sized pieces. Before the
    first chunk that starts at or beyond ``limit`` bytes, the stream sleeps
    for ``stall_time_sec`` seconds and then dequeues the current retry-test
    instruction for ``test_id`` / ``method``. That happens at most once per
    response.

    If the payload ends before any chunk starts at or beyond ``limit``, no
    stall happens and the instruction is left queued.

    Args:
        database: object providing ``dequeue_next_instruction(test_id, method)``.
        method: method name passed through to ``dequeue_next_instruction``.
        test_id: retry-test id passed through to ``dequeue_next_instruction``.
        limit: number of bytes to emit before stalling.
        stall_time_sec: how long to sleep, in seconds.
        chunk_size: size of each emitted chunk.

    Returns:
        A callable taking ``data`` and returning a ``flask.Response``.
    """

    def response_handler(data):
        def streamer():
            payload = _extract_data(data)
            total = len(payload)
            stalled = False
            # Every chunk before the current one is full-sized, so `offset`
            # is exactly the number of bytes already yielded.
            for offset in range(0, total, chunk_size):
                if not stalled and offset >= limit:
                    time.sleep(stall_time_sec)
                    database.dequeue_next_instruction(test_id, method)
                    stalled = True
                yield payload[offset : min(offset + chunk_size, total)]

        return flask.Response(streamer(), headers=_extract_headers(data))

    return response_handler


def __get_default_response_fn(data):
    """Return ``data`` unchanged (identity response handler)."""
    return data

Expand Down Expand Up @@ -818,6 +844,18 @@ def handle_retry_test_instruction(database, request, socket_closer, method):
broken_stream_after_bytes = (
testbench.common.retry_return_broken_stream_after_bytes.match(next_instruction)
)

retry_stall_after_bytes_matches = testbench.common.retry_stall_after_bytes.match(
next_instruction
)
if retry_stall_after_bytes_matches:
items = list(retry_stall_after_bytes_matches.groups())
stall_time = int(items[0])
after_bytes = int(items[1]) * 1024
return __get_stream_and_stall_fn(
database, method, test_id, limit=after_bytes, stall_time_sec=stall_time
)

if broken_stream_after_bytes and method == "storage.objects.get":
items = list(broken_stream_after_bytes.groups())
after_bytes = int(items[0]) * 1024
Expand Down
1 change: 1 addition & 0 deletions testbench/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ def __validate_injected_failure_description(self, failure):
testbench.common.retry_return_error_after_bytes,
testbench.common.retry_return_short_response,
testbench.common.retry_return_broken_stream_after_bytes,
testbench.common.retry_stall_after_bytes,
]:
if expr.match(failure) is not None:
return
Expand Down
91 changes: 91 additions & 0 deletions tests/test_testbench_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import json
import os
import re
import time
import unittest
import unittest.mock

Expand Down Expand Up @@ -439,6 +440,96 @@ def test_retry_test_return_broken_stream_after_bytes(self):
_ = len(response.data)
self.assertIn("broken stream", ex.exception.msg)

def test_list_retry_stall_test(self):
    """A "stall-for-1s-after-0K" instruction delays a bucket listing by over 1s."""
    instructions = {"storage.buckets.list": ["stall-for-1s-after-0K"]}
    creation = self.client.post(
        "/retry_test", data=json.dumps({"instructions": instructions})
    )
    self.assertEqual(creation.status_code, 200)
    content_type = creation.headers.get("content-type")
    self.assertTrue(content_type.startswith("application/json"))
    retry_test = json.loads(creation.data)
    self.assertIn("id", retry_test)

    # Time the whole request, including reading the streamed body, because
    # the stall is injected while the response is being streamed.
    started = time.perf_counter()
    listing = self.client.get(
        "/storage/v1/b",
        query_string={"project": "test-project-unused"},
        headers={"x-retry-test-id": retry_test.get("id")},
    )
    self.assertEqual(len(listing.get_data()), 40)
    elapsed = time.perf_counter() - started
    self.assertGreater(elapsed, 1)

def test_read_retry_test_stall_after_bytes(self):
    """Only a download larger than 128KiB triggers "stall-for-1s-after-128K"."""
    bucket = self.client.post(
        "/storage/v1/b", data=json.dumps({"name": "bucket-name"})
    )
    self.assertEqual(bucket.status_code, 200)

    # Use the XML API to inject a larger object first, then a smaller one.
    for object_path, size in (
        ("/bucket-name/256k.txt", UPLOAD_QUANTUM),
        ("/bucket-name/128.txt", 128),
    ):
        upload = self.client.put(
            object_path,
            content_type="text/plain",
            data=self._create_block(size),
        )
        self.assertEqual(upload.status_code, 200)

    # Set up a stall for reading back the object.
    instructions = {"storage.objects.get": ["stall-for-1s-after-128K"]}
    creation = self.client.post(
        "/retry_test", data=json.dumps({"instructions": instructions})
    )
    self.assertEqual(creation.status_code, 200)
    content_type = creation.headers.get("content-type")
    self.assertTrue(content_type.startswith("application/json"))
    retry_test = json.loads(creation.data)
    self.assertIn("id", retry_test)
    retry_test_id = retry_test.get("id")

    # The 128-byte object is too small to trigger the
    # "stall-for-1s-after-128K" fault injection.
    started = time.perf_counter()
    small = self.client.get(
        "/storage/v1/b/bucket-name/o/128.txt",
        query_string={"alt": "media"},
        headers={"x-retry-test-id": retry_test_id},
    )
    self.assertEqual(small.status_code, 200, msg=small.data)
    self.assertEqual(len(small.get_data()), 128)
    elapsed = time.perf_counter() - started
    # This takes less than the injected delay (1s).
    self.assertLess(elapsed, 1)

    # The 256KiB object triggers "stall-for-1s-after-128K", so the download
    # takes more than the injected delay (1s).
    started = time.perf_counter()
    large = self.client.get(
        "/storage/v1/b/bucket-name/o/256k.txt",
        query_string={"alt": "media"},
        headers={"x-retry-test-id": retry_test_id},
    )
    self.assertEqual(len(large.get_data()), UPLOAD_QUANTUM)
    elapsed = time.perf_counter() - started
    self.assertGreater(elapsed, 1)

def test_retry_test_return_error_after_bytes(self):
response = self.client.post(
"/storage/v1/b", data=json.dumps({"name": "bucket-name"})
Expand Down

0 comments on commit c4a53b4

Please sign in to comment.