Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add write stall support #684

Merged
merged 45 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
6399b4b
add code for write stall
Tulsishah Oct 10, 2024
5a48a66
fix test
Tulsishah Oct 10, 2024
6462f7b
remove unnecessary files
Tulsishah Oct 10, 2024
107366b
remove unnecessary files
Tulsishah Oct 10, 2024
ebcf119
write test
Tulsishah Oct 10, 2024
fbeaee4
undo test changes to remove unnecessary changes
Tulsishah Oct 11, 2024
5cf72e1
Update test_testbench_retry.py
Tulsishah Oct 11, 2024
fd748d6
add test
Tulsishah Oct 11, 2024
37b683b
remove .idea files
Tulsishah Oct 11, 2024
25ee376
write stall changes
Tulsishah Oct 11, 2024
560571b
remove .idea file
Tulsishah Oct 11, 2024
583d7f7
Update test_testbench_retry.py
Tulsishah Oct 11, 2024
c58848b
test
Tulsishah Oct 11, 2024
b452a45
stall once for identiacal req
Tulsishah Oct 15, 2024
222f89b
add comment
Tulsishah Oct 15, 2024
cef1298
remove .idea file
Tulsishah Oct 15, 2024
346e504
fix unit test
Tulsishah Oct 15, 2024
a09aaa3
fix unit test
Tulsishah Oct 15, 2024
d5212ac
test changes
Tulsishah Oct 16, 2024
3c9ed5d
test changes
Tulsishah Oct 16, 2024
1e43977
review comments
Tulsishah Oct 18, 2024
b611460
remove .idea files
Tulsishah Oct 18, 2024
569a1ec
Merge branch 'main' into main
Tulsishah Oct 18, 2024
2b3408e
lint fixes
Tulsishah Oct 18, 2024
e3b3b74
lint fixes
Tulsishah Oct 18, 2024
e249797
lint fixes
Tulsishah Oct 18, 2024
4739993
lint fixes
Tulsishah Oct 18, 2024
bdce8f9
lint fixes
Tulsishah Oct 18, 2024
ea0806f
code patch fix
Tulsishah Oct 18, 2024
91d80bd
support full uploads
Tulsishah Oct 21, 2024
9a638da
remove unnecessary things
Tulsishah Oct 21, 2024
88b428b
remove unnecessary things
Tulsishah Oct 21, 2024
e588a5f
remove unnecessary things
Tulsishah Oct 21, 2024
4cb1655
adding comment
Tulsishah Oct 21, 2024
0fa865d
lint fix
Tulsishah Oct 21, 2024
97b0cb0
lint fix
Tulsishah Oct 21, 2024
8e195e4
stall should not happen if uploaded less amount of data then stall size
Tulsishah Oct 21, 2024
0d938a7
stall should not happen if uploaded less amount of data then stall size
Tulsishah Oct 21, 2024
0c1eda4
remove last two commit changes
Tulsishah Oct 21, 2024
5f7b7d9
remove env files
Tulsishah Oct 21, 2024
8a6d85e
lint fix
Tulsishah Oct 21, 2024
6f12b11
lint fix
Tulsishah Oct 21, 2024
e952ab7
review comment and adding scenario where upload size is less then sta…
Tulsishah Oct 24, 2024
fb936f5
lint fix
Tulsishah Oct 24, 2024
7b7a2fe
lint fix
Tulsishah Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ curl -H "x-retry-test-id: 1d05c20627844214a9ff7cbcf696317d" "http://localhost:91
| return-broken-stream | [HTTP] Testbench will fail after a few downloaded bytes <br> [GRPC] Testbench will fail with `UNAVAILABLE` after a few downloaded bytes
| return-broken-stream-after-YK | [HTTP] Testbench will fail after YKiB of downloaded data <br> [GRPC] Testbench will fail with `UNAVAILABLE` after YKiB of downloaded data
| return-reset-connection | [HTTP] Testbench will fail with a reset connection <br> [GRPC] Testbench will fail the RPC with `UNAVAILABLE`
| stall-for-Ts-after-YK | [HTTP] Testbench will stall for T second after reading YKiB of downloaded data, e.g. stall-for-10s-after-12K stalls after reading 12KiB of data <br> [GRPC] Not supported
| stall-for-Ts-after-YK | [HTTP] Testbench will stall for T second after reading YKiB of downloaded/uploaded data, e.g. stall-for-10s-after-12K stalls after reading/writing 12KiB of data <br> [GRPC] Not supported

## Releasing the testbench

Expand All @@ -264,4 +264,4 @@ Steps:
1. Title "v0.x.x"
1. Click Generate release notes
1. Make sure "Set as the latest release" is checked
1. Click "Publish Release" to release
1. Click "Publish Release" to release
58 changes: 51 additions & 7 deletions testbench/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,24 +844,24 @@ def handle_retry_test_instruction(database, request, socket_closer, method):
broken_stream_after_bytes = (
testbench.common.retry_return_broken_stream_after_bytes.match(next_instruction)
)
if broken_stream_after_bytes and method == "storage.objects.get":
items = list(broken_stream_after_bytes.groups())
after_bytes = int(items[0]) * 1024
return __get_streamer_response_fn(
database, method, socket_closer, test_id, limit=after_bytes
)

retry_stall_after_bytes_matches = testbench.common.retry_stall_after_bytes.match(
next_instruction
)
if retry_stall_after_bytes_matches:
if retry_stall_after_bytes_matches and method != "storage.objects.insert":
items = list(retry_stall_after_bytes_matches.groups())
stall_time = int(items[0])
after_bytes = int(items[1]) * 1024
return __get_stream_and_stall_fn(
database, method, test_id, limit=after_bytes, stall_time_sec=stall_time
)

if broken_stream_after_bytes and method == "storage.objects.get":
items = list(broken_stream_after_bytes.groups())
after_bytes = int(items[0]) * 1024
return __get_streamer_response_fn(
database, method, socket_closer, test_id, limit=after_bytes
)
Tulsishah marked this conversation as resolved.
Show resolved Hide resolved
retry_return_short_response = testbench.common.retry_return_short_response.match(
next_instruction
)
Expand Down Expand Up @@ -895,6 +895,30 @@ def wrapper(*args, **kwargs):
return retry_test

Tulsishah marked this conversation as resolved.
Show resolved Hide resolved

def get_stall_uploads_after_bytes(database, request, context=None, transport="HTTP"):
"""Retrieve stall time and #bytes corresponding to uploads from retry test instructions."""
method = "storage.objects.insert"
test_id = request.headers.get("x-retry-test-id", None)
if not test_id:
return 0, 0, ""
next_instruction = None
if database.has_instructions_retry_test(test_id, method, transport=transport):
next_instruction = database.peek_next_instruction(test_id, method)
if not next_instruction:
return 0, 0, ""

stall_after_byte_matches = testbench.common.retry_stall_after_bytes.match(
next_instruction
)
if stall_after_byte_matches:
items = list(stall_after_byte_matches.groups())
stall_time = int(items[0])
after_bytes = int(items[1]) * 1024
return stall_time, after_bytes, test_id

return 0, 0, ""


