From 0fcdfb997b37cec02ce53dbb59caae1c5caae362 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Thu, 22 Jun 2023 19:42:15 +0200 Subject: [PATCH] Store: keep track of lazy buffers in posting decoders Signed-off-by: Michael Hoffmann --- go.mod | 2 +- go.sum | 4 +-- pkg/store/postings_codec.go | 55 +++++++++++++++++++++++++++++++------ 3 files changed, 49 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 44c7e822251..942dfa04ee4 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/hashicorp/golang-lru v0.6.0 github.com/jpillora/backoff v1.0.0 github.com/json-iterator/go v1.1.12 - github.com/klauspost/compress v1.15.9 + github.com/klauspost/compress v1.16.7-0.20230622111944-67a538e2b4df github.com/leanovate/gopter v0.2.9 github.com/lightstep/lightstep-tracer-go v0.25.0 github.com/lovoo/gcloud-opentracing v0.3.0 diff --git a/go.sum b/go.sum index d801f1f5851..2db84b89851 100644 --- a/go.sum +++ b/go.sum @@ -609,8 +609,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= -github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.16.7-0.20230622111944-67a538e2b4df h1:YVmtlF3q1+H7fzHO2iCZ6n/LmfF3HfqshO/jIqbqpRU= +github.com/klauspost/compress v1.16.7-0.20230622111944-67a538e2b4df/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.1.0 h1:eyi1Ad2aNJMW95zcSbmGg7Cg6cq3ADwLpMAP96d8rF0= diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index bb4ff2ad017..de07d715533 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" "github.com/golang/snappy" "github.com/klauspost/compress/s2" @@ -58,12 +59,6 @@ func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool { } var ( - readersPool = sync.Pool{ - New: func() interface{} { - return s2.NewReader(nil, s2.ReaderAllocBlock(1<<10), s2.ReaderMaxBlockSize(64<<10)) - }, - } - writersPool = sync.Pool{ New: func() interface{} { return s2.NewWriter(nil, s2.WriterSnappyCompat(), s2.WriterBetterCompression()) @@ -98,9 +93,51 @@ func compress(w io.Writer) io.WriteCloser { return writeCloser{wr, &writersPool} } +// trackingReadersPool keeps track of the average buffer capacity of the last 10 +// returned readers. It uses this information to preallocate proper buffers for new readers. +type trackingReadersPool struct { + pool sync.Pool + + capMu sync.Mutex + capIdx int + caps [10]int + runCapAvg atomic.Int64 +} + +func (p *trackingReadersPool) Get() *s2.Reader { + r := p.pool.Get() + if r != nil { + return r.(*s2.Reader) + } + return s2.NewReader(nil, s2.ReaderAllocBlock(p.estimatedBufferCapacity()), s2.ReaderMaxBlockSize(64<<10)) +} + +func (p *trackingReadersPool) Put(r *s2.Reader) { + p.capMu.Lock() + defer p.capMu.Unlock() + + wout := p.caps[p.capIdx] + p.capIdx = (p.capIdx + 1) % 10 + p.caps[p.capIdx] = r.GetBufferCapacity() + win := p.caps[p.capIdx] + p.runCapAvg.Add(int64(1 / 10 * (win - wout))) + + p.pool.Put(r) +} + +func (p *trackingReadersPool) estimatedBufferCapacity() int { + runAvg := int(p.runCapAvg.Load()) + if runAvg < 1<<10 { + return 1 << 10 + } + return runAvg +} + +var readersPool = &trackingReadersPool{} + type reader struct { reader *s2.Reader - pool *sync.Pool + pool *trackingReadersPool } func (r reader) ReadByte() (n byte, err error) { @@ -113,9 +150,9 @@ func (r reader) ReadByte() (n byte, err error) { } func decompressByteReader(r io.Reader) io.ByteReader { - dr := readersPool.Get().(*s2.Reader) + dr := readersPool.Get() dr.Reset(r) - return reader{dr, &readersPool} + return reader{dr, readersPool} } // estimateSnappyStreamSize estimates the number of bytes