Skip to content

Commit

Permalink
feat: support retry test api in BidiWriteObject (#619)
Browse files Browse the repository at this point in the history
* feat: support retry test api in BidiWriteObject

* add test case

* test initial response error

* update comment
  • Loading branch information
cojenco authored Mar 20, 2024
1 parent 18a8daf commit 2689610
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 0 deletions.
23 changes: 23 additions & 0 deletions gcs/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,29 @@ def process_bidi_write_object_grpc(cls, db, request_iterator, context):
context,
)

# Handle retry test return-X-after-YK failures if applicable.
(
rest_code,
after_bytes,
test_id,
) = testbench.common.get_retry_uploads_error_after_bytes(
db, request, context=context, transport="GRPC"
)
expected_persisted_size = request.write_offset + len(content)
if rest_code:
testbench.common.handle_grpc_retry_uploads_error_after_bytes(
context,
upload,
content,
db,
rest_code,
after_bytes,
write_offset=request.write_offset,
persisted_size=len(upload.media),
expected_persisted_size=expected_persisted_size,
test_id=test_id,
)

# The testbench should ignore any request bytes that have already been persisted,
# thus we validate write_offset against persisted_size.
# https://github.com/googleapis/googleapis/blob/15b48f9ed0ae8b034e753c6895eb045f436e257c/google/storage/v2/storage.proto#L320-L329
Expand Down
1 change: 1 addition & 0 deletions testbench/grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ def WriteObject(self, request_iterator, context):
)
return storage_pb2.WriteObjectResponse(resource=blob.metadata)

@retry_test(method="storage.objects.insert")
def BidiWriteObject(self, request_iterator, context):
return gcs.upload.Upload.process_bidi_write_object_grpc(
self.db, request_iterator, context
Expand Down
74 changes: 74 additions & 0 deletions tests/test_testbench_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,80 @@ def test_grpc_return_error_after_bytes(self):
self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")
self.assertEqual(blob.size, 2 * UPLOAD_QUANTUM)

def test_grpc_bidiwrite_return_error_after_bytes(self):
# Setup an initial-response error and two after-bytes errors to test injecting
# failures in resumable uploads, both multiple chunks and a single chunk.
response = self.rest_client.post(
"/retry_test",
data=json.dumps(
{
"instructions": {
"storage.objects.insert": [
"return-503",
"return-503-after-256K",
"return-503-after-300K",
]
},
"transport": "GRPC",
}
),
)
self.assertEqual(response.status_code, 200)
create_rest = json.loads(response.data)
self.assertIn("id", create_rest)
id = create_rest.get("id")

context = unittest.mock.Mock()
context.invocation_metadata = unittest.mock.Mock(
return_value=(("x-retry-test-id", id),)
)
start = self.grpc.StartResumableWrite(
storage_pb2.StartResumableWriteRequest(
write_object_spec=storage_pb2.WriteObjectSpec(
resource=storage_pb2.Object(
name="object-name", bucket="projects/_/buckets/bucket-name"
)
)
),
context=context,
)
context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY)
self.assertIsNotNone(start.upload_id)

# Upload the first 256KiB chunk of data and trigger error.
content = self._create_block(UPLOAD_QUANTUM).encode("utf-8")
r1 = storage_pb2.BidiWriteObjectRequest(
upload_id=start.upload_id,
write_offset=0,
checksummed_data=storage_pb2.ChecksummedData(
content=content, crc32c=crc32c.crc32c(content)
),
finish_write=False,
)
list(self.grpc.BidiWriteObject([r1], context))
context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY)

# Send a full object upload here to verify testbench can
# (1) trigger error_after_bytes instructions,
# (2) ignore duplicate request bytes and
# (3) return a forced failure with partial data.
media = self._create_block(2 * UPLOAD_QUANTUM).encode("utf-8")
r2 = storage_pb2.BidiWriteObjectRequest(
upload_id=start.upload_id,
write_offset=0,
checksummed_data=storage_pb2.ChecksummedData(
content=media, crc32c=crc32c.crc32c(media)
),
finish_write=True,
)
streamer = self.grpc.BidiWriteObject([r2], context)
responses = list(streamer)
context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY)
blob = responses[0].resource
self.assertEqual(blob.name, "object-name")
self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")
self.assertEqual(blob.size, 2 * UPLOAD_QUANTUM)


if __name__ == "__main__":
unittest.main()

0 comments on commit 2689610

Please sign in to comment.