feat(gRPC): implement BidiWriteObject #586

Merged · 10 commits · Feb 16, 2024
132 changes: 132 additions & 0 deletions gcs/upload.py
@@ -26,6 +26,8 @@
import testbench
from google.storage.v2 import storage_pb2

from . import object


class Upload(types.SimpleNamespace):
    """Holds data during an upload.
@@ -280,6 +282,136 @@ def init_write_object_grpc(cls, db, request_iterator, context):
        upload.metadata.metadata["x_emulator_no_md5"] = "true"
        return upload, is_resumable

    @classmethod
    def process_bidi_write_object_grpc(cls, db, request_iterator, context):
"""Process a BidiWriteObject streaming RPC, and yield a stream of responses."""
upload, object_checksums, is_resumable = None, None, False
for request in request_iterator:
first_message = request.WhichOneof("first_message")
if first_message == "upload_id": # resumable upload
upload = db.get_upload(request.upload_id, context)
if upload.complete:
                    # Resuming an already finalized object will result in a response
                    # containing the uploaded object's metadata.
                    yield storage_pb2.BidiWriteObjectResponse(
                        resource=upload.blob.metadata
                    )
                is_resumable = True
            elif (
                first_message == "write_object_spec"
            ):  # one shot upload (non-resumable)
                bucket = db.get_bucket(
                    request.write_object_spec.resource.bucket, context
                ).metadata
                upload = cls.__init_first_write_grpc(request, bucket, context)
            elif upload is None:
                return testbench.error.invalid(
                    "Upload missing a first_message field", context
                )

            if request.HasField("object_checksums"):
                # The object checksums may appear only in the first message *or* the last message, but not both.
                if first_message is None and not request.finish_write:
                    return testbench.error.invalid(
                        "Object checksums can be included only in the first or last message",
                        context,
                    )
                if object_checksums is not None:
                    return testbench.error.invalid(
                        "Duplicate object checksums in upload",
                        context,
                    )
                object_checksums = request.object_checksums

data = request.WhichOneof("data")
if data == "checksummed_data":
checksummed_data = request.checksummed_data
elif data is None and request.finish_write:
# Handles final message with no data to insert.
upload.complete = True
continue

            content = checksummed_data.content
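            # Verify the per-message crc32c, when present, against the bytes actually received.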
            crc32c_hash = (
                checksummed_data.crc32c if checksummed_data.HasField("crc32c") else None
            )
            if crc32c_hash is not None:
                actual_crc32c = crc32c.crc32c(content)
                if actual_crc32c != crc32c_hash:
                    return testbench.error.mismatch(
                        "crc32c in checksummed data",
                        crc32c_hash,
                        actual_crc32c,
                        context,
                    )

            # The testbench should ignore any request bytes that have already been persisted,
            # thus we validate write_offset against persisted_size.
            # https://github.com/googleapis/googleapis/blob/15b48f9ed0ae8b034e753c6895eb045f436e257c/google/storage/v2/storage.proto#L320-L329
            if request.write_offset < len(upload.media):
                range_start = len(upload.media) - request.write_offset
                content = testbench.common.partial_media(
                    content, range_end=len(content), range_start=range_start
                )
            # Currently, the testbench always checkpoints and flushes data for testing purposes,
            # instead of using the 15-second interval of the GCS server.
            # TODO(#592): Refactor testbench checkpointing to more closely follow GCS server behavior.
            upload.media += content
            if request.finish_write:
                upload.complete = True
            elif request.state_lookup:
                # For uploads not yet completed, yield a response with persisted_size.
                # For uploads that are complete, finalize the upload outside the request loop by
                # storing the full object checksums, creating the new object, and yielding a
                # response with the object metadata.
                yield storage_pb2.BidiWriteObjectResponse(
                    persisted_size=len(upload.media)
                )

        if upload is None:
Contributor:
Checking this after doing upload.media += content seems like a bug.

Seems like this check should be around line 350 or so?

Contributor Author:

This is to handle cases with a missing BidiWriteObjectRequest. I probably went overboard, and there's also a test case for this: gcs.upload.Upload.process_bidi_write_object_grpc(db, [], context)

def test_init_object_write_grpc_empty(self):
    db = unittest.mock.Mock()
    context = self.mock_context()
    upload, _ = gcs.upload.Upload.init_write_object_grpc(db, [], context)
    self.assertIsNone(upload)
    context.abort.assert_called_once_with(
        grpc.StatusCode.INVALID_ARGUMENT, unittest.mock.ANY
    )

Contributor:

Right, but if upload is None then line 359 would have raised an exception before you got here, right? You probably want to detect the problem before the first use of upload. Did I miss something? If so, please just merge; otherwise please reorder the test to prevent exceptions.

Contributor Author:

Inside the request loop on L307, we're checking if upload is None before the first use.

This extra check is outside the request loop. It covers the rare case of missing requests, and prevents crashes when attempting to finalize an upload.
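A minimal sketch of the analogous empty-iterator test for the bidi variant could look like the following (hypothetical, not part of this PR; it assumes the same mock_context helper as above, and that the generator must be drained before the missing-request check runs):

def test_process_bidi_write_object_grpc_empty(self):
    db = unittest.mock.Mock()
    context = self.mock_context()
    # The method is a generator; drain it so the missing-request check executes.
    responses = list(
        gcs.upload.Upload.process_bidi_write_object_grpc(db, [], context)
    )
    self.assertEqual(responses, [])
    context.abort.assert_called_once_with(
        grpc.StatusCode.INVALID_ARGUMENT, unittest.mock.ANY
    )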

return testbench.error.invalid("Missing BidiWriteObjectRequest", context)
if object_checksums is None:
upload.metadata.metadata["x_emulator_no_crc32c"] = "true"
upload.metadata.metadata["x_emulator_no_md5"] = "true"
else:
if object_checksums.HasField("crc32c"):
upload.metadata.metadata[
"x_emulator_crc32c"
] = testbench.common.rest_crc32c_from_proto(object_checksums.crc32c)
else:
upload.metadata.metadata["x_emulator_no_crc32c"] = "true"
if (
object_checksums.md5_hash is not None
and object_checksums.md5_hash != b""
):
upload.metadata.metadata[
"x_emulator_md5"
] = testbench.common.rest_md5_from_proto(object_checksums.md5_hash)
else:
upload.metadata.metadata["x_emulator_no_md5"] = "true"

        # Create a new object when the write is completed.
        if upload.complete:
            blob, _ = object.Object.init(
                upload.request,
                upload.metadata,
                upload.media,
                upload.bucket,
                False,
                context,
            )
            upload.blob = blob
            db.insert_object(
                upload.bucket.name,
                blob,
                context=context,
                preconditions=upload.preconditions,
            )
            yield storage_pb2.BidiWriteObjectResponse(resource=blob.metadata)
        else:
            if not is_resumable:
                return testbench.error.missing("finish_write in request", context)

    def resumable_status_rest(self, override_308=False):
        response = flask.make_response()
        if len(self.media) > 1 and not self.complete:
5 changes: 5 additions & 0 deletions testbench/grpc_server.py
@@ -702,6 +702,11 @@ def WriteObject(self, request_iterator, context):
        )
        return storage_pb2.WriteObjectResponse(resource=blob.metadata)

    def BidiWriteObject(self, request_iterator, context):
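        # Bidirectional streaming RPC: return the generator and let gRPC iterate it
        # to stream responses back to the client.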
        return gcs.upload.Upload.process_bidi_write_object_grpc(
            self.db, request_iterator, context
        )

@retry_test(method="storage.objects.list")
def ListObjects(self, request, context):
items, prefixes = self.db.list_object(request, request.parent, context)
Expand Down
122 changes: 122 additions & 0 deletions tests/test_grpc_server.py
@@ -2056,6 +2056,128 @@ def test_echo_metadata(self):
self.assertIn(("x-req-hdr1", "foo"), call.initial_metadata())
server.stop(grace=0)

    def test_bidi_write_object(self):
        QUANTUM = 256 * 1024
media = TestGrpc._create_block(2 * QUANTUM + QUANTUM / 2).encode("utf-8")

        offset = 0
        content = media[0:QUANTUM]
        r1 = storage_pb2.BidiWriteObjectRequest(
            write_object_spec=storage_pb2.WriteObjectSpec(
                resource={
                    "name": "object-name",
                    "bucket": "projects/_/buckets/bucket-name",
                },
            ),
            write_offset=offset,
            checksummed_data=storage_pb2.ChecksummedData(
                content=content, crc32c=crc32c.crc32c(content)
            ),
            flush=True,
            state_lookup=True,
            finish_write=False,
        )

        offset = QUANTUM
        content = media[QUANTUM : 2 * QUANTUM]
        r2 = storage_pb2.BidiWriteObjectRequest(
            write_offset=offset,
            checksummed_data=storage_pb2.ChecksummedData(
                content=content, crc32c=crc32c.crc32c(content)
            ),
            flush=True,
            state_lookup=True,
            finish_write=False,
        )

        offset = 2 * QUANTUM
        content = media[QUANTUM:]
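        # r3 deliberately overlaps the already-persisted range: it resends from
        # offset QUANTUM, so the testbench must drop the previously written bytes.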
        r3 = storage_pb2.BidiWriteObjectRequest(
            write_offset=QUANTUM,
            checksummed_data=storage_pb2.ChecksummedData(
                content=content, crc32c=crc32c.crc32c(content)
            ),
            finish_write=True,
        )
        streamer = self.grpc.BidiWriteObject([r1, r2, r3], context=self.mock_context())
        responses = list(streamer)
        # We expect 3 responses: persisted_size for the two state_lookup requests,
        # plus the final object resource.
        self.assertEqual(len(responses), 3)
        self.assertIsNotNone(responses[0].persisted_size)
        self.assertIsNotNone(responses[1].persisted_size)
        self.assertIsNotNone(responses[2].resource)
        blob = responses[2].resource
        self.assertEqual(blob.name, "object-name")
        self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")

    def test_bidi_write_object_resumable(self):
        start = self.grpc.StartResumableWrite(
            storage_pb2.StartResumableWriteRequest(
                write_object_spec=storage_pb2.WriteObjectSpec(
                    resource=storage_pb2.Object(
                        name="object-name", bucket="projects/_/buckets/bucket-name"
                    )
                )
            ),
            context=unittest.mock.MagicMock(),
        )
        self.assertIsNotNone(start.upload_id)

        QUANTUM = 256 * 1024
media = TestGrpc._create_block(2 * QUANTUM + QUANTUM / 2).encode("utf-8")
        offset = 0
        content = media[0:QUANTUM]
        r1 = storage_pb2.BidiWriteObjectRequest(
            upload_id=start.upload_id,
            write_offset=offset,
            checksummed_data=storage_pb2.ChecksummedData(
                content=content, crc32c=crc32c.crc32c(content)
            ),
            flush=True,
            finish_write=False,
        )
        offset = QUANTUM
        content = media[QUANTUM : 2 * QUANTUM]
        r2 = storage_pb2.BidiWriteObjectRequest(
            upload_id=start.upload_id,
            write_offset=offset,
            checksummed_data=storage_pb2.ChecksummedData(
                content=content, crc32c=crc32c.crc32c(content)
            ),
            flush=True,
            state_lookup=True,
            finish_write=False,
        )
        streamer = self.grpc.BidiWriteObject([r1, r2], "fake-context")
        responses = list(streamer)
        # We expect only one response, since only r2 sets state_lookup to True.
        self.assertEqual(len(responses), 1)
        self.assertIsNotNone(responses[0])
        self.assertEqual(responses[0].persisted_size, 2 * QUANTUM)

        status = self.grpc.QueryWriteStatus(
            storage_pb2.QueryWriteStatusRequest(upload_id=start.upload_id),
            "fake-context",
        )
        self.assertEqual(status.persisted_size, 2 * QUANTUM)

        offset = 2 * QUANTUM
        content = media[2 * QUANTUM :]
        r3 = storage_pb2.BidiWriteObjectRequest(
            upload_id=start.upload_id,
            write_offset=offset,
            checksummed_data=storage_pb2.ChecksummedData(
                content=content, crc32c=crc32c.crc32c(content)
            ),
            finish_write=True,
        )
        streamer = self.grpc.BidiWriteObject([r3], "fake-context")
        responses = list(streamer)
        self.assertEqual(len(responses), 1)
        blob = responses[0].resource
        self.assertEqual(blob.name, "object-name")
        self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")


if __name__ == "__main__":
unittest.main()