feat(storage): add full object checksum to reader.Attrs (#10538)
BrennaEpp authored Jul 19, 2024
1 parent 29b52dc commit 245d2ea
Showing 4 changed files with 123 additions and 67 deletions.
7 changes: 5 additions & 2 deletions storage/grpc_client.go
@@ -1101,9 +1101,11 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
 		wantCRC  uint32
 		checkCRC bool
 	)
-	if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
+	if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil {
+		if params.offset == 0 && params.length < 0 {
+			checkCRC = true
+		}
 		wantCRC = checksums.GetCrc32C()
-		checkCRC = true
 	}
 
 	r = &Reader{
@@ -1115,6 +1117,7 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
 			LastModified:   obj.GetUpdateTime().AsTime(),
 			Metageneration: obj.GetMetageneration(),
 			Generation:     obj.GetGeneration(),
+			CRC32C:         wantCRC,
 		},
 		reader: &gRPCReader{
 			stream: res.stream,
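Because wantCRC is now captured whenever the server reports a checksum, not only when verification is enabled, gRPC range reads also surface the full-object CRC in Reader.Attrs.CRC32C. Below is a minimal sketch of verifying a ranged download against that value; the bucket and object names are hypothetical, the object is assumed larger than 1 MiB, and the comparison is skipped when no checksum was reported (the integration test below notes that JSON range reads can leave it zero).

package main

import (
	"bytes"
	"context"
	"fmt"
	"hash/crc32"
	"io"
	"log"

	"cloud.google.com/go/storage"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Hypothetical names, for illustration only.
	obj := client.Bucket("my-bucket").Object("my-object")

	var buf bytes.Buffer
	var want uint32
	// Download in two ranges; each Reader still reports the checksum
	// of the entire object in Attrs.CRC32C.
	for _, rng := range []struct{ off, len int64 }{{0, 1 << 20}, {1 << 20, -1}} {
		r, err := obj.NewRangeReader(ctx, rng.off, rng.len)
		if err != nil {
			log.Fatal(err)
		}
		want = r.Attrs.CRC32C
		if _, err := io.Copy(&buf, r); err != nil {
			log.Fatal(err)
		}
		r.Close()
	}

	// GCS checksums use the Castagnoli polynomial.
	got := crc32.Checksum(buf.Bytes(), crc32.MakeTable(crc32.Castagnoli))
	if want != 0 && got != want {
		log.Fatalf("checksum mismatch: got %d, want %d", got, want)
	}
	fmt.Println("checksum verified")
}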
28 changes: 16 additions & 12 deletions storage/http_client.go
@@ -1415,18 +1415,20 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen
 		}
 	} else {
 		size = res.ContentLength
-		// Check the CRC iff all of the following hold:
-		// - We asked for content (length != 0).
-		// - We got all the content (status != PartialContent).
-		// - The server sent a CRC header.
-		// - The Go http stack did not uncompress the file.
-		// - We were not served compressed data that was uncompressed on download.
-		// The problem with the last two cases is that the CRC will not match -- GCS
-		// computes it on the compressed contents, but we compute it on the
-		// uncompressed contents.
-		if params.length != 0 && !res.Uncompressed && !uncompressedByServer(res) {
-			crc, checkCRC = parseCRC32c(res)
-		}
 	}
 
+	// Check the CRC iff all of the following hold:
+	// - We asked for content (length != 0).
+	// - We got all the content (status != PartialContent).
+	// - The server sent a CRC header.
+	// - The Go http stack did not uncompress the file.
+	// - We were not served compressed data that was uncompressed on download.
+	// The problem with the last two cases is that the CRC will not match -- GCS
+	// computes it on the compressed contents, but we compute it on the
+	// uncompressed contents.
+	crc, checkCRC = parseCRC32c(res)
+	if params.length == 0 || res.StatusCode == http.StatusPartialContent || res.Uncompressed || uncompressedByServer(res) {
+		checkCRC = false
+	}
+
 	remain := res.ContentLength
@@ -1463,6 +1465,8 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen
 		StartOffset:    startOffset,
 		Generation:     params.gen,
 		Metageneration: metaGen,
+		CRC32C:         crc,
+		Decompressed:   res.Uncompressed || uncompressedByServer(res),
 	}
 	return &Reader{
 		Attrs:    attrs,
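Two signals feed the new Decompressed attribute and the checkCRC decision above: res.Uncompressed, which Go's HTTP transport sets when it gunzips the body itself, and uncompressedByServer, which detects decompressive transcoding performed by GCS. That helper is not part of this diff; the following is an approximate sketch of its logic, inferred from the stored-vs-served encoding headers, not a verbatim copy of the implementation.

package example

import "net/http"

// Approximate sketch of the helper referenced above; the real
// implementation lives elsewhere in storage/http_client.go.
func uncompressedByServer(res *http.Response) bool {
	// Stored as gzip but served without gzip encoding means the
	// server (GCS) decompressed the object before sending it.
	return res.Header.Get("X-Goog-Stored-Content-Encoding") == "gzip" &&
		res.Header.Get("Content-Encoding") != "gzip"
}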
139 changes: 87 additions & 52 deletions storage/integration_test.go
@@ -3631,78 +3631,87 @@ func TestIntegration_ReadCRC(t *testing.T) {
 		offset, length int64
 		readCompressed bool // don't decompress a gzipped file
 
-		wantErr   bool
-		wantCheck bool // Should Reader try to check the CRC?
+		wantErr          bool
+		wantCheck        bool // Should Reader try to check the CRC?
+		wantDecompressed bool
 	}{
 		{
-			desc:           "uncompressed, entire file",
-			obj:            client.Bucket(uncompressedBucket).Object(uncompressedObject),
-			offset:         0,
-			length:         -1,
-			readCompressed: false,
-			wantCheck:      true,
+			desc:             "uncompressed, entire file",
+			obj:              client.Bucket(uncompressedBucket).Object(uncompressedObject),
+			offset:           0,
+			length:           -1,
+			readCompressed:   false,
+			wantCheck:        true,
+			wantDecompressed: false,
 		},
 		{
-			desc:           "uncompressed, entire file, don't decompress",
-			obj:            client.Bucket(uncompressedBucket).Object(uncompressedObject),
-			offset:         0,
-			length:         -1,
-			readCompressed: true,
-			wantCheck:      true,
+			desc:             "uncompressed, entire file, don't decompress",
+			obj:              client.Bucket(uncompressedBucket).Object(uncompressedObject),
+			offset:           0,
+			length:           -1,
+			readCompressed:   true,
+			wantCheck:        true,
+			wantDecompressed: false,
 		},
 		{
-			desc:           "uncompressed, suffix",
-			obj:            client.Bucket(uncompressedBucket).Object(uncompressedObject),
-			offset:         1,
-			length:         -1,
-			readCompressed: false,
-			wantCheck:      false,
+			desc:             "uncompressed, suffix",
+			obj:              client.Bucket(uncompressedBucket).Object(uncompressedObject),
+			offset:           1,
+			length:           -1,
+			readCompressed:   false,
+			wantCheck:        false,
+			wantDecompressed: false,
 		},
 		{
-			desc:           "uncompressed, prefix",
-			obj:            client.Bucket(uncompressedBucket).Object(uncompressedObject),
-			offset:         0,
-			length:         18,
-			readCompressed: false,
-			wantCheck:      false,
+			desc:             "uncompressed, prefix",
+			obj:              client.Bucket(uncompressedBucket).Object(uncompressedObject),
+			offset:           0,
+			length:           18,
+			readCompressed:   false,
+			wantCheck:        false,
+			wantDecompressed: false,
 		},
 		{
 			// When a gzipped file is unzipped on read, we can't verify the checksum
 			// because it was computed against the zipped contents. We can detect
 			// this case using http.Response.Uncompressed.
-			desc:           "compressed, entire file, unzipped",
-			obj:            client.Bucket(bucket).Object(gzippedObject),
-			offset:         0,
-			length:         -1,
-			readCompressed: false,
-			wantCheck:      false,
+			desc:             "compressed, entire file, unzipped",
+			obj:              client.Bucket(bucket).Object(gzippedObject),
+			offset:           0,
+			length:           -1,
+			readCompressed:   false,
+			wantCheck:        false,
+			wantDecompressed: true,
 		},
 		{
 			// When we read a gzipped file uncompressed, it's like reading a regular file:
 			// the served content and the CRC match.
-			desc:           "compressed, entire file, read compressed",
-			obj:            client.Bucket(bucket).Object(gzippedObject),
-			offset:         0,
-			length:         -1,
-			readCompressed: true,
-			wantCheck:      true,
+			desc:             "compressed, entire file, read compressed",
+			obj:              client.Bucket(bucket).Object(gzippedObject),
+			offset:           0,
+			length:           -1,
+			readCompressed:   true,
+			wantCheck:        true,
+			wantDecompressed: false,
 		},
 		{
-			desc:           "compressed, partial, server unzips",
-			obj:            client.Bucket(bucket).Object(gzippedObject),
-			offset:         1,
-			length:         8,
-			readCompressed: false,
-			wantErr:        true, // GCS can't serve part of a gzipped object
-			wantCheck:      false,
+			desc:             "compressed, partial, server unzips",
+			obj:              client.Bucket(bucket).Object(gzippedObject),
+			offset:           1,
+			length:           8,
+			readCompressed:   false,
+			wantErr:          true, // GCS can't serve part of a gzipped object
+			wantCheck:        false,
+			wantDecompressed: true,
 		},
 		{
-			desc:           "compressed, partial, read compressed",
-			obj:            client.Bucket(bucket).Object(gzippedObject),
-			offset:         1,
-			length:         8,
-			readCompressed: true,
-			wantCheck:      false,
+			desc:             "compressed, partial, read compressed",
+			obj:              client.Bucket(bucket).Object(gzippedObject),
+			offset:           1,
+			length:           8,
+			readCompressed:   true,
+			wantCheck:        false,
+			wantDecompressed: false,
 		},
 	} {
 		t.Run(test.desc, func(t *testing.T) {
@@ -3720,13 +3729,17 @@ func TestIntegration_ReadCRC(t *testing.T) {
 			if got, want := r.checkCRC, test.wantCheck; got != want {
 				t.Errorf("%s, checkCRC: got %t, want %t", test.desc, got, want)
 			}
+
+			if got, want := r.Attrs.Decompressed, test.wantDecompressed; got != want {
+				t.Errorf("Attrs.Decompressed: got %t, want %t", got, want)
+			}
+
 			_, err = c.readFunc(r)
 			_ = r.Close()
 			if err != nil {
 				t.Fatalf("%s: %v", test.desc, err)
 			}
 		})
-
 	}
 })
 }
@@ -4767,6 +4780,10 @@ func TestIntegration_Reader(t *testing.T) {
 		if got, want := rc.ContentType(), "text/plain"; got != want {
 			t.Errorf("ContentType (%q) = %q; want %q", obj, got, want)
 		}
+
+		if got, want := rc.Attrs.CRC32C, crc32c(contents[obj]); got != want {
+			t.Errorf("CRC32C (%q) = %d; want %d", obj, got, want)
+		}
 		rc.Close()
 
 		// Check early close.
@@ -4831,6 +4848,15 @@ func TestIntegration_Reader(t *testing.T) {
 			if len(slurp) != int(r.want) {
 				t.Fatalf("%+v: RangeReader (%d, %d): Read %d bytes, wanted %d bytes", r.desc, r.offset, r.length, len(slurp), r.want)
 			}
+			// JSON does not return the crc32c on partial reads, so
+			// allow got == 0.
+			if got, want := rc.Attrs.CRC32C, crc32c(contents[obj]); got != 0 && got != want {
+				t.Errorf("RangeReader CRC32C (%q) = %d; want %d", obj, got, want)
+			}
+
+			if rc.Attrs.Decompressed {
+				t.Errorf("RangeReader Decompressed (%q) = want false, got %v", obj, rc.Attrs.Decompressed)
+			}
 
 			switch {
 			case r.offset < 0: // The case of reading the last N bytes.
@@ -4912,6 +4938,7 @@ func TestIntegration_ReaderAttrs(t *testing.T) {
 		LastModified:   got.LastModified, // ignored, tested separately
 		Generation:     attrs.Generation,
 		Metageneration: attrs.Metageneration,
+		CRC32C:         crc32c(c),
 	}
 	if got != want {
 		t.Fatalf("got\t%v,\nwanted\t%v", got, want)
@@ -5211,6 +5238,10 @@ func TestIntegration_NewReaderWithContentEncodingGzip(t *testing.T) {
 		if g, w := blob2kBTo3kB, original; !bytes.Equal(g, w) {
 			t.Fatalf("Body mismatch\nGot:\n%s\n\nWant:\n%s", g, w)
 		}
+
+		if !r2kBTo3kB.Attrs.Decompressed {
+			t.Errorf("Attrs.Decompressed: want true, got %v", r2kBTo3kB.Attrs.Decompressed)
+		}
 	})
 }
 })
@@ -6350,3 +6381,7 @@ func setUpRequesterPaysBucket(ctx context.Context, t *testing.T, bucket, object
 	}
 	})
 }
+
+func crc32c(b []byte) uint32 {
+	return crc32.Checksum(b, crc32.MakeTable(crc32.Castagnoli))
+}
16 changes: 15 additions & 1 deletion storage/reader.go
@@ -65,6 +65,19 @@ type ReaderObjectAttrs struct {
 	// meaningful in the context of a particular generation of a
 	// particular object.
 	Metageneration int64
+
+	// CRC32C is the CRC32 checksum of the entire object's content using the
+	// Castagnoli93 polynomial, if available.
+	CRC32C uint32
+
+	// Decompressed is true if the object is stored as a gzip file and was
+	// decompressed when read.
+	// Objects are automatically decompressed if the object's metadata property
+	// "Content-Encoding" is set to "gzip" or satisfies decompressive
+	// transcoding as per https://cloud.google.com/storage/docs/transcoding.
+	//
+	// To prevent decompression on reads, use [ObjectHandle.ReadCompressed].
+	Decompressed bool
 }
 
 // NewReader creates a new Reader to read the contents of the
@@ -91,7 +104,8 @@ func (o *ObjectHandle) NewReader(ctx context.Context) (*Reader, error) {
 // If the object's metadata property "Content-Encoding" is set to "gzip" or satisfies
 // decompressive transcoding per https://cloud.google.com/storage/docs/transcoding
 // that file will be served back whole, regardless of the requested range as
-// Google Cloud Storage dictates.
+// Google Cloud Storage dictates. If decompressive transcoding occurs,
+// [Reader.Attrs.Decompressed] will be true.
 //
 // By default, reads are made using the Cloud Storage XML API. We recommend
 // using the JSON API instead, which can be done by setting [WithJSONReads]
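A usage sketch tying the two new fields together, with hypothetical bucket and object names: the comparison is skipped when Attrs.Decompressed is set, since GCS computes the checksum over the stored (compressed) bytes, and when no checksum was reported at all.

package example

import (
	"context"
	"fmt"
	"hash/crc32"
	"io"

	"cloud.google.com/go/storage"
)

func verifyDownload(ctx context.Context, client *storage.Client) error {
	// Hypothetical names, for illustration only.
	r, err := client.Bucket("my-bucket").Object("my-object").NewReader(ctx)
	if err != nil {
		return err
	}
	defer r.Close()

	data, err := io.ReadAll(r)
	if err != nil {
		return err
	}

	if r.Attrs.Decompressed || r.Attrs.CRC32C == 0 {
		// Checksum covers the stored (compressed) bytes, or was not
		// reported; nothing meaningful to compare against.
		return nil
	}
	if got := crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli)); got != r.Attrs.CRC32C {
		return fmt.Errorf("crc32c mismatch: got %d, want %d", got, r.Attrs.CRC32C)
	}
	return nil
}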
