Re-instate (Decompressor).DecompressedSize optimization
This is partly for parity with how gRPC v1.56.3 worked, but the removal also
showed up as a giant regression in allocations: without the optimization we
now go through `io.Copy`, which allocates a temporary 32KiB buffer, while our
payloads are often much smaller.
tbg committed Nov 29, 2024
1 parent 3093b97 commit 5710e83
Showing 1 changed file with 24 additions and 18 deletions.
42 changes: 24 additions & 18 deletions rpc_util.go
@@ -22,6 +22,7 @@ import (
 	"compress/gzip"
 	"context"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"io"
 	"math"
@@ -880,24 +881,29 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, maxReceiveMessageSize
 	// TODO: Can/should this still be preserved with the new BufferSlice API? Are
 	// there any actual benefits to allocating a single large buffer instead of
 	// multiple smaller ones?
-	//if sizer, ok := compressor.(interface {
-	//	DecompressedSize(compressedBytes []byte) int
-	//}); ok {
-	//	if size := sizer.DecompressedSize(d); size >= 0 {
-	//		if size > maxReceiveMessageSize {
-	//			return nil, size, nil
-	//		}
-	//		// size is used as an estimate to size the buffer, but we
-	//		// will read more data if available.
-	//		// +MinRead so ReadFrom will not reallocate if size is correct.
-	//		//
-	//		// TODO: If we ensure that the buffer size is the same as the DecompressedSize,
-	//		// we can also utilize the recv buffer pool here.
-	//		buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
-	//		bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
-	//		return buf.Bytes(), int(bytesRead), err
-	//	}
-	//}
+	if sizer, ok := compressor.(interface {
+		DecompressedSize(compressedBytes io.Reader) int
+	}); ok {
+		if size := sizer.DecompressedSize(d.Reader()); size >= 0 {
+			if size > maxReceiveMessageSize {
+				return nil, size, nil
+			}
+			// DecompressedSize needs to be exact.
+			buf := pool.Get(size)
+			if _, err := io.ReadFull(dcReader, *buf); err != nil {
+				return nil, 0, err
+			}
+			// This read is crucial because some compressors optimize for having
+			// been read entirely, for example by releasing memory back to a pool.
+			if _, err := dcReader.Read(nil); err != io.EOF {
+				if err == nil {
+					err = errors.New("read after decompressed size")
+				}
+				return nil, 0, err
+			}
+			return mem.BufferSlice{mem.NewBuffer(buf, pool)}, size, nil
+		}
+	}
 
 	var out mem.BufferSlice
 	_, err = io.Copy(mem.NewWriter(&out, pool), io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
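For context on what the re-instated fast path keys on: below is a minimal, hypothetical sketch (not part of this commit) of a compressor that can implement the optional `DecompressedSize(io.Reader) int` method the code above type-asserts for. The length-prefixed-gzip framing and all names (`lenPrefixGzip`, `prefixWriter`) are invented for illustration; any compressor that records the uncompressed size in its framing could report it the same way.

package lenprefixgzip

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"io"
)

// lenPrefixGzip is a hypothetical compressor whose wire format is a 4-byte
// big-endian uncompressed length followed by a gzip stream. It satisfies
// grpc's encoding.Compressor interface (Name/Compress/Decompress) plus the
// optional DecompressedSize method used by the fast path above.
type lenPrefixGzip struct{}

func (lenPrefixGzip) Name() string { return "len-prefix-gzip" }

// prefixWriter buffers the gzip output and, on Close, writes the 4-byte
// uncompressed length followed by the compressed bytes to the destination.
type prefixWriter struct {
	dst io.Writer
	gz  *gzip.Writer
	buf bytes.Buffer
	n   uint32 // uncompressed bytes written so far
}

func (lenPrefixGzip) Compress(w io.Writer) (io.WriteCloser, error) {
	pw := &prefixWriter{dst: w}
	pw.gz = gzip.NewWriter(&pw.buf)
	return pw, nil
}

func (p *prefixWriter) Write(b []byte) (int, error) {
	n, err := p.gz.Write(b)
	p.n += uint32(n)
	return n, err
}

func (p *prefixWriter) Close() error {
	if err := p.gz.Close(); err != nil {
		return err
	}
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], p.n)
	if _, err := p.dst.Write(hdr[:]); err != nil {
		return err
	}
	_, err := p.dst.Write(p.buf.Bytes())
	return err
}

// Decompress skips the length prefix and returns a reader over the gzip body.
func (lenPrefixGzip) Decompress(r io.Reader) (io.Reader, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	return gzip.NewReader(r)
}

// DecompressedSize reports the exact uncompressed size from the prefix.
// Returning a negative value means "unknown" and sends the caller down
// the io.Copy fallback path instead.
func (lenPrefixGzip) DecompressedSize(compressed io.Reader) int {
	var hdr [4]byte
	if _, err := io.ReadFull(compressed, hdr[:]); err != nil {
		return -1
	}
	return int(binary.BigEndian.Uint32(hdr[:]))
}

Registering such a compressor (e.g. via `encoding.RegisterCompressor(lenPrefixGzip{})`) would let the decompress path above allocate exactly `size` bytes from the pool instead of copying through a 32KiB scratch buffer.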