def get_retry_uploads_error_after_bytes(
database, request, context=None, transport="HTTP"
):
Expand All @@ -919,9 +943,29 @@ def get_retry_uploads_error_after_bytes(
error_code = int(items[0])
after_bytes = int(items[1]) * 1024
return error_code, after_bytes, test_id

return 0, 0, ""


def handle_stall_uploads_after_bytes(
upload,
data,
database,
stall_time,
after_bytes,
test_id=0,
):
"""
Handle stall-after-bytes instructions for resumable uploads.
Stall happens after given value of bytes.
e.g. We are uploading 120K of data then, stall-2s-after-100K will stall the request.
"""
if len(upload.media) <= after_bytes and len(upload.media) + len(data) > after_bytes:
if test_id:
database.dequeue_next_instruction(test_id, "storage.objects.insert")
time.sleep(stall_time)


def handle_retry_uploads_error_after_bytes(
upload,
data,
Expand Down
30 changes: 30 additions & 0 deletions testbench/rest_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import datetime
import json
import logging
import time

import flask
from google.protobuf import json_format
Expand Down Expand Up @@ -978,6 +979,18 @@ def object_insert(bucket_name):
blob, projection = gcs_type.object.Object.init_media(flask.request, bucket)
elif upload_type == "multipart":
blob, projection = gcs_type.object.Object.init_multipart(flask.request, bucket)
# Handle stall for full uploads.
testbench.common.extract_instruction(request, context=None)
(
stall_time,
after_bytes,
test_id,
) = testbench.common.get_stall_uploads_after_bytes(db, request)
if stall_time:
if test_id:
db.dequeue_next_instruction(test_id, "storage.objects.insert")
time.sleep(stall_time)

db.insert_object(
bucket_name,
blob,
Expand Down Expand Up @@ -1104,6 +1117,23 @@ def resumable_upload_chunk(bucket_name):
chunk_last_byte,
test_id,
)

Tulsishah marked this conversation as resolved.
Show resolved Hide resolved
testbench.common.extract_instruction(request, context=None)
(
stall_time,
after_bytes,
test_id,
) = testbench.common.get_stall_uploads_after_bytes(db, request)

if stall_time:
testbench.common.handle_stall_uploads_after_bytes(
upload,
data,
db,
stall_time,
after_bytes,
test_id,
)
Tulsishah marked this conversation as resolved.
Show resolved Hide resolved
# The testbench should ignore any request bytes that have already been persisted,
# to be aligned with GCS behavior (https://cloud.google.com/storage/docs/resumable-uploads#resent-data).
# Thus we validate chunk_first_byte against last_byte_persisted.
Expand Down
173 changes: 173 additions & 0 deletions tests/test_testbench_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import testbench
from google.storage.v2 import storage_pb2
from testbench import rest_server
from tests.format_multipart_upload import format_multipart_upload

UPLOAD_QUANTUM = 256 * 1024

Expand Down Expand Up @@ -655,6 +656,178 @@ def test_retry_test_return_error_after_bytes(self):
self.assertIn("size", create_rest)
self.assertEqual(int(create_rest.get("size")), 2 * UPLOAD_QUANTUM)

def test_write_retry_test_stall_after_bytes(self):
Tulsishah marked this conversation as resolved.
Show resolved Hide resolved
# Create a new bucket
response = self.client.post(
"/storage/v1/b", data=json.dumps({"name": "bucket-name"})
)
self.assertEqual(response.status_code, 200)

# Setup a stall for reading back the object.
response = self.client.post(
"/retry_test",
data=json.dumps(
{
"instructions": {
"storage.objects.insert": [
"stall-for-1s-after-250K",
"stall-for-1s-after-300K",
]
}
}
),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
self.assertTrue(
response.headers.get("content-type").startswith("application/json")
)

create_rest = json.loads(response.data)
self.assertIn("id", create_rest)
test_id = create_rest.get("id")

# Initiate resumable upload
response = self.client.post(
"/upload/storage/v1/b/bucket-name/o",
query_string={"uploadType": "resumable", "name": "stall"},
content_type="application/json",
)
self.assertEqual(response.status_code, 200)

location = response.headers.get("location")
self.assertIn("upload_id=", location)
match = re.search(r"[&?]upload_id=([^&]+)", location)
self.assertIsNotNone(match, msg=location)
upload_id = match.group(1)

# Upload the first 256KiB chunk of data and trigger the stall.
Tulsishah marked this conversation as resolved.
Show resolved Hide resolved
chunk = self._create_block(UPLOAD_QUANTUM)
self.assertEqual(len(chunk), UPLOAD_QUANTUM)

start_time = time.perf_counter()
response = self.client.put(
f"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
headers={
"content-range": "bytes 0-{len:d}/{obj_size:d}".format(
len=UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
),
"x-retry-test-id": test_id,
},
data=chunk,
)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
self.assertGreater(elapsed_time, 1)
self.assertEqual(response.status_code, 308)

# Upload the second 256KiB chunk of data and trigger the stall again.
start_time = time.perf_counter()
chunk = self._create_block(UPLOAD_QUANTUM)
self.assertEqual(len(chunk), UPLOAD_QUANTUM)
response = self.client.put(
"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
headers={
"content-range": "bytes 0-{len:d}/{obj_size:d}".format(
len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
),
"x-retry-test-id": test_id,
},
data=chunk,
)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
self.assertGreater(elapsed_time, 1)
self.assertEqual(response.status_code, 200, msg=response.data)

# Upload the second 256KiB chunk of data and check that stall not happen
Tulsishah marked this conversation as resolved.
Show resolved Hide resolved
start_time = time.perf_counter()
chunk = self._create_block(UPLOAD_QUANTUM)
self.assertEqual(len(chunk), UPLOAD_QUANTUM)
response = self.client.put(
"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
headers={
"content-range": "bytes 0-{len:d}/{obj_size:d}".format(
len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
),
"x-retry-test-id": test_id,
},
data=chunk,
)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
self.assertLess(elapsed_time, 1)
self.assertEqual(response.status_code, 200, msg=response.data)

def test_write_retry_test_stall_for_full_uploads(self):
Tulsishah marked this conversation as resolved.
Show resolved Hide resolved
# Create a new bucket
response = self.client.post(
"/storage/v1/b", data=json.dumps({"name": "bucket-name"})
)
self.assertEqual(response.status_code, 200)

# Setup a stall for reading back the object.
response = self.client.post(
"/retry_test",
data=json.dumps(
{
"instructions": {
"storage.objects.insert": [
"stall-for-1s-after-250K",
]
}
}
),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
self.assertTrue(
response.headers.get("content-type").startswith("application/json")
)

create_rest = json.loads(response.data)
self.assertIn("id", create_rest)
test_id = create_rest.get("id")

# Upload the 256KiB of data and trigger the stall.
data = self._create_block(UPLOAD_QUANTUM)
self.assertEqual(len(data), UPLOAD_QUANTUM)

start_time = time.perf_counter()
boundary, payload = format_multipart_upload({}, data)
response = self.client.post(
"/upload/storage/v1/b/bucket-name/o",
query_string={"uploadType": "multipart", "name": "stall"},
content_type="multipart/related; boundary=" + boundary,
headers={
"x-retry-test-id": test_id,
},
data=payload,
)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
self.assertEqual(response.status_code, 200)
self.assertGreater(elapsed_time, 1)

# Upload the data again and check that stall not happen.
start_time = time.perf_counter()
response = self.client.post(
"/upload/storage/v1/b/bucket-name/o",
query_string={"uploadType": "multipart", "name": "stall"},
content_type="multipart/related; boundary=" + boundary,
headers={
"x-retry-test-id": test_id,
},
data=payload,
)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
self.assertLess(elapsed_time, 1)
self.assertEqual(response.status_code, 200)


class TestTestbenchRetryGrpc(unittest.TestCase):
def setUp(self):
Expand Down