From af30987715aa000110f4cd42840c4135b23713bc Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 12 Dec 2022 13:25:53 +1100 Subject: [PATCH 1/3] Use a shared pool of buffers across all DecbufFactory instances. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This results in a small improvement in performance and heap consumption: name old time/op new time/op delta NewStreamBinaryReader/20Names100Values-10 758µs ± 7% 726µs ± 1% ~ (p=0.111 n=5+4) NewStreamBinaryReader/20Names500Values-10 2.12ms ± 4% 2.15ms ± 4% ~ (p=0.548 n=5+5) NewStreamBinaryReader/20Names1000Values-10 3.49ms ± 2% 3.44ms ± 2% ~ (p=0.222 n=5+5) NewStreamBinaryReader/50Names100Values-10 1.54ms ± 4% 1.51ms ± 5% ~ (p=0.421 n=5+5) NewStreamBinaryReader/50Names500Values-10 4.33ms ± 1% 4.39ms ± 8% ~ (p=1.000 n=5+5) NewStreamBinaryReader/50Names1000Values-10 7.88ms ± 3% 7.84ms ± 3% ~ (p=0.690 n=5+5) NewStreamBinaryReader/100Names100Values-10 2.73ms ± 5% 2.57ms ± 5% -6.15% (p=0.032 n=5+5) NewStreamBinaryReader/100Names500Values-10 8.12ms ± 3% 8.02ms ± 2% ~ (p=0.548 n=5+5) NewStreamBinaryReader/100Names1000Values-10 15.3ms ± 3% 14.8ms ± 1% -3.43% (p=0.016 n=5+5) NewStreamBinaryReader/200Names100Values-10 4.87ms ± 7% 4.84ms ± 4% ~ (p=0.841 n=5+5) NewStreamBinaryReader/200Names500Values-10 15.9ms ± 2% 15.9ms ± 1% ~ (p=1.000 n=5+5) NewStreamBinaryReader/200Names1000Values-10 30.0ms ± 2% 30.2ms ± 3% ~ (p=1.000 n=5+5) name old alloc/op new alloc/op delta NewStreamBinaryReader/20Names100Values-10 3.36MB ± 0% 3.35MB ± 0% -0.29% (p=0.000 n=5+4) NewStreamBinaryReader/20Names500Values-10 4.00MB ± 0% 3.99MB ± 0% -0.25% (p=0.016 n=5+4) NewStreamBinaryReader/20Names1000Values-10 4.80MB ± 0% 4.79MB ± 0% -0.21% (p=0.008 n=5+5) NewStreamBinaryReader/50Names100Values-10 3.63MB ± 0% 3.62MB ± 0% -0.27% (p=0.016 n=4+5) NewStreamBinaryReader/50Names500Values-10 5.21MB ± 0% 5.20MB ± 0% -0.19% (p=0.008 n=5+5) NewStreamBinaryReader/50Names1000Values-10 7.23MB ± 0% 7.22MB ± 0% -0.14% (p=0.008 n=5+5) NewStreamBinaryReader/100Names100Values-10 4.08MB ± 0% 4.07MB ± 0% -0.24% (p=0.008 n=5+5) NewStreamBinaryReader/100Names500Values-10 7.24MB ± 0% 7.23MB ± 0% -0.14% (p=0.008 n=5+5) NewStreamBinaryReader/100Names1000Values-10 11.3MB ± 0% 11.3MB ± 0% -0.09% (p=0.029 n=4+4) NewStreamBinaryReader/200Names100Values-10 4.97MB ± 0% 4.96MB ± 0% -0.20% (p=0.008 n=5+5) NewStreamBinaryReader/200Names500Values-10 11.6MB ± 0% 11.6MB ± 0% -0.08% (p=0.008 n=5+5) NewStreamBinaryReader/200Names1000Values-10 20.1MB ± 0% 20.1MB ± 0% -0.05% (p=0.016 n=4+5) name old allocs/op new allocs/op delta NewStreamBinaryReader/20Names100Values-10 6.23k ± 0% 6.22k ± 0% -0.11% (p=0.029 n=4+4) NewStreamBinaryReader/20Names500Values-10 30.3k ± 0% 30.3k ± 0% -0.02% (p=0.029 n=4+4) NewStreamBinaryReader/20Names1000Values-10 60.3k ± 0% 60.3k ± 0% -0.01% (p=0.029 n=4+4) NewStreamBinaryReader/50Names100Values-10 15.5k ± 0% 15.5k ± 0% -0.05% (p=0.029 n=4+4) NewStreamBinaryReader/50Names500Values-10 75.6k ± 0% 75.6k ± 0% ~ (p=0.079 n=4+5) NewStreamBinaryReader/50Names1000Values-10 151k ± 0% 151k ± 0% -0.00% (p=0.029 n=4+4) NewStreamBinaryReader/100Names100Values-10 31.0k ± 0% 31.0k ± 0% -0.02% (p=0.000 n=4+5) NewStreamBinaryReader/100Names500Values-10 151k ± 0% 151k ± 0% -0.00% (p=0.000 n=5+4) NewStreamBinaryReader/100Names1000Values-10 301k ± 0% 301k ± 0% -0.00% (p=0.016 n=4+5) NewStreamBinaryReader/200Names100Values-10 61.9k ± 0% 61.9k ± 0% -0.01% (p=0.008 n=5+5) NewStreamBinaryReader/200Names500Values-10 302k ± 0% 302k ± 0% -0.00% (p=0.008 n=5+5) NewStreamBinaryReader/200Names1000Values-10 602k ± 0% 602k ± 0% -0.00% (p=0.016 n=5+4) --- .../indexheader/encoding/factory.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/storegateway/indexheader/encoding/factory.go b/pkg/storegateway/indexheader/encoding/factory.go index e91f7fd8321..6c3082276ad 100644 --- a/pkg/storegateway/indexheader/encoding/factory.go +++ b/pkg/storegateway/indexheader/encoding/factory.go @@ -19,17 +19,17 @@ const readerBufferSize = 4096 // DecbufFactory creates new file-backed decoding buffer instances for a specific index-header file. type DecbufFactory struct { - pool sync.Pool path string } +var bufferPool = sync.Pool{ + New: func() any { + return bufio.NewReaderSize(nil, readerBufferSize) + }, +} + func NewDecbufFactory(path string) *DecbufFactory { return &DecbufFactory{ - pool: sync.Pool{ - New: func() any { - return bufio.NewReaderSize(nil, readerBufferSize) - }, - }, path: path, } } @@ -65,7 +65,7 @@ func (df *DecbufFactory) NewDecbufAtChecked(offset int, table *crc32.Table) Decb return Decbuf{E: errors.Wrapf(ErrInvalidSize, "insufficient bytes read for size (got %d, wanted %d)", n, 4)} } - bufReader := df.pool.Get().(*bufio.Reader) + bufReader := bufferPool.Get().(*bufio.Reader) contentLength := int(binary.BigEndian.Uint32(lengthBytes)) bufferLength := len(lengthBytes) + contentLength + crc32.Size r, err := NewFileReader(f, offset, bufferLength, bufReader) @@ -124,7 +124,7 @@ func (df *DecbufFactory) NewRawDecbuf() Decbuf { return Decbuf{E: errors.Wrap(err, "stat file for decbuf")} } - bufReader := df.pool.Get().(*bufio.Reader) + bufReader := bufferPool.Get().(*bufio.Reader) fileSize := stat.Size() reader, err := NewFileReader(f, 0, int(fileSize), bufReader) if err != nil { @@ -138,7 +138,7 @@ func (df *DecbufFactory) NewRawDecbuf() Decbuf { // Close cleans up any resources associated with the Decbuf func (df *DecbufFactory) Close(d Decbuf) error { if d.r != nil { - df.pool.Put(d.r.buf) + bufferPool.Put(d.r.buf) } return d.close() From 9f7d5c9c7d49bd9497bcf904f94619315f535ad1 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 12 Dec 2022 13:29:54 +1100 Subject: [PATCH 2/3] Move buffer pool management into FileReader. --- .../indexheader/encoding/factory.go | 18 ++---------------- .../indexheader/encoding/reader.go | 13 +++++++++++-- .../indexheader/encoding/reader_test.go | 7 +++---- 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/pkg/storegateway/indexheader/encoding/factory.go b/pkg/storegateway/indexheader/encoding/factory.go index 6c3082276ad..0a8d19343ca 100644 --- a/pkg/storegateway/indexheader/encoding/factory.go +++ b/pkg/storegateway/indexheader/encoding/factory.go @@ -3,11 +3,9 @@ package encoding import ( - "bufio" "encoding/binary" "hash/crc32" "os" - "sync" "github.com/grafana/dskit/multierror" "github.com/pkg/errors" @@ -22,12 +20,6 @@ type DecbufFactory struct { path string } -var bufferPool = sync.Pool{ - New: func() any { - return bufio.NewReaderSize(nil, readerBufferSize) - }, -} - func NewDecbufFactory(path string) *DecbufFactory { return &DecbufFactory{ path: path, @@ -65,10 +57,9 @@ func (df *DecbufFactory) NewDecbufAtChecked(offset int, table *crc32.Table) Decb return Decbuf{E: errors.Wrapf(ErrInvalidSize, "insufficient bytes read for size (got %d, wanted %d)", n, 4)} } - bufReader := bufferPool.Get().(*bufio.Reader) contentLength := int(binary.BigEndian.Uint32(lengthBytes)) bufferLength := len(lengthBytes) + contentLength + crc32.Size - r, err := NewFileReader(f, offset, bufferLength, bufReader) + r, err := NewFileReader(f, offset, bufferLength) if err != nil { return Decbuf{E: errors.Wrap(err, "create file reader")} } @@ -124,9 +115,8 @@ func (df *DecbufFactory) NewRawDecbuf() Decbuf { return Decbuf{E: errors.Wrap(err, "stat file for decbuf")} } - bufReader := bufferPool.Get().(*bufio.Reader) fileSize := stat.Size() - reader, err := NewFileReader(f, 0, int(fileSize), bufReader) + reader, err := NewFileReader(f, 0, int(fileSize)) if err != nil { return Decbuf{E: errors.Wrap(err, "file reader for decbuf")} } @@ -137,10 +127,6 @@ func (df *DecbufFactory) NewRawDecbuf() Decbuf { // Close cleans up any resources associated with the Decbuf func (df *DecbufFactory) Close(d Decbuf) error { - if d.r != nil { - bufferPool.Put(d.r.buf) - } - return d.close() } diff --git a/pkg/storegateway/indexheader/encoding/reader.go b/pkg/storegateway/indexheader/encoding/reader.go index 1dc9bab673f..0cb238be6ac 100644 --- a/pkg/storegateway/indexheader/encoding/reader.go +++ b/pkg/storegateway/indexheader/encoding/reader.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "os" + "sync" ) type FileReader struct { @@ -18,12 +19,18 @@ type FileReader struct { pos int } +var bufferPool = sync.Pool{ + New: func() any { + return bufio.NewReaderSize(nil, readerBufferSize) + }, +} + // NewFileReader creates a new FileReader for the segment of file beginning at base bytes // extending length bytes using the supplied buffered reader. -func NewFileReader(file *os.File, base, length int, buf *bufio.Reader) (*FileReader, error) { +func NewFileReader(file *os.File, base, length int) (*FileReader, error) { f := &FileReader{ file: file, - buf: buf, + buf: bufferPool.Get().(*bufio.Reader), base: base, length: length, } @@ -143,5 +150,7 @@ func (f *FileReader) Len() int { // is unexported to ensure that all resource management is handled by DecbufFactory // which pools resources. func (f *FileReader) close() error { + bufferPool.Put(f.buf) + return f.file.Close() } diff --git a/pkg/storegateway/indexheader/encoding/reader_test.go b/pkg/storegateway/indexheader/encoding/reader_test.go index 1b6d9da25a7..5a44ddc1aeb 100644 --- a/pkg/storegateway/indexheader/encoding/reader_test.go +++ b/pkg/storegateway/indexheader/encoding/reader_test.go @@ -3,7 +3,6 @@ package encoding import ( - "bufio" "os" "path" "testing" @@ -185,7 +184,7 @@ func TestReaders_CreationWithEmptyContents(t *testing.T) { require.NoError(t, f.Close()) }) - r, err := NewFileReader(f, 0, 0, bufio.NewReader(f)) + r, err := NewFileReader(f, 0, 0) require.NoError(t, err) require.ErrorIs(t, r.Skip(1), ErrInvalidSize) require.ErrorIs(t, r.ResetAt(1), ErrInvalidSize) @@ -206,7 +205,7 @@ func testReaders(t *testing.T, test func(t *testing.T, r *FileReader)) { require.NoError(t, f.Close()) }) - r, err := NewFileReader(f, 0, len(testReaderContents), bufio.NewReader(f)) + r, err := NewFileReader(f, 0, len(testReaderContents)) require.NoError(t, err) test(t, r) @@ -226,7 +225,7 @@ func testReaders(t *testing.T, test func(t *testing.T, r *FileReader)) { require.NoError(t, f.Close()) }) - r, err := NewFileReader(f, len(offsetBytes), len(testReaderContents), bufio.NewReader(f)) + r, err := NewFileReader(f, len(offsetBytes), len(testReaderContents)) require.NoError(t, err) test(t, r) From 9e85f11f077cb5b73fa81c65618184ebff735684 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 13 Dec 2022 09:56:06 +1100 Subject: [PATCH 3/3] Add note about how pooled buffers are cleaned up. --- pkg/storegateway/indexheader/encoding/reader.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/storegateway/indexheader/encoding/reader.go b/pkg/storegateway/indexheader/encoding/reader.go index 0cb238be6ac..730fd646c17 100644 --- a/pkg/storegateway/indexheader/encoding/reader.go +++ b/pkg/storegateway/indexheader/encoding/reader.go @@ -150,6 +150,8 @@ func (f *FileReader) Len() int { // is unexported to ensure that all resource management is handled by DecbufFactory // which pools resources. func (f *FileReader) close() error { + // Note that we don't do anything to clean up the buffer before returning it to the pool here: + // we reset the buffer when we retrieve it from the pool instead. bufferPool.Put(f.buf) return f.file.Close()