diff --git a/gcs/upload.py b/gcs/upload.py
index 864d4e58..82a423ca 100644
--- a/gcs/upload.py
+++ b/gcs/upload.py
@@ -26,6 +26,8 @@
 import testbench
 from google.storage.v2 import storage_pb2
 
+from . import object
+
 
 class Upload(types.SimpleNamespace):
     """Holds data during an upload.
@@ -280,6 +282,148 @@ def init_write_object_grpc(cls, db, request_iterator, context):
                 upload.metadata.metadata["x_emulator_no_md5"] = "true"
         return upload, is_resumable
 
+    @classmethod
+    def process_bidi_write_object_grpc(cls, db, request_iterator, context):
+        """Process a BidiWriteObject streaming RPC, and yield a stream of responses.
+
+        Yields a response carrying `persisted_size` for each non-final message that
+        sets `state_lookup`, and a response carrying the object `resource` once the
+        upload is finalized. Failures are reported through `testbench.error`.
+        """
+        upload, object_checksums, is_resumable = None, None, False
+        for request in request_iterator:
+            first_message = request.WhichOneof("first_message")
+            if first_message == "upload_id":  # resumable upload
+                upload = db.get_upload(request.upload_id, context)
+                if upload.complete:
+                    # Resuming an already finalized object will result with a response
+                    # containing the uploaded object's metadata. Stop here, otherwise the
+                    # request is processed again and the object is inserted a second time.
+                    yield storage_pb2.BidiWriteObjectResponse(
+                        resource=upload.blob.metadata
+                    )
+                    return
+                is_resumable = True
+            elif (
+                first_message == "write_object_spec"
+            ):  # one shot upload (non-resumable)
+                bucket = db.get_bucket(
+                    request.write_object_spec.resource.bucket, context
+                ).metadata
+                upload = cls.__init_first_write_grpc(request, bucket, context)
+            elif upload is None:
+                return testbench.error.invalid(
+                    "Upload missing a first_message field", context
+                )
+
+            if request.HasField("object_checksums"):
+                # The object checksums may appear only in the first message *or* the last message, but not both
+                if first_message is None and not request.finish_write:
+                    return testbench.error.invalid(
+                        "Object checksums can be included only in the first or last message",
+                        context,
+                    )
+                if object_checksums is not None:
+                    return testbench.error.invalid(
+                        "Duplicate object checksums in upload",
+                        context,
+                    )
+                object_checksums = request.object_checksums
+
+            if request.WhichOneof("data") is None:
+                # Handles messages with no data to insert: the final message, or a
+                # message that only asks for the persisted size. Falling through
+                # would reuse the data of a previous message (or fail on the first).
+                if request.finish_write:
+                    upload.complete = True
+                elif request.state_lookup:
+                    yield storage_pb2.BidiWriteObjectResponse(
+                        persisted_size=len(upload.media)
+                    )
+                continue
+
+            checksummed_data = request.checksummed_data
+            content = checksummed_data.content
+            crc32c_hash = (
+                checksummed_data.crc32c if checksummed_data.HasField("crc32c") else None
+            )
+            if crc32c_hash is not None:
+                actual_crc32c = crc32c.crc32c(content)
+                if actual_crc32c != crc32c_hash:
+                    return testbench.error.mismatch(
+                        "crc32c in checksummed data",
+                        crc32c_hash,
+                        actual_crc32c,
+                        context,
+                    )
+
+            # The testbench should ignore any request bytes that have already been persisted,
+            # thus we validate write_offset against persisted_size.
+            # https://github.com/googleapis/googleapis/blob/15b48f9ed0ae8b034e753c6895eb045f436e257c/google/storage/v2/storage.proto#L320-L329
+            if request.write_offset < len(upload.media):
+                range_start = len(upload.media) - request.write_offset
+                content = testbench.common.partial_media(
+                    content, range_end=len(content), range_start=range_start
+                )
+            # Currently, the testbench will always checkpoint and flush data for testing purposes,
+            # instead of the 15 seconds interval used in the GCS server.
+            # TODO(#592): Refactor testbench checkpointing to more closely follow GCS server behavior.
+            upload.media += content
+            if request.finish_write:
+                upload.complete = True
+            elif request.state_lookup:
+                # For uploads not yet completed, yield response with persisted_size.
+                # For uploads that are complete, finalize the upload outside the request loop by
+                # storing full object checksums, creating new object, and yielding response with
+                # object metadata.
+                yield storage_pb2.BidiWriteObjectResponse(
+                    persisted_size=len(upload.media)
+                )
+
+        if upload is None:
+            return testbench.error.invalid("Missing BidiWriteObjectRequest", context)
+        if object_checksums is None:
+            upload.metadata.metadata["x_emulator_no_crc32c"] = "true"
+            upload.metadata.metadata["x_emulator_no_md5"] = "true"
+        else:
+            if object_checksums.HasField("crc32c"):
+                upload.metadata.metadata[
+                    "x_emulator_crc32c"
+                ] = testbench.common.rest_crc32c_from_proto(object_checksums.crc32c)
+            else:
+                upload.metadata.metadata["x_emulator_no_crc32c"] = "true"
+            if (
+                object_checksums.md5_hash is not None
+                and object_checksums.md5_hash != b""
+            ):
+                upload.metadata.metadata[
+                    "x_emulator_md5"
+                ] = testbench.common.rest_md5_from_proto(object_checksums.md5_hash)
+            else:
+                upload.metadata.metadata["x_emulator_no_md5"] = "true"
+
+        # Create a new object when the write is completed.
+        if upload.complete:
+            blob, _ = object.Object.init(
+                upload.request,
+                upload.metadata,
+                upload.media,
+                upload.bucket,
+                False,
+                context,
+            )
+            upload.blob = blob
+            db.insert_object(
+                upload.bucket.name,
+                blob,
+                context=context,
+                preconditions=upload.preconditions,
+            )
+            yield storage_pb2.BidiWriteObjectResponse(resource=blob.metadata)
+        else:
+            if not is_resumable:
+                return testbench.error.missing("finish_write in request", context)
+
     def resumable_status_rest(self, override_308=False):
         response = flask.make_response()
         if len(self.media) > 1 and not self.complete:
diff --git a/testbench/grpc_server.py b/testbench/grpc_server.py
index e42ebe3b..c3f4a510 100644
--- a/testbench/grpc_server.py
+++ b/testbench/grpc_server.py
@@ -702,6 +702,11 @@ def WriteObject(self, request_iterator, context):
         )
         return storage_pb2.WriteObjectResponse(resource=blob.metadata)
 
+    def BidiWriteObject(self, request_iterator, context):
+        return gcs.upload.Upload.process_bidi_write_object_grpc(
+            self.db, request_iterator, context
+        )
+
     @retry_test(method="storage.objects.list")
     def ListObjects(self, request, context):
         items, prefixes = self.db.list_object(request, request.parent, context)
diff --git a/tests/test_grpc_server.py b/tests/test_grpc_server.py
index 68785371..ac6f8abf 100755
--- a/tests/test_grpc_server.py
+++ b/tests/test_grpc_server.py
@@ -2056,6 +2056,128 @@ def test_echo_metadata(self):
         self.assertIn(("x-req-hdr1", "foo"), call.initial_metadata())
         server.stop(grace=0)
 
+    def test_bidi_write_object(self):
+        QUANTUM = 256 * 1024
+        media = TestGrpc._create_block(2 * QUANTUM + QUANTUM / 2).encode("utf-8")
+
+        offset = 0
+        content = media[0:QUANTUM]
+        r1 = storage_pb2.BidiWriteObjectRequest(
+            write_object_spec=storage_pb2.WriteObjectSpec(
+                resource={
+                    "name": "object-name",
+                    "bucket": "projects/_/buckets/bucket-name",
+                },
+            ),
+            write_offset=offset,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=content, crc32c=crc32c.crc32c(content)
+            ),
+            flush=True,
+            state_lookup=True,
+            finish_write=False,
+        )
+
+        offset = QUANTUM
+        content = media[QUANTUM : 2 * QUANTUM]
+        r2 = storage_pb2.BidiWriteObjectRequest(
+            write_offset=offset,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=content, crc32c=crc32c.crc32c(content)
+            ),
+            flush=True,
+            state_lookup=True,
+            finish_write=False,
+        )
+
+        offset = 2 * QUANTUM
+        content = media[QUANTUM:]
+        r3 = storage_pb2.BidiWriteObjectRequest(
+            write_offset=QUANTUM,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=content, crc32c=crc32c.crc32c(content)
+            ),
+            finish_write=True,
+        )
+        streamer = self.grpc.BidiWriteObject([r1, r2, r3], context=self.mock_context())
+        responses = list(streamer)
+        # We expect a total of 3 responses with state_lookup set to True.
+        self.assertEqual(len(responses), 3)
+        self.assertIsNotNone(responses[0].persisted_size)
+        self.assertIsNotNone(responses[1].persisted_size)
+        self.assertIsNotNone(responses[2].resource)
+        blob = responses[2].resource
+        self.assertEqual(blob.name, "object-name")
+        self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")
+
+    def test_bidi_write_object_resumable(self):
+        start = self.grpc.StartResumableWrite(
+            storage_pb2.StartResumableWriteRequest(
+                write_object_spec=storage_pb2.WriteObjectSpec(
+                    resource=storage_pb2.Object(
+                        name="object-name", bucket="projects/_/buckets/bucket-name"
+                    )
+                )
+            ),
+            context=unittest.mock.MagicMock(),
+        )
+        self.assertIsNotNone(start.upload_id)
+
+        QUANTUM = 256 * 1024
+        media = TestGrpc._create_block(2 * QUANTUM + QUANTUM / 2).encode("utf-8")
+        offset = 0
+        content = media[0:QUANTUM]
+        r1 = storage_pb2.BidiWriteObjectRequest(
+            upload_id=start.upload_id,
+            write_offset=offset,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=content, crc32c=crc32c.crc32c(content)
+            ),
+            flush=True,
+            finish_write=False,
+        )
+        offset = QUANTUM
+        content = media[QUANTUM : 2 * QUANTUM]
+        r2 = storage_pb2.BidiWriteObjectRequest(
+            upload_id=start.upload_id,
+            write_offset=offset,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=content, crc32c=crc32c.crc32c(content)
+            ),
+            flush=True,
+            state_lookup=True,
+            finish_write=False,
+        )
+        streamer = self.grpc.BidiWriteObject([r1, r2], "fake-context")
+        responses = list(streamer)
+        # We only expect 1 response with r2 state_lookup set to True.
+        self.assertEqual(len(responses), 1)
+        self.assertIsNotNone(responses[0])
+        self.assertEqual(responses[0].persisted_size, 2 * QUANTUM)
+
+        status = self.grpc.QueryWriteStatus(
+            storage_pb2.QueryWriteStatusRequest(upload_id=start.upload_id),
+            "fake-context",
+        )
+        self.assertEqual(status.persisted_size, 2 * QUANTUM)
+
+        offset = 2 * QUANTUM
+        content = media[2 * QUANTUM :]
+        r3 = storage_pb2.BidiWriteObjectRequest(
+            upload_id=start.upload_id,
+            write_offset=offset,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=content, crc32c=crc32c.crc32c(content)
+            ),
+            finish_write=True,
+        )
+        streamer = self.grpc.BidiWriteObject([r3], "fake-context")
+        responses = list(streamer)
+        self.assertEqual(len(responses), 1)
+        blob = responses[0].resource
+        self.assertEqual(blob.name, "object-name")
+        self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")
+
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/test_upload.py b/tests/test_upload.py
index 8664c5fb..ed241482 100644
--- a/tests/test_upload.py
+++ b/tests/test_upload.py
@@ -694,6 +694,340 @@ def test_init_object_write_grpc_final_message_empty_data(self):
         self.assertEqual(upload.metadata.name, "object")
         self.assertEqual(upload.metadata.bucket, "projects/_/buckets/bucket-name")
 
+    def test_process_bidi_write_grpc_resumable(self):
+        request = testbench.common.FakeRequest(
+            args={}, data=json.dumps({"name": "bucket-name"})
+        )
+        bucket, _ = gcs.bucket.Bucket.init(request, None)
+        request = storage_pb2.StartResumableWriteRequest(
+            write_object_spec=storage_pb2.WriteObjectSpec(
+                resource={"name": "object", "bucket": "projects/_/buckets/bucket-name"}
+            )
+        )
+        context = self.mock_context()
+        upload = gcs.upload.Upload.init_resumable_grpc(
+            request, bucket.metadata, context
+        )
+
+        line = b"The quick brown fox jumps over the lazy dog"
+        r1 = storage_pb2.BidiWriteObjectRequest(
+            upload_id=upload.upload_id,
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(line)
+            ),
+            finish_write=True,
+        )
+        db = unittest.mock.Mock()
+        db.get_bucket = unittest.mock.MagicMock(return_value=bucket)
+        db.get_upload = unittest.mock.MagicMock(return_value=upload)
+
+        context = self.mock_context()
+        streamer = gcs.upload.Upload.process_bidi_write_object_grpc(db, [r1], context)
+        responses = list(streamer)
+        blob = responses[0].resource
+        self.assertEqual(blob.name, "object")
+        self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")
+
+        # Test resuming an already finalized object will result with
+        # a response containing the finalized object's metadata.
+        context = self.mock_context()
+        streamer = gcs.upload.Upload.process_bidi_write_object_grpc(db, [r1], context)
+        responses = list(streamer)
+        self.assertEqual(len(responses), 1)
+        blob = responses[0].resource
+        self.assertEqual(blob.name, "object")
+        self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")
+
+    def test_process_bidi_write_grpc_missing_first_message(self):
+        line = b"The quick brown fox jumps over the lazy dog"
+        r1 = storage_pb2.BidiWriteObjectRequest(
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(line)
+            ),
+            finish_write=True,
+        )
+        db = unittest.mock.Mock()
+        context = self.mock_context()
+        list(gcs.upload.Upload.process_bidi_write_object_grpc(db, [r1], context))
+        context.abort.assert_called_once_with(
+            grpc.StatusCode.INVALID_ARGUMENT, unittest.mock.ANY
+        )
+
+    def test_process_bidi_write_grpc_missing_finish_write(self):
+        line = b"The quick brown fox jumps over the lazy dog"
+        r1 = storage_pb2.BidiWriteObjectRequest(
+            write_object_spec=storage_pb2.WriteObjectSpec(
+                resource={"name": "object", "bucket": "projects/_/buckets/bucket-name"},
+            ),
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(line)
+            ),
+            finish_write=False,
+        )
+        db = unittest.mock.Mock()
+        context = self.mock_context()
+        list(gcs.upload.Upload.process_bidi_write_object_grpc(db, [r1], context))
+        context.abort.assert_called_once_with(
+            grpc.StatusCode.INVALID_ARGUMENT, unittest.mock.ANY
+        )
+
+    def test_process_bidi_write_grpc_missing_checksum_at_invalid_place(self):
+        line = b"The quick brown fox jumps over the lazy dog"
+        r1 = storage_pb2.BidiWriteObjectRequest(
+            write_object_spec=storage_pb2.WriteObjectSpec(
+                resource={"name": "object", "bucket": "projects/_/buckets/bucket-name"},
+            ),
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(line)
+            ),
+            finish_write=False,
+        )
+        r2 = storage_pb2.BidiWriteObjectRequest(
+            write_offset=len(line),
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(line)
+            ),
+            object_checksums=storage_pb2.ObjectChecksums(
+                crc32c=crc32c.crc32c(b"".join(3 * [line]))
+            ),
+            finish_write=False,
+        )
+        r3 = storage_pb2.BidiWriteObjectRequest(
+            write_offset=2 * len(line),
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(line)
+            ),
+            finish_write=True,
+        )
+        db = unittest.mock.Mock()
+        context = self.mock_context()
+        list(
+            gcs.upload.Upload.process_bidi_write_object_grpc(db, [r1, r2, r3], context)
+        )
+        context.abort.assert_called_once_with(
+            grpc.StatusCode.INVALID_ARGUMENT, unittest.mock.ANY
+        )
+
+    def test_process_bidi_write_grpc_checksum_duplicated(self):
+        line = b"The quick brown fox jumps over the lazy dog"
+        r1 = storage_pb2.BidiWriteObjectRequest(
+            write_object_spec=storage_pb2.WriteObjectSpec(
+                resource={"name": "object", "bucket": "projects/_/buckets/bucket-name"},
+            ),
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(line)
+            ),
+            object_checksums=storage_pb2.ObjectChecksums(
+                crc32c=crc32c.crc32c(b"".join(3 * [line]))
+            ),
+            finish_write=False,
+        )
+        r2 = storage_pb2.BidiWriteObjectRequest(
+            write_offset=len(line),
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(line)
+            ),
+            finish_write=False,
+        )
+        r3 = storage_pb2.BidiWriteObjectRequest(
+            write_offset=2 * len(line),
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(line)
+            ),
+            object_checksums=storage_pb2.ObjectChecksums(
+                crc32c=crc32c.crc32c(b"".join(3 * [line]))
+            ),
+            finish_write=True,
+        )
+        db = unittest.mock.Mock()
+        context = self.mock_context()
+        list(
+            gcs.upload.Upload.process_bidi_write_object_grpc(db, [r1, r2, r3], context)
+        )
+        context.abort.assert_called_once_with(
+            grpc.StatusCode.INVALID_ARGUMENT, unittest.mock.ANY
+        )
+
+    def test_process_bidi_write_grpc_invalid_checksum(self):
+        line = b"The quick brown fox jumps over the lazy dog"
+        r1 = storage_pb2.BidiWriteObjectRequest(
+            write_object_spec=storage_pb2.WriteObjectSpec(
+                resource={"name": "object", "bucket": "projects/_/buckets/bucket-name"},
+            ),
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(2 * line)
+            ),
+            object_checksums=storage_pb2.ObjectChecksums(
+                crc32c=crc32c.crc32c(b"".join(3 * [line]))
+            ),
+            finish_write=True,
+        )
+        db = unittest.mock.Mock()
+        context = self.mock_context()
+        list(gcs.upload.Upload.process_bidi_write_object_grpc(db, [r1], context))
+        context.abort.assert_called_once_with(
+            grpc.StatusCode.FAILED_PRECONDITION, unittest.mock.ANY
+        )
+
+    def test_process_bidi_write_grpc_checksums(self):
+        media = b"The quick brown fox jumps over the lazy dog"
+        proto_crc32c = crc32c.crc32c(media)
+        proto_md5_hash = hashlib.md5(media).digest()
+
+        TEST_CASES = {
+            "both": {
+                "checksums": storage_pb2.ObjectChecksums(
+                    crc32c=proto_crc32c, md5_hash=proto_md5_hash
+                ),
+                "expected": {
+                    "x_emulator_crc32c": testbench.common.rest_crc32c_from_proto(
+                        proto_crc32c
+                    ),
+                    "x_emulator_md5": testbench.common.rest_md5_from_proto(
+                        proto_md5_hash
+                    ),
+                },
+            },
+            "only md5": {
+                "checksums": storage_pb2.ObjectChecksums(md5_hash=proto_md5_hash),
+                "expected": {
+                    "x_emulator_md5": testbench.common.rest_md5_from_proto(
+                        proto_md5_hash
+                    ),
+                    "x_emulator_no_crc32c": "true",
+                },
+            },
+            "only crc32c": {
+                "checksums": storage_pb2.ObjectChecksums(crc32c=proto_crc32c),
+                "expected": {
+                    "x_emulator_crc32c": testbench.common.rest_crc32c_from_proto(
+                        proto_crc32c
+                    ),
+                    "x_emulator_no_md5": "true",
+                },
+            },
+        }
+        for name, test in TEST_CASES.items():
+            request = testbench.common.FakeRequest(
+                args={}, data=json.dumps({"name": "bucket-name"})
+            )
+            bucket, _ = gcs.bucket.Bucket.init(request, None)
+            spec = storage_pb2.WriteObjectSpec(
+                resource={"name": "object", "bucket": "projects/_/buckets/bucket-name"},
+            )
+            request = storage_pb2.BidiWriteObjectRequest(
+                write_object_spec=spec,
+                write_offset=0,
+                object_checksums=test["checksums"],
+                finish_write=True,
+            )
+
+            context = self.mock_context()
+            db = unittest.mock.Mock()
+            db.get_bucket = unittest.mock.MagicMock(return_value=bucket)
+            responses = list(
+                gcs.upload.Upload.process_bidi_write_object_grpc(db, [request], context)
+            )
+            # Verify the annotations inserted by the testbench.
+            annotations = responses[0].resource.metadata
+            expected = test["expected"]
+            self.maxDiff = None
+            self.assertEqual(
+                annotations,
+                {**annotations, **expected},
+                msg="Testing with %s checksums" % name,
+            )
+
+    def test_process_bidi_write_grpc_empty(self):
+        db = unittest.mock.Mock()
+        context = self.mock_context()
+        list(gcs.upload.Upload.process_bidi_write_object_grpc(db, [], context))
+        context.abort.assert_called_once_with(
+            grpc.StatusCode.INVALID_ARGUMENT, unittest.mock.ANY
+        )
+
+    def test_process_bidi_write_grpc_final_message_empty_data(self):
+        request = testbench.common.FakeRequest(
+            args={}, data=json.dumps({"name": "bucket-name"})
+        )
+        bucket, _ = gcs.bucket.Bucket.init(request, None)
+        request = storage_pb2.StartResumableWriteRequest(
+            write_object_spec=storage_pb2.WriteObjectSpec(
+                resource={"name": "object", "bucket": "projects/_/buckets/bucket-name"}
+            )
+        )
+        context = self.mock_context()
+        upload = gcs.upload.Upload.init_resumable_grpc(
+            request, bucket.metadata, context
+        )
+
+        line = b"The quick brown fox jumps over the lazy dog"
+        r1 = storage_pb2.BidiWriteObjectRequest(
+            upload_id=upload.upload_id,
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(line)
+            ),
+            finish_write=False,
+        )
+        r2 = storage_pb2.BidiWriteObjectRequest(
+            write_offset=len(line),
+            finish_write=True,
+        )
+        context = self.mock_context()
+        db = unittest.mock.Mock()
+        db.get_bucket = unittest.mock.MagicMock(return_value=bucket)
+        db.get_upload = unittest.mock.MagicMock(return_value=upload)
+        responses = list(
+            gcs.upload.Upload.process_bidi_write_object_grpc(db, [r1, r2], context)
+        )
+        blob = responses[0].resource
+        self.assertEqual(blob.name, "object")
+        self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")
+
+    def test_process_bidi_write_grpc_state_lookup_without_data(self):
+        request = testbench.common.FakeRequest(
+            args={}, data=json.dumps({"name": "bucket-name"})
+        )
+        bucket, _ = gcs.bucket.Bucket.init(request, None)
+        request = storage_pb2.StartResumableWriteRequest(
+            write_object_spec=storage_pb2.WriteObjectSpec(
+                resource={"name": "object", "bucket": "projects/_/buckets/bucket-name"}
+            )
+        )
+        context = self.mock_context()
+        upload = gcs.upload.Upload.init_resumable_grpc(
+            request, bucket.metadata, context
+        )
+
+        line = b"The quick brown fox jumps over the lazy dog"
+        r1 = storage_pb2.BidiWriteObjectRequest(
+            upload_id=upload.upload_id,
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=line, crc32c=crc32c.crc32c(line)
+            ),
+            finish_write=False,
+        )
+        r2 = storage_pb2.BidiWriteObjectRequest(
+            write_offset=len(line),
+            state_lookup=True,
+        )
+        context = self.mock_context()
+        db = unittest.mock.Mock()
+        db.get_upload = unittest.mock.MagicMock(return_value=upload)
+        responses = list(
+            gcs.upload.Upload.process_bidi_write_object_grpc(db, [r1, r2], context)
+        )
+        self.assertEqual(len(responses), 1)
+        self.assertEqual(responses[0].persisted_size, len(line))
+
 
 if __name__ == "__main__":
     unittest.main()