feat(storage): add full object checksum to reader.Attrs #10538

Merged (4 commits) on Jul 19, 2024
Changes from 2 commits
7 changes: 5 additions & 2 deletions storage/grpc_client.go
@@ -1101,9 +1101,11 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
wantCRC uint32
checkCRC bool
)
- if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
+ if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil {
+     if params.offset == 0 && params.length < 0 {
+         checkCRC = true
+     }
wantCRC = checksums.GetCrc32C()
- checkCRC = true
}

r = &Reader{
@@ -1115,6 +1117,7 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
LastModified: obj.GetUpdateTime().AsTime(),
Metageneration: obj.GetMetageneration(),
Generation: obj.GetGeneration(),
+ CRC32C: wantCRC,
},
reader: &gRPCReader{
stream: res.stream,
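Net effect of this hunk: over gRPC, the full-object checksum is captured in wantCRC (and surfaced as Attrs.CRC32C below) whenever the server returns ObjectChecksums, including for ranged reads, while client-side verification (checkCRC) is still enabled only for full reads (offset 0, length < 0).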
28 changes: 16 additions & 12 deletions storage/http_client.go
@@ -1415,18 +1415,20 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen
}
} else {
size = res.ContentLength
- // Check the CRC iff all of the following hold:
- // - We asked for content (length != 0).
- // - We got all the content (status != PartialContent).
- // - The server sent a CRC header.
- // - The Go http stack did not uncompress the file.
- // - We were not served compressed data that was uncompressed on download.
- // The problem with the last two cases is that the CRC will not match -- GCS
- // computes it on the compressed contents, but we compute it on the
- // uncompressed contents.
- if params.length != 0 && !res.Uncompressed && !uncompressedByServer(res) {
-     crc, checkCRC = parseCRC32c(res)
- }
}

+ // Check the CRC iff all of the following hold:
+ // - We asked for content (length != 0).
+ // - We got all the content (status != PartialContent).
+ // - The server sent a CRC header.
+ // - The Go http stack did not uncompress the file.
+ // - We were not served compressed data that was uncompressed on download.
+ // The problem with the last two cases is that the CRC will not match -- GCS
+ // computes it on the compressed contents, but we compute it on the
+ // uncompressed contents.
+ crc, checkCRC = parseCRC32c(res)
+ if params.length == 0 || res.StatusCode == http.StatusPartialContent || res.Uncompressed || uncompressedByServer(res) {
+     checkCRC = false
+ }

remain := res.ContentLength
@@ -1463,6 +1465,8 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen
StartOffset: startOffset,
Generation: params.gen,
Metageneration: metaGen,
+ CRC32C: crc,
+ Decompressed: res.Uncompressed || uncompressedByServer(res),
}
return &Reader{
Attrs: attrs,
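The conditions in the comment above govern when the library itself verifies the checksum; the new Attrs.CRC32C field also lets callers verify a full download end to end. A minimal caller-side sketch, not part of this PR — verifiedDownload and its parameters are hypothetical names:

```go
package main

import (
	"context"
	"fmt"
	"hash/crc32"
	"io"

	"cloud.google.com/go/storage"
)

// verifiedDownload reads an entire object and checks the bytes against the
// full-object checksum that the reader now exposes in Attrs.CRC32C.
func verifiedDownload(ctx context.Context, client *storage.Client, bucket, object string) ([]byte, error) {
	// Full read (offset 0, unlimited length), so the full-object CRC applies.
	r, err := client.Bucket(bucket).Object(object).NewReader(ctx)
	if err != nil {
		return nil, err
	}
	defer r.Close()

	data, err := io.ReadAll(r)
	if err != nil {
		return nil, err
	}
	// GCS CRC32C uses the Castagnoli polynomial.
	got := crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli))
	// Zero can mean "checksum not reported", so only compare when it is set.
	if want := r.Attrs.CRC32C; want != 0 && got != want {
		return nil, fmt.Errorf("crc32c mismatch: got %d, want %d", got, want)
	}
	return data, nil
}
```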
139 changes: 87 additions & 52 deletions storage/integration_test.go
@@ -3631,78 +3631,87 @@ func TestIntegration_ReadCRC(t *testing.T) {
offset, length int64
readCompressed bool // don't decompress a gzipped file

- wantErr   bool
- wantCheck bool // Should Reader try to check the CRC?
+ wantErr          bool
+ wantCheck        bool // Should Reader try to check the CRC?
+ wantDecompressed bool
}{
{
desc: "uncompressed, entire file",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: -1,
readCompressed: false,
wantCheck: true,
desc: "uncompressed, entire file",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: -1,
readCompressed: false,
wantCheck: true,
wantDecompressed: false,
},
{
desc: "uncompressed, entire file, don't decompress",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: -1,
readCompressed: true,
wantCheck: true,
desc: "uncompressed, entire file, don't decompress",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: -1,
readCompressed: true,
wantCheck: true,
wantDecompressed: false,
},
{
desc: "uncompressed, suffix",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 1,
length: -1,
readCompressed: false,
wantCheck: false,
desc: "uncompressed, suffix",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 1,
length: -1,
readCompressed: false,
wantCheck: false,
wantDecompressed: false,
},
{
desc: "uncompressed, prefix",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: 18,
readCompressed: false,
wantCheck: false,
desc: "uncompressed, prefix",
obj: client.Bucket(uncompressedBucket).Object(uncompressedObject),
offset: 0,
length: 18,
readCompressed: false,
wantCheck: false,
wantDecompressed: false,
},
{
// When a gzipped file is unzipped on read, we can't verify the checksum
// because it was computed against the zipped contents. We can detect
// this case using http.Response.Uncompressed.
desc: "compressed, entire file, unzipped",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 0,
length: -1,
readCompressed: false,
wantCheck: false,
desc: "compressed, entire file, unzipped",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 0,
length: -1,
readCompressed: false,
wantCheck: false,
wantDecompressed: true,
},
{
// When we read a gzipped file uncompressed, it's like reading a regular file:
// the served content and the CRC match.
desc: "compressed, entire file, read compressed",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 0,
length: -1,
readCompressed: true,
wantCheck: true,
desc: "compressed, entire file, read compressed",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 0,
length: -1,
readCompressed: true,
wantCheck: true,
wantDecompressed: false,
},
{
desc: "compressed, partial, server unzips",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 1,
length: 8,
readCompressed: false,
wantErr: true, // GCS can't serve part of a gzipped object
wantCheck: false,
desc: "compressed, partial, server unzips",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 1,
length: 8,
readCompressed: false,
wantErr: true, // GCS can't serve part of a gzipped object
wantCheck: false,
wantDecompressed: true,
},
{
desc: "compressed, partial, read compressed",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 1,
length: 8,
readCompressed: true,
wantCheck: false,
desc: "compressed, partial, read compressed",
obj: client.Bucket(bucket).Object(gzippedObject),
offset: 1,
length: 8,
readCompressed: true,
wantCheck: false,
wantDecompressed: false,
},
} {
t.Run(test.desc, func(t *testing.T) {
@@ -3720,13 +3729,17 @@ func TestIntegration_ReadCRC(t *testing.T) {
if got, want := r.checkCRC, test.wantCheck; got != want {
t.Errorf("%s, checkCRC: got %t, want %t", test.desc, got, want)
}

+ if got, want := r.Attrs.Decompressed, test.wantDecompressed; got != want {
+     t.Errorf("Attrs.Decompressed: got %t, want %t", got, want)
+ }

_, err = c.readFunc(r)
_ = r.Close()
if err != nil {
t.Fatalf("%s: %v", test.desc, err)
}
})

}
})
}
@@ -4767,6 +4780,10 @@ func TestIntegration_Reader(t *testing.T) {
if got, want := rc.ContentType(), "text/plain"; got != want {
t.Errorf("ContentType (%q) = %q; want %q", obj, got, want)
}

+ if got, want := rc.Attrs.CRC32C, crc32c(contents[obj]); got != want {
+     t.Errorf("CRC32C (%q) = %d; want %d", obj, got, want)
+ }
rc.Close()

// Check early close.
@@ -4831,6 +4848,15 @@ func TestIntegration_Reader(t *testing.T) {
if len(slurp) != int(r.want) {
t.Fatalf("%+v: RangeReader (%d, %d): Read %d bytes, wanted %d bytes", r.desc, r.offset, r.length, len(slurp), r.want)
}
+ // JSON does not return the crc32c on partial reads, so
+ // allow got == 0.
+ if got, want := rc.Attrs.CRC32C, crc32c(contents[obj]); got != 0 && got != want {
+     t.Errorf("RangeReader CRC32C (%q) = %d; want %d", obj, got, want)
+ }
+
+ if rc.Attrs.Decompressed {
+     t.Errorf("RangeReader Decompressed (%q) = want false, got %v", obj, rc.Attrs.Decompressed)
+ }

switch {
case r.offset < 0: // The case of reading the last N bytes.
@@ -4912,6 +4938,7 @@ func TestIntegration_ReaderAttrs(t *testing.T) {
LastModified: got.LastModified, // ignored, tested separately
Generation: attrs.Generation,
Metageneration: attrs.Metageneration,
+ CRC32C: crc32c(c),
}
if got != want {
t.Fatalf("got\t%v,\nwanted\t%v", got, want)
@@ -5211,6 +5238,10 @@ func TestIntegration_NewReaderWithContentEncodingGzip(t *testing.T) {
if g, w := blob2kBTo3kB, original; !bytes.Equal(g, w) {
t.Fatalf("Body mismatch\nGot:\n%s\n\nWant:\n%s", g, w)
}

+ if !r2kBTo3kB.Attrs.Decompressed {
+     t.Errorf("Attrs.Decompressed: want true, got %v", r2kBTo3kB.Attrs.Decompressed)
+ }
})
}
})
@@ -6350,3 +6381,7 @@ func setUpRequesterPaysBucket(ctx context.Context, t *testing.T, bucket, object
}
})
}

+ func crc32c(b []byte) uint32 {
+     return crc32.Checksum(b, crc32.MakeTable(crc32.Castagnoli))
+ }
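One caveat the new assertions encode, worth restating: Attrs.CRC32C always describes the entire object, and over JSON it may be absent (zero) on partial reads, so a ranged read can never be validated against it directly. A hedged sketch of how a caller might surface that; readRange is a hypothetical helper, not part of this PR:

```go
package main

import (
	"context"
	"io"

	"cloud.google.com/go/storage"
)

// readRange returns the requested slice together with the full-object
// CRC32C, if the transport reported one (0 means unavailable, e.g. on
// JSON partial reads). The checksum covers the whole object, never the
// slice, so it cannot validate the returned bytes on its own.
func readRange(ctx context.Context, client *storage.Client, bucket, object string, offset, length int64) ([]byte, uint32, error) {
	r, err := client.Bucket(bucket).Object(object).NewRangeReader(ctx, offset, length)
	if err != nil {
		return nil, 0, err
	}
	defer r.Close()

	data, err := io.ReadAll(r)
	if err != nil {
		return nil, 0, err
	}
	return data, r.Attrs.CRC32C, nil
}
```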
8 changes: 8 additions & 0 deletions storage/reader.go
@@ -65,6 +65,14 @@ type ReaderObjectAttrs struct {
// meaningful in the context of a particular generation of a
// particular object.
Metageneration int64

+ // CRC32C is the CRC32 checksum of the entire object's content using the
+ // Castagnoli93 polynomial, if available.
+ CRC32C uint32
+
+ // Decompressed is true if the object was read decompressed and different
+ // than its original storage form.
+ Decompressed bool
}

// NewReader creates a new Reader to read the contents of the
Contributor review comment on the Decompressed doc comment:
This seems a little vague, I would make something that more closely echoes the NewRangeReader docs about transcoding (and/or update those docs to clarify that this field will be true if decompressive transcoding occurs).
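To the reviewer's point about transcoding: when GCS decompresses a gzip-encoded object in transit, the served bytes no longer match the stored bytes the checksum was computed over, and Attrs.Decompressed is how a caller can detect that. A sketch putting both new fields together, assuming the hypothetical readAndCheck name and signature:

```go
package main

import (
	"context"
	"fmt"
	"hash/crc32"
	"io"

	"cloud.google.com/go/storage"
)

// readAndCheck reads a whole object and verifies it against Attrs.CRC32C
// when possible. Verification is skipped (verified == false) under
// decompressive transcoding, since the stored checksum covers the gzipped
// bytes rather than the bytes read, or when no checksum was reported.
func readAndCheck(ctx context.Context, client *storage.Client, bucket, object string) (data []byte, verified bool, err error) {
	r, err := client.Bucket(bucket).Object(object).NewReader(ctx)
	if err != nil {
		return nil, false, err
	}
	defer r.Close()

	if data, err = io.ReadAll(r); err != nil {
		return nil, false, err
	}
	if r.Attrs.Decompressed || r.Attrs.CRC32C == 0 {
		return data, false, nil
	}
	if got := crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli)); got != r.Attrs.CRC32C {
		return nil, false, fmt.Errorf("crc32c mismatch: got %d, want %d", got, r.Attrs.CRC32C)
	}
	return data, true, nil
}
```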