diff --git a/README.md b/README.md
index f498466a..9e569681 100644
--- a/README.md
+++ b/README.md
@@ -252,7 +252,7 @@ curl -H "x-retry-test-id: 1d05c20627844214a9ff7cbcf696317d" "http://localhost:91
| return-broken-stream | [HTTP] Testbench will fail after a few downloaded bytes<br>[GRPC] Testbench will fail with `UNAVAILABLE` after a few downloaded bytes
| return-broken-stream-after-YK | [HTTP] Testbench will fail after YKiB of downloaded data<br>[GRPC] Testbench will fail with `UNAVAILABLE` after YKiB of downloaded data
| return-reset-connection | [HTTP] Testbench will fail with a reset connection<br>[GRPC] Testbench will fail the RPC with `UNAVAILABLE`
-| stall-for-Ts-after-YK | [HTTP] Testbench will stall for T second after reading YKiB of downloaded data, e.g. stall-for-10s-after-12K stalls after reading 12KiB of data<br>[GRPC] Not supported
+| stall-for-Ts-after-YK | [HTTP] Testbench will stall for T seconds after reading YKiB of downloaded/uploaded data, e.g. stall-for-10s-after-12K stalls after reading/writing 12KiB of data<br>[GRPC] Not supported
## Releasing the testbench
@@ -264,4 +264,4 @@ Steps:
1. Title "v0.x.x"
1. Click Generate release notes
1. Make sure "Set as the latest release" is checked
-1. Click "Publish Release" to release
+1. Click "Publish Release" to release
\ No newline at end of file
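For reference, a minimal client-side sketch (not part of the patch) of registering the new upload stall instruction through the retry-test API documented above. The base URL/port and the use of the `requests` library are assumptions; only the `/retry_test` endpoint, the instruction string, and the `x-retry-test-id` header come from the README and the tests in this change.

```python
# Hypothetical client sketch: create a retry test that stalls an upload for
# 1 second after the first 250KiB, then tag upload requests with its id.
import json

import requests  # assumption: any HTTP client works

BASE_URL = "http://localhost:9000"  # placeholder for the local testbench address

resp = requests.post(
    f"{BASE_URL}/retry_test",
    data=json.dumps(
        {"instructions": {"storage.objects.insert": ["stall-for-1s-after-250K"]}}
    ),
    headers={"Content-Type": "application/json"},
)
test_id = resp.json()["id"]

# Any upload sent with this header is subject to the stall instruction above.
upload_headers = {"x-retry-test-id": test_id}
```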
diff --git a/testbench/common.py b/testbench/common.py
index 678a5d82..5f7399ca 100644
--- a/testbench/common.py
+++ b/testbench/common.py
@@ -844,11 +844,17 @@ def handle_retry_test_instruction(database, request, socket_closer, method):
broken_stream_after_bytes = (
testbench.common.retry_return_broken_stream_after_bytes.match(next_instruction)
)
+ if broken_stream_after_bytes and method == "storage.objects.get":
+ items = list(broken_stream_after_bytes.groups())
+ after_bytes = int(items[0]) * 1024
+ return __get_streamer_response_fn(
+ database, method, socket_closer, test_id, limit=after_bytes
+ )
retry_stall_after_bytes_matches = testbench.common.retry_stall_after_bytes.match(
next_instruction
)
- if retry_stall_after_bytes_matches:
+ if retry_stall_after_bytes_matches and method != "storage.objects.insert":
items = list(retry_stall_after_bytes_matches.groups())
stall_time = int(items[0])
after_bytes = int(items[1]) * 1024
@@ -856,12 +862,6 @@ def handle_retry_test_instruction(database, request, socket_closer, method):
database, method, test_id, limit=after_bytes, stall_time_sec=stall_time
)
- if broken_stream_after_bytes and method == "storage.objects.get":
- items = list(broken_stream_after_bytes.groups())
- after_bytes = int(items[0]) * 1024
- return __get_streamer_response_fn(
- database, method, socket_closer, test_id, limit=after_bytes
- )
retry_return_short_response = testbench.common.retry_return_short_response.match(
next_instruction
)
@@ -895,6 +895,30 @@ def wrapper(*args, **kwargs):
return retry_test
+def get_stall_uploads_after_bytes(database, request, context=None, transport="HTTP"):
+ """Retrieve stall time and #bytes corresponding to uploads from retry test instructions."""
+ method = "storage.objects.insert"
+ test_id = request.headers.get("x-retry-test-id", None)
+ if not test_id:
+ return 0, 0, ""
+ next_instruction = None
+ if database.has_instructions_retry_test(test_id, method, transport=transport):
+ next_instruction = database.peek_next_instruction(test_id, method)
+ if not next_instruction:
+ return 0, 0, ""
+
+ stall_after_byte_matches = testbench.common.retry_stall_after_bytes.match(
+ next_instruction
+ )
+ if stall_after_byte_matches:
+ items = list(stall_after_byte_matches.groups())
+ stall_time = int(items[0])
+ after_bytes = int(items[1]) * 1024
+ return stall_time, after_bytes, test_id
+
+ return 0, 0, ""
+
+
def get_retry_uploads_error_after_bytes(
database, request, context=None, transport="HTTP"
):
@@ -919,9 +943,29 @@ def get_retry_uploads_error_after_bytes(
error_code = int(items[0])
after_bytes = int(items[1]) * 1024
return error_code, after_bytes, test_id
+
return 0, 0, ""
+def handle_stall_uploads_after_bytes(
+ upload,
+ data,
+ database,
+ stall_time,
+ after_bytes,
+ test_id=0,
+):
+ """
+ Handle stall-after-bytes instructions for resumable uploads.
+ The stall is triggered once the upload crosses the given byte threshold.
+ e.g. when uploading 120KiB of data, stall-for-2s-after-100K stalls the request that crosses the 100KiB mark for 2 seconds.
+ """
+ if len(upload.media) <= after_bytes and len(upload.media) + len(data) > after_bytes:
+ if test_id:
+ database.dequeue_next_instruction(test_id, "storage.objects.insert")
+ time.sleep(stall_time)
+
+
def handle_retry_uploads_error_after_bytes(
upload,
data,
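To make the chunk-boundary check in `handle_stall_uploads_after_bytes` concrete, here is a standalone sketch (not part of the patch) that replays the docstring's example: a 120KiB object uploaded in two chunks under a `stall-for-2s-after-100K` instruction stalls exactly once, on the chunk that crosses the 100KiB mark. The 60KiB chunk size is an arbitrary choice for illustration.

```python
# Standalone illustration of the stall trigger used by
# handle_stall_uploads_after_bytes; the 60KiB chunk size is arbitrary.
after_bytes = 100 * 1024  # threshold from "stall-for-2s-after-100K"

persisted = 0  # bytes already stored for the upload (len(upload.media))
for chunk_size in (60 * 1024, 60 * 1024):  # two chunks, 120KiB in total
    crosses = persisted <= after_bytes and persisted + chunk_size > after_bytes
    print(f"persisted={persisted // 1024}KiB -> stall: {crosses}")
    persisted += chunk_size
# Prints: the first chunk does not stall, the second one (crossing 100KiB) does.
```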
diff --git a/testbench/rest_server.py b/testbench/rest_server.py
index 96308406..72a0ae88 100644
--- a/testbench/rest_server.py
+++ b/testbench/rest_server.py
@@ -16,6 +16,7 @@
import datetime
import json
import logging
+import time
import flask
from google.protobuf import json_format
@@ -978,6 +979,18 @@ def object_insert(bucket_name):
blob, projection = gcs_type.object.Object.init_media(flask.request, bucket)
elif upload_type == "multipart":
blob, projection = gcs_type.object.Object.init_multipart(flask.request, bucket)
+ # Handle stalls for single-shot (media and multipart) uploads.
+ testbench.common.extract_instruction(request, context=None)
+ (
+ stall_time,
+ after_bytes,
+ test_id,
+ ) = testbench.common.get_stall_uploads_after_bytes(db, request)
+ if stall_time and len(blob.media) >= after_bytes:
+ if test_id:
+ db.dequeue_next_instruction(test_id, "storage.objects.insert")
+ time.sleep(stall_time)
+
db.insert_object(
bucket_name,
blob,
@@ -1104,6 +1117,23 @@ def resumable_upload_chunk(bucket_name):
chunk_last_byte,
test_id,
)
+
+ testbench.common.extract_instruction(request, context=None)
+ (
+ stall_time,
+ after_bytes,
+ test_id,
+ ) = testbench.common.get_stall_uploads_after_bytes(db, request)
+
+ if stall_time:
+ testbench.common.handle_stall_uploads_after_bytes(
+ upload,
+ data,
+ db,
+ stall_time,
+ after_bytes,
+ test_id,
+ )
# The testbench should ignore any request bytes that have already been persisted,
# to be aligned with GCS behavior (https://cloud.google.com/storage/docs/resumable-uploads#resent-data).
# Thus we validate chunk_first_byte against last_byte_persisted.
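Similarly, a quick standalone illustration (not part of the patch) of the single-shot branch added to `object_insert`: the stall only applies when the complete payload reaches the `after_bytes` threshold, which is what the new multipart tests below verify.

```python
# Standalone illustration of the single-shot stall condition in object_insert.
after_bytes = 250 * 1024  # threshold from "stall-for-1s-after-250K"

for media_size in (200 * 1024, 256 * 1024):
    stalls = media_size >= after_bytes
    print(f"{media_size // 1024}KiB single-shot upload -> stall: {stalls}")
# Prints: 200KiB -> stall: False, 256KiB -> stall: True (matching the tests).
```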
diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py
index a5b5f8a1..0030993e 100644
--- a/tests/test_testbench_retry.py
+++ b/tests/test_testbench_retry.py
@@ -30,6 +30,7 @@
import testbench
from google.storage.v2 import storage_pb2
from testbench import rest_server
+from tests.format_multipart_upload import format_multipart_upload
UPLOAD_QUANTUM = 256 * 1024
@@ -655,6 +656,230 @@ def test_retry_test_return_error_after_bytes(self):
self.assertIn("size", create_rest)
self.assertEqual(int(create_rest.get("size")), 2 * UPLOAD_QUANTUM)
+ def test_write_retry_test_stall_after_bytes(self):
+ # Create a new bucket
+ response = self.client.post(
+ "/storage/v1/b", data=json.dumps({"name": "bucket-name"})
+ )
+ self.assertEqual(response.status_code, 200)
+
+ # Set up a stall for uploading the object.
+ response = self.client.post(
+ "/retry_test",
+ data=json.dumps(
+ {
+ "instructions": {
+ "storage.objects.insert": [
+ "stall-for-1s-after-250K",
+ "stall-for-1s-after-300K",
+ ]
+ }
+ }
+ ),
+ content_type="application/json",
+ )
+ self.assertEqual(response.status_code, 200)
+ self.assertTrue(
+ response.headers.get("content-type").startswith("application/json")
+ )
+
+ create_rest = json.loads(response.data)
+ self.assertIn("id", create_rest)
+ test_id = create_rest.get("id")
+
+ # Initiate resumable upload
+ response = self.client.post(
+ "/upload/storage/v1/b/bucket-name/o",
+ query_string={"uploadType": "resumable", "name": "stall"},
+ content_type="application/json",
+ )
+ self.assertEqual(response.status_code, 200)
+
+ location = response.headers.get("location")
+ self.assertIn("upload_id=", location)
+ match = re.search(r"[&?]upload_id=([^&]+)", location)
+ self.assertIsNotNone(match, msg=location)
+ upload_id = match.group(1)
+
+ # Upload the first 256KiB chunk of data and trigger the stall.
+ chunk = self._create_block(UPLOAD_QUANTUM)
+ self.assertEqual(len(chunk), UPLOAD_QUANTUM)
+
+ start_time = time.perf_counter()
+ response = self.client.put(
+ f"/upload/storage/v1/b/bucket-name/o",
+ query_string={"upload_id": upload_id},
+ headers={
+ "content-range": "bytes 0-{len:d}/{obj_size:d}".format(
+ len=UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
+ ),
+ "x-retry-test-id": test_id,
+ },
+ data=chunk,
+ )
+ end_time = time.perf_counter()
+ elapsed_time = end_time - start_time
+ self.assertGreater(elapsed_time, 1)
+ self.assertEqual(response.status_code, 308)
+
+ # Upload the second 256KiB chunk of data and trigger the stall again.
+ start_time = time.perf_counter()
+ chunk = self._create_block(UPLOAD_QUANTUM)
+ self.assertEqual(len(chunk), UPLOAD_QUANTUM)
+ response = self.client.put(
+ "/upload/storage/v1/b/bucket-name/o",
+ query_string={"upload_id": upload_id},
+ headers={
+ "content-range": "bytes 0-{len:d}/{obj_size:d}".format(
+ len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
+ ),
+ "x-retry-test-id": test_id,
+ },
+ data=chunk,
+ )
+ end_time = time.perf_counter()
+ elapsed_time = end_time - start_time
+ self.assertGreater(elapsed_time, 1)
+ self.assertEqual(response.status_code, 200, msg=response.data)
+
+ # Upload the second 256KiB chunk again and check that the stall does not happen.
+ start_time = time.perf_counter()
+ chunk = self._create_block(UPLOAD_QUANTUM)
+ self.assertEqual(len(chunk), UPLOAD_QUANTUM)
+ response = self.client.put(
+ "/upload/storage/v1/b/bucket-name/o",
+ query_string={"upload_id": upload_id},
+ headers={
+ "content-range": "bytes 0-{len:d}/{obj_size:d}".format(
+ len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
+ ),
+ "x-retry-test-id": test_id,
+ },
+ data=chunk,
+ )
+ end_time = time.perf_counter()
+ elapsed_time = end_time - start_time
+ self.assertLess(elapsed_time, 1)
+ self.assertEqual(response.status_code, 200, msg=response.data)
+
+ def test_write_retry_test_stall_single_shot(self):
+ # Create a new bucket
+ response = self.client.post(
+ "/storage/v1/b", data=json.dumps({"name": "bucket-name"})
+ )
+ self.assertEqual(response.status_code, 200)
+
+ # Set up a stall for uploading the object.
+ response = self.client.post(
+ "/retry_test",
+ data=json.dumps(
+ {
+ "instructions": {
+ "storage.objects.insert": [
+ "stall-for-1s-after-250K",
+ ]
+ }
+ }
+ ),
+ content_type="application/json",
+ )
+ self.assertEqual(response.status_code, 200)
+ self.assertTrue(
+ response.headers.get("content-type").startswith("application/json")
+ )
+
+ create_rest = json.loads(response.data)
+ self.assertIn("id", create_rest)
+ test_id = create_rest.get("id")
+
+ # Upload 256KiB of data and trigger the stall.
+ data = self._create_block(UPLOAD_QUANTUM)
+ self.assertEqual(len(data), UPLOAD_QUANTUM)
+
+ start_time = time.perf_counter()
+ boundary, payload = format_multipart_upload({}, data)
+ response = self.client.post(
+ "/upload/storage/v1/b/bucket-name/o",
+ query_string={"uploadType": "multipart", "name": "stall"},
+ content_type="multipart/related; boundary=" + boundary,
+ headers={
+ "x-retry-test-id": test_id,
+ },
+ data=payload,
+ )
+ end_time = time.perf_counter()
+ elapsed_time = end_time - start_time
+ self.assertEqual(response.status_code, 200)
+ self.assertGreater(elapsed_time, 1)
+
+ # Upload the data again and check that the stall does not happen.
+ start_time = time.perf_counter()
+ response = self.client.post(
+ "/upload/storage/v1/b/bucket-name/o",
+ query_string={"uploadType": "multipart", "name": "stall"},
+ content_type="multipart/related; boundary=" + boundary,
+ headers={
+ "x-retry-test-id": test_id,
+ },
+ data=payload,
+ )
+ end_time = time.perf_counter()
+ elapsed_time = end_time - start_time
+ self.assertLess(elapsed_time, 1)
+ self.assertEqual(response.status_code, 200)
+
+ def test_write_retry_test_stall_single_shot_while_upload_size_less_than_stall_size(
+ self,
+ ):
+ # Create a new bucket
+ response = self.client.post(
+ "/storage/v1/b", data=json.dumps({"name": "bucket-name"})
+ )
+ self.assertEqual(response.status_code, 200)
+
+ # Set up a stall for uploading the object.
+ response = self.client.post(
+ "/retry_test",
+ data=json.dumps(
+ {
+ "instructions": {
+ "storage.objects.insert": [
+ "stall-for-1s-after-250K",
+ ]
+ }
+ }
+ ),
+ content_type="application/json",
+ )
+ self.assertEqual(response.status_code, 200)
+ self.assertTrue(
+ response.headers.get("content-type").startswith("application/json")
+ )
+
+ create_rest = json.loads(response.data)
+ self.assertIn("id", create_rest)
+ test_id = create_rest.get("id")
+
+ # Upload 200KiB of data and check that the stall does not happen.
+ data = self._create_block(200 * 1024)
+ self.assertEqual(len(data), 200 * 1024)
+
+ start_time = time.perf_counter()
+ boundary, payload = format_multipart_upload({}, data)
+ response = self.client.post(
+ "/upload/storage/v1/b/bucket-name/o",
+ query_string={"uploadType": "multipart", "name": "stall"},
+ content_type="multipart/related; boundary=" + boundary,
+ headers={
+ "x-retry-test-id": test_id,
+ },
+ data=payload,
+ )
+ end_time = time.perf_counter()
+ elapsed_time = end_time - start_time
+ self.assertEqual(response.status_code, 200)
+ self.assertLess(elapsed_time, 1)
+
class TestTestbenchRetryGrpc(unittest.TestCase):
def setUp(self):