From 268961078e87233451134b57c89f06d628204882 Mon Sep 17 00:00:00 2001
From: cojenco
Date: Wed, 20 Mar 2024 14:32:25 -0700
Subject: [PATCH] feat: support retry test api in BidiWriteObject (#619)

* feat: support retry test api in BidiWriteObject
* add test case
* test initial response error
* update comment
---
 gcs/upload.py                 | 23 +++++++++++
 testbench/grpc_server.py      |  1 +
 tests/test_testbench_retry.py | 74 +++++++++++++++++++++++++++++++++++
 3 files changed, 98 insertions(+)

diff --git a/gcs/upload.py b/gcs/upload.py
index d1d9b9bb..f1af0b1e 100644
--- a/gcs/upload.py
+++ b/gcs/upload.py
@@ -347,6 +347,29 @@ def process_bidi_write_object_grpc(cls, db, request_iterator, context):
                         context,
                     )
 
+            # Handle retry test return-X-after-YK failures if applicable.
+            (
+                rest_code,
+                after_bytes,
+                test_id,
+            ) = testbench.common.get_retry_uploads_error_after_bytes(
+                db, request, context=context, transport="GRPC"
+            )
+            expected_persisted_size = request.write_offset + len(content)
+            if rest_code:
+                testbench.common.handle_grpc_retry_uploads_error_after_bytes(
+                    context,
+                    upload,
+                    content,
+                    db,
+                    rest_code,
+                    after_bytes,
+                    write_offset=request.write_offset,
+                    persisted_size=len(upload.media),
+                    expected_persisted_size=expected_persisted_size,
+                    test_id=test_id,
+                )
+
             # The testbench should ignore any request bytes that have already been persisted,
             # thus we validate write_offset against persisted_size.
             # https://github.com/googleapis/googleapis/blob/15b48f9ed0ae8b034e753c6895eb045f436e257c/google/storage/v2/storage.proto#L320-L329
diff --git a/testbench/grpc_server.py b/testbench/grpc_server.py
index 2827fbba..b5573099 100644
--- a/testbench/grpc_server.py
+++ b/testbench/grpc_server.py
@@ -705,6 +705,7 @@ def WriteObject(self, request_iterator, context):
         )
         return storage_pb2.WriteObjectResponse(resource=blob.metadata)
 
+    @retry_test(method="storage.objects.insert")
     def BidiWriteObject(self, request_iterator, context):
         return gcs.upload.Upload.process_bidi_write_object_grpc(
             self.db, request_iterator, context
diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py
index c781ee13..28dcbb84 100644
--- a/tests/test_testbench_retry.py
+++ b/tests/test_testbench_retry.py
@@ -786,6 +786,80 @@ def test_grpc_return_error_after_bytes(self):
         self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")
         self.assertEqual(blob.size, 2 * UPLOAD_QUANTUM)
 
+    def test_grpc_bidiwrite_return_error_after_bytes(self):
+        # Set up an initial-response error and two after-bytes errors to test injecting
+        # failures into resumable uploads, both across multiple chunks and in a single chunk.
+        response = self.rest_client.post(
+            "/retry_test",
+            data=json.dumps(
+                {
+                    "instructions": {
+                        "storage.objects.insert": [
+                            "return-503",
+                            "return-503-after-256K",
+                            "return-503-after-300K",
+                        ]
+                    },
+                    "transport": "GRPC",
+                }
+            ),
+        )
+        self.assertEqual(response.status_code, 200)
+        create_rest = json.loads(response.data)
+        self.assertIn("id", create_rest)
+        id = create_rest.get("id")
+
+        context = unittest.mock.Mock()
+        context.invocation_metadata = unittest.mock.Mock(
+            return_value=(("x-retry-test-id", id),)
+        )
+        start = self.grpc.StartResumableWrite(
+            storage_pb2.StartResumableWriteRequest(
+                write_object_spec=storage_pb2.WriteObjectSpec(
+                    resource=storage_pb2.Object(
+                        name="object-name", bucket="projects/_/buckets/bucket-name"
+                    )
+                )
+            ),
+            context=context,
+        )
+        context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY)
+        self.assertIsNotNone(start.upload_id)
+
+        # Upload the first 256KiB chunk of data and trigger the error.
+        content = self._create_block(UPLOAD_QUANTUM).encode("utf-8")
+        r1 = storage_pb2.BidiWriteObjectRequest(
+            upload_id=start.upload_id,
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=content, crc32c=crc32c.crc32c(content)
+            ),
+            finish_write=False,
+        )
+        list(self.grpc.BidiWriteObject([r1], context))
+        context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY)
+
+        # Send a full object upload here to verify that the testbench can
+        # (1) trigger error_after_bytes instructions,
+        # (2) ignore duplicate request bytes, and
+        # (3) return a forced failure with partial data.
+        media = self._create_block(2 * UPLOAD_QUANTUM).encode("utf-8")
+        r2 = storage_pb2.BidiWriteObjectRequest(
+            upload_id=start.upload_id,
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=media, crc32c=crc32c.crc32c(media)
+            ),
+            finish_write=True,
+        )
+        streamer = self.grpc.BidiWriteObject([r2], context)
+        responses = list(streamer)
+        context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY)
+        blob = responses[0].resource
+        self.assertEqual(blob.name, "object-name")
+        self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")
+        self.assertEqual(blob.size, 2 * UPLOAD_QUANTUM)
+
 
 if __name__ == "__main__":
     unittest.main()
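For reference, a minimal sketch of the retry-test flow that the new test drives, assuming a testbench instance listening on localhost:9000. The /retry_test endpoint, the "storage.objects.insert" instruction names, and the "x-retry-test-id" metadata key are taken from the test above; the host, port, and client code are illustrative, not part of this patch.

    import json
    import requests

    # Register a retry test: fail the gRPC upload with UNAVAILABLE (HTTP 503)
    # once 256KiB of upload data have been received.
    resp = requests.post(
        "http://localhost:9000/retry_test",
        data=json.dumps(
            {
                "instructions": {"storage.objects.insert": ["return-503-after-256K"]},
                "transport": "GRPC",
            }
        ),
    )
    resp.raise_for_status()
    test_id = resp.json()["id"]

    # A gRPC client then attaches the id as request metadata so the testbench
    # can match the call against the registered instructions, e.g.:
    #   stub.BidiWriteObject(request_iterator, metadata=(("x-retry-test-id", test_id),))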