feat(storage): add full object checksum to reader.Attrs #10538

Merged · 4 commits · Jul 19, 2024
7 changes: 5 additions & 2 deletions storage/grpc_client.go
@@ -1101,9 +1101,11 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
wantCRC uint32
checkCRC bool
)
-if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
+if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil {
+if params.offset == 0 && params.length < 0 {
+checkCRC = true
+}
wantCRC = checksums.GetCrc32C()
-checkCRC = true
}

r = &Reader{
@@ -1115,6 +1117,7 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
LastModified: obj.GetUpdateTime().AsTime(),
Metageneration: obj.GetMetageneration(),
Generation: obj.GetGeneration(),
+CRC32C: wantCRC,
},
reader: &gRPCReader{
stream: res.stream,
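
With CRC32C now populated on Reader.Attrs, a caller can cross-check a completed full-object download against the server-reported checksum. A minimal sketch, assuming hypothetical bucket and object names and treating the zero value as "checksum not available" per the field's documentation:

package main

import (
    "context"
    "fmt"
    "hash/crc32"
    "io"
    "log"

    "cloud.google.com/go/storage"
)

func main() {
    ctx := context.Background()
    client, err := storage.NewClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // "my-bucket" and "my-object" are hypothetical names.
    r, err := client.Bucket("my-bucket").Object("my-object").NewReader(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer r.Close()

    data, err := io.ReadAll(r)
    if err != nil {
        log.Fatal(err)
    }

    // Compare a locally computed Castagnoli CRC with the reported one.
    if want := r.Attrs.CRC32C; want != 0 {
        got := crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli))
        fmt.Printf("crc32c match: %v\n", got == want)
    }
}
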
28 changes: 16 additions & 12 deletions storage/http_client.go
@@ -1415,18 +1415,20 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen
}
} else {
size = res.ContentLength
-// Check the CRC iff all of the following hold:
-// - We asked for content (length != 0).
-// - We got all the content (status != PartialContent).
-// - The server sent a CRC header.
-// - The Go http stack did not uncompress the file.
-// - We were not served compressed data that was uncompressed on download.
-// The problem with the last two cases is that the CRC will not match -- GCS
-// computes it on the compressed contents, but we compute it on the
-// uncompressed contents.
-if params.length != 0 && !res.Uncompressed && !uncompressedByServer(res) {
-crc, checkCRC = parseCRC32c(res)
-}
}

+// Check the CRC iff all of the following hold:
+// - We asked for content (length != 0).
+// - We got all the content (status != PartialContent).
+// - The server sent a CRC header.
+// - The Go http stack did not uncompress the file.
+// - We were not served compressed data that was uncompressed on download.
+// The problem with the last two cases is that the CRC will not match -- GCS
+// computes it on the compressed contents, but we compute it on the
+// uncompressed contents.
+crc, checkCRC = parseCRC32c(res)
+if params.length == 0 || res.StatusCode == http.StatusPartialContent || res.Uncompressed || uncompressedByServer(res) {
+checkCRC = false
+}

remain := res.ContentLength
@@ -1463,6 +1465,8 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen
StartOffset: startOffset,
Generation: params.gen,
Metageneration: metaGen,
+CRC32C: crc,
+Decompressed: res.Uncompressed || uncompressedByServer(res),
}
return &Reader{
Attrs: attrs,
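
The comment block in parseReadResponse above enumerates when verification is safe. As a reading aid only, those conditions can be restated as a standalone predicate. This is an illustrative paraphrase, not the package's actual helper; the "server sent a CRC header" condition (handled by parseCRC32c in the real code) is factored out as a parameter:

package sketch

import "net/http"

// shouldCheckCRC paraphrases the logic in parseReadResponse: verify the
// CRC only when the whole, non-transcoded object was requested and served.
func shouldCheckCRC(length int64, res *http.Response, headerHasCRC, uncompressedByServer bool) bool {
    switch {
    case !headerHasCRC:
        return false // the server sent no CRC header
    case length == 0:
        return false // we asked for no content
    case res.StatusCode == http.StatusPartialContent:
        return false // we did not receive the entire object
    case res.Uncompressed || uncompressedByServer:
        // GCS computes the CRC over the compressed bytes, while we would
        // compute it over the uncompressed bytes, so they cannot match.
        return false
    default:
        return true
    }
}
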
139 changes: 87 additions & 52 deletions storage/integration_test.go
@@ -3631,78 +3631,87 @@ func TestIntegration_ReadCRC(t *testing.T) {
offset, length int64
readCompressed bool // don't decompress a gzipped file

-wantErr   bool
-wantCheck bool // Should Reader try to check the CRC?
+wantErr          bool
+wantCheck        bool // Should Reader try to check the CRC?
+wantDecompressed bool
}{
{
desc: "uncompressed, entire file",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: -1,
readCompressed: false,
wantCheck: true,
desc: "uncompressed, entire file",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: -1,
readCompressed: false,
wantCheck: true,
wantDecompressed: false,
},
{
desc: "uncompressed, entire file, don't decompress",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: -1,
readCompressed: true,
wantCheck: true,
desc: "uncompressed, entire file, don't decompress",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: -1,
readCompressed: true,
wantCheck: true,
wantDecompressed: false,
},
{
desc: "uncompressed, suffix",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 1,
length: -1,
readCompressed: false,
wantCheck: false,
desc: "uncompressed, suffix",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 1,
length: -1,
readCompressed: false,
wantCheck: false,
wantDecompressed: false,
},
{
desc: "uncompressed, prefix",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: 18,
readCompressed: false,
wantCheck: false,
desc: "uncompressed, prefix",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: 18,
readCompressed: false,
wantCheck: false,
wantDecompressed: false,
},
{
// When a gzipped file is unzipped on read, we can't verify the checksum
// because it was computed against the zipped contents. We can detect
// this case using http.Response.Uncompressed.
desc: "compressed, entire file, unzipped",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 0,
length: -1,
readCompressed: false,
wantCheck: false,
desc: "compressed, entire file, unzipped",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 0,
length: -1,
readCompressed: false,
wantCheck: false,
wantDecompressed: true,
},
{
// When we read a gzipped file uncompressed, it's like reading a regular file:
// the served content and the CRC match.
desc: "compressed, entire file, read compressed",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 0,
length: -1,
readCompressed: true,
wantCheck: true,
desc: "compressed, entire file, read compressed",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 0,
length: -1,
readCompressed: true,
wantCheck: true,
wantDecompressed: false,
},
{
desc: "compressed, partial, server unzips",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 1,
length: 8,
readCompressed: false,
wantErr: true, // GCS can't serve part of a gzipped object
wantCheck: false,
desc: "compressed, partial, server unzips",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 1,
length: 8,
readCompressed: false,
wantErr: true, // GCS can't serve part of a gzipped object
wantCheck: false,
wantDecompressed: true,
},
{
desc: "compressed, partial, read compressed",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 1,
length: 8,
readCompressed: true,
wantCheck: false,
desc: "compressed, partial, read compressed",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 1,
length: 8,
readCompressed: true,
wantCheck: false,
wantDecompressed: false,
},
} {
t.Run(test.desc, func(t *testing.T) {
@@ -3720,13 +3729,17 @@ func TestIntegration_ReadCRC(t *testing.T) {
if got, want := r.checkCRC, test.wantCheck; got != want {
t.Errorf("%s, checkCRC: got %t, want %t", test.desc, got, want)
}

+if got, want := r.Attrs.Decompressed, test.wantDecompressed; got != want {
+t.Errorf("Attrs.Decompressed: got %t, want %t", got, want)
+}

_, err = c.readFunc(r)
_ = r.Close()
if err != nil {
t.Fatalf("%s: %v", test.desc, err)
}
})

}
})
}
@@ -4767,6 +4780,10 @@ func TestIntegration_Reader(t *testing.T) {
if got, want := rc.ContentType(), "text/plain"; got != want {
t.Errorf("ContentType (%q) = %q; want %q", obj, got, want)
}

+if got, want := rc.Attrs.CRC32C, crc32c(contents[obj]); got != want {
+t.Errorf("CRC32C (%q) = %d; want %d", obj, got, want)
+}
rc.Close()

// Check early close.
@@ -4831,6 +4848,15 @@ func TestIntegration_Reader(t *testing.T) {
if len(slurp) != int(r.want) {
t.Fatalf("%+v: RangeReader (%d, %d): Read %d bytes, wanted %d bytes", r.desc, r.offset, r.length, len(slurp), r.want)
}
+// JSON does not return the crc32c on partial reads, so
+// allow got == 0.
+if got, want := rc.Attrs.CRC32C, crc32c(contents[obj]); got != 0 && got != want {
+t.Errorf("RangeReader CRC32C (%q) = %d; want %d", obj, got, want)
+}
+
+if rc.Attrs.Decompressed {
+t.Errorf("RangeReader Decompressed (%q) = %v; want false", obj, rc.Attrs.Decompressed)
+}

switch {
case r.offset < 0: // The case of reading the last N bytes.
@@ -4912,6 +4938,7 @@ func TestIntegration_ReaderAttrs(t *testing.T) {
LastModified: got.LastModified, // ignored, tested separately
Generation: attrs.Generation,
Metageneration: attrs.Metageneration,
+CRC32C: crc32c(c),
}
if got != want {
t.Fatalf("got\t%v,\nwanted\t%v", got, want)
@@ -5211,6 +5238,10 @@ func TestIntegration_NewReaderWithContentEncodingGzip(t *testing.T) {
if g, w := blob2kBTo3kB, original; !bytes.Equal(g, w) {
t.Fatalf("Body mismatch\nGot:\n%s\n\nWant:\n%s", g, w)
}

+if !r2kBTo3kB.Attrs.Decompressed {
+t.Errorf("Attrs.Decompressed: got %v, want true", r2kBTo3kB.Attrs.Decompressed)
+}
})
}
})
@@ -6350,3 +6381,7 @@ func setUpRequesterPaysBucket(ctx context.Context, t *testing.T, bucket, object
}
})
}

+func crc32c(b []byte) uint32 {
+return crc32.Checksum(b, crc32.MakeTable(crc32.Castagnoli))
+}
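
One small design note on this helper: crc32.MakeTable rebuilds the 1 KiB Castagnoli table on every call. That is fine in a test, but a hot path would normally hoist the table, as in this sketch (not part of the change):

package sketch

import "hash/crc32"

// castagnoliTable is computed once instead of on every checksum call.
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)

func crc32cSum(b []byte) uint32 {
    return crc32.Checksum(b, castagnoliTable)
}
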
16 changes: 15 additions & 1 deletion storage/reader.go
@@ -65,6 +65,19 @@ type ReaderObjectAttrs struct {
// meaningful in the context of a particular generation of a
// particular object.
Metageneration int64

+// CRC32C is the CRC32 checksum of the entire object's content using the
+// Castagnoli93 polynomial, if available.
+CRC32C uint32
+
+// Decompressed is true if the object is stored as a gzip file and was
+// decompressed when read.
+// Objects are automatically decompressed if the object's metadata property
+// "Content-Encoding" is set to "gzip" or satisfies decompressive
+// transcoding as per https://cloud.google.com/storage/docs/transcoding.
+//
+// To prevent decompression on reads, use [ObjectHandle.ReadCompressed].
+Decompressed bool
}

// NewReader creates a new Reader to read the contents of the
@@ -91,7 +104,8 @@ func (o *ObjectHandle) NewReader(ctx context.Context) (*Reader, error) {
// If the object's metadata property "Content-Encoding" is set to "gzip" or satisfies
// decompressive transcoding per https://cloud.google.com/storage/docs/transcoding
// that file will be served back whole, regardless of the requested range as
-// Google Cloud Storage dictates.
+// Google Cloud Storage dictates. If decompressive transcoding occurs,
+// [Reader.Attrs.Decompressed] will be true.
//
// By default, reads are made using the Cloud Storage XML API. We recommend
// using the JSON API instead, which can be done by setting [WithJSONReads]
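
As the updated doc comment suggests, ObjectHandle.ReadCompressed is the way to keep the checksum verifiable when an object would otherwise be transcoded. A short sketch, assuming a hypothetical gzip-encoded object:

package sketch

import (
    "context"
    "io"

    "cloud.google.com/go/storage"
)

// readStoredBytes reads the stored (gzip) bytes as-is. With
// ReadCompressed(true), the served content matches the CRC that GCS
// computed, and Attrs.Decompressed remains false.
// "my-bucket" and "logs.gz" are hypothetical names.
func readStoredBytes(ctx context.Context, client *storage.Client) ([]byte, error) {
    r, err := client.Bucket("my-bucket").Object("logs.gz").ReadCompressed(true).NewReader(ctx)
    if err != nil {
        return nil, err
    }
    defer r.Close()
    return io.ReadAll(r)
}
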
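
Taken together, the two new Attrs fields let a caller decide when a checksum comparison is meaningful at all. A sketch of such a helper, which by assumption skips (rather than fails) whenever the reported CRC cannot apply to the bytes received, and which expects the complete object content (a range read's bytes would never match the full-object CRC):

package sketch

import (
    "fmt"
    "hash/crc32"

    "cloud.google.com/go/storage"
)

// verifyDownload compares data (the complete object content) against the
// checksum reported in r.Attrs. It skips the check when the payload was
// transcoded (the CRC covers the compressed form) or when no checksum was
// reported (CRC32C == 0).
func verifyDownload(r *storage.Reader, data []byte) error {
    if r.Attrs.Decompressed || r.Attrs.CRC32C == 0 {
        return nil
    }
    got := crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli))
    if got != r.Attrs.CRC32C {
        return fmt.Errorf("crc32c mismatch: got %d, want %d", got, r.Attrs.CRC32C)
    }
    return nil
}
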