Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support retriable stall with controllable delay #666

Merged
merged 6 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,4 @@ curl -H "x-retry-test-id: 1d05c20627844214a9ff7cbcf696317d" "http://localhost:91
| return-broken-stream | [HTTP] Testbench will fail after a few downloaded bytes <br> [GRPC] Testbench will fail with `UNAVAILABLE` after a few downloaded bytes
| return-broken-stream-after-YK | [HTTP] Testbench will fail after YKiB of downloaded data <br> [GRPC] Testbench will fail with `UNAVAILABLE` after YKiB of downloaded data
| return-reset-connection | [HTTP] Testbench will fail with a reset connection <br> [GRPC] Testbench will fail the RPC with `UNAVAILABLE`
| stall-for-Ts-after-YK | [HTTP] Testbench will stall for T second after reading YKiB of downloaded data, e.g. stall-for-10s-after-12K stalls after reading 12KiB of data <br> [GRPC] Not supported
41 changes: 41 additions & 0 deletions testbench/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import re
import socket
import struct
import time
import types
from functools import wraps

Expand All @@ -46,6 +47,10 @@
retry_return_broken_stream_after_bytes = re.compile(
r"return-broken-stream-after-([0-9]+)K$"
)
retry_stall_after_bytes = re.compile(
r"stall-for-([0-9]+)s-after-([0-9]+)K$"
)

content_range_split = re.compile(r"bytes (\*|[0-9]+-[0-9]+|[0-9]+-\*)\/(\*|[0-9]+)")

# === STR === #
Expand Down Expand Up @@ -718,6 +723,29 @@ def streamer():
return response_handler


def __get_stream_and_stall_fn(
database, method, test_id, limit=4, stall_time_sec=10, chunk_size=4
):
def response_handler(data):
def streamer():
d = _extract_data(data)
bytes_yield = 0
instruction_dequed = False
for r in range(0, len(d), chunk_size):
if bytes_yield >= limit and not instruction_dequed:
time.sleep(stall_time_sec)
database.dequeue_next_instruction(test_id, method)
instruction_dequed = True
chunk_end = min(r + chunk_size, len(d))
chunk_downloaded = chunk_end - r
bytes_yield += chunk_downloaded
yield d[r:chunk_end]

return flask.Response(streamer(), headers=_extract_headers(data))

return response_handler


def __get_default_response_fn(data):
return data

Expand Down Expand Up @@ -777,6 +805,7 @@ def handle_retry_test_instruction(database, request, socket_closer, method):
test_id, method, transport="HTTP"
):
return __get_default_response_fn

raj-prince marked this conversation as resolved.
Show resolved Hide resolved
next_instruction = database.peek_next_instruction(test_id, method)
error_code_matches = testbench.common.retry_return_error_code.match(
next_instruction
Expand Down Expand Up @@ -818,6 +847,18 @@ def handle_retry_test_instruction(database, request, socket_closer, method):
broken_stream_after_bytes = (
testbench.common.retry_return_broken_stream_after_bytes.match(next_instruction)
)

retry_stall_after_bytes_matches = testbench.common.retry_stall_after_bytes.match(
next_instruction
)
if retry_stall_after_bytes_matches:
items = list(retry_stall_after_bytes_matches.groups())
stall_time = int(items[0])
after_bytes = int(items[1]) * 1024
return __get_stream_and_stall_fn(
database, method, test_id, limit=after_bytes, stall_time_sec=stall_time
)

if broken_stream_after_bytes and method == "storage.objects.get":
items = list(broken_stream_after_bytes.groups())
after_bytes = int(items[0]) * 1024
Expand Down
1 change: 1 addition & 0 deletions testbench/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ def __validate_injected_failure_description(self, failure):
testbench.common.retry_return_error_after_bytes,
testbench.common.retry_return_short_response,
testbench.common.retry_return_broken_stream_after_bytes,
testbench.common.retry_stall_after_bytes,
]:
if expr.match(failure) is not None:
return
Expand Down
99 changes: 99 additions & 0 deletions tests/test_testbench_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import re
import unittest
import unittest.mock
import time

import crc32c
from grpc import StatusCode
Expand Down Expand Up @@ -439,6 +440,104 @@ def test_retry_test_return_broken_stream_after_bytes(self):
_ = len(response.data)
self.assertIn("broken stream", ex.exception.msg)

def test_list_retry_stall_test(self):
response = self.client.post(
"/retry_test",
data=json.dumps({
"instructions": {
"storage.buckets.list": ["stall-for-1s-after-0K"]
}
}),
)
self.assertEqual(response.status_code, 200)
self.assertTrue(
response.headers.get("content-type").startswith("application/json")
)
create_rest = json.loads(response.data)
self.assertIn("id", create_rest)

start_time = time.perf_counter()
list_response = self.client.get(
"/storage/v1/b",
query_string={"project": "test-project-unused"},
headers={"x-retry-test-id": create_rest.get("id")},
)
self.assertEqual(len(list_response.get_data()), 40)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
self.assertGreater(elapsed_time, 1)

raj-prince marked this conversation as resolved.
Show resolved Hide resolved

def test_read_retry_test_stall_after_bytes(self):
response = self.client.post(
"/storage/v1/b", data=json.dumps({"name": "bucket-name"})
)
self.assertEqual(response.status_code, 200)
# Use the XML API to inject a larger object and smaller object.
media = self._create_block(UPLOAD_QUANTUM)
blob_larger = self.client.put(
"/bucket-name/256k.txt",
content_type="text/plain",
data=media,
)
self.assertEqual(blob_larger.status_code, 200)

media = self._create_block(128)
blob_smaller = self.client.put(
"/bucket-name/128.txt",
content_type="text/plain",
data=media,
)
self.assertEqual(blob_smaller.status_code, 200)

# Setup a stall for reading back the object.
response = self.client.post(
"/retry_test",
data=json.dumps(
{
"instructions": {
"storage.objects.get": ["stall-for-1s-after-128K"]
}
}
),
)

self.assertEqual(response.status_code, 200)
self.assertTrue(
response.headers.get("content-type").startswith("application/json")
)
create_rest = json.loads(response.data)
self.assertIn("id", create_rest)
id = create_rest.get("id")

start_time = time.perf_counter()
# The 128-bytes file is too small to trigger the "stall-for-1s-after-128K" fault injection.
response = self.client.get(
"/storage/v1/b/bucket-name/o/128.txt",
query_string={"alt": "media"},
headers={"x-retry-test-id": id},
)
self.assertEqual(response.status_code, 200, msg=response.data)
self.assertEqual(len(response.get_data()), 128)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
# This will take less the injected delay (1s).
self.assertLess(elapsed_time, 1)

start_time = time.perf_counter()
# The 256KiB file triggers the "stall-for-1s-after-128K" and will
# take more than injected delay (1s).
response = self.client.get(
"/storage/v1/b/bucket-name/o/256k.txt",
query_string={"alt": "media"},
headers={"x-retry-test-id": id},
)
self.assertEqual(len(response.get_data()), UPLOAD_QUANTUM)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
self.assertGreater(elapsed_time, 1)
self.assertIn("x-goog-generation", response.headers)
raj-prince marked this conversation as resolved.
Show resolved Hide resolved

def test_retry_test_return_error_after_bytes(self):
response = self.client.post(
"/storage/v1/b", data=json.dumps({"name": "bucket-name"})
Expand Down
Loading