Skip to content

Commit

Permalink
Store: keep track of lazy buffers in posting decoders
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Jun 22, 2023
1 parent 5a077c6 commit 0fcdfb9
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 12 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/hashicorp/golang-lru v0.6.0
github.com/jpillora/backoff v1.0.0
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.15.9
github.com/klauspost/compress v1.16.7-0.20230622111944-67a538e2b4df
github.com/leanovate/gopter v0.2.9
github.com/lightstep/lightstep-tracer-go v0.25.0
github.com/lovoo/gcloud-opentracing v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.16.7-0.20230622111944-67a538e2b4df h1:YVmtlF3q1+H7fzHO2iCZ6n/LmfF3HfqshO/jIqbqpRU=
github.com/klauspost/compress v1.16.7-0.20230622111944-67a538e2b4df/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.1.0 h1:eyi1Ad2aNJMW95zcSbmGg7Cg6cq3ADwLpMAP96d8rF0=
Expand Down
55 changes: 46 additions & 9 deletions pkg/store/postings_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"

Check failure on line 12 in pkg/store/postings_codec.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

package "sync/atomic" shouldn't be imported, suggested: "go.uber.org/atomic"

"github.com/golang/snappy"
"github.com/klauspost/compress/s2"
Expand Down Expand Up @@ -58,12 +59,6 @@ func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool {
}

var (
readersPool = sync.Pool{
New: func() interface{} {
return s2.NewReader(nil, s2.ReaderAllocBlock(1<<10), s2.ReaderMaxBlockSize(64<<10))
},
}

writersPool = sync.Pool{
New: func() interface{} {
return s2.NewWriter(nil, s2.WriterSnappyCompat(), s2.WriterBetterCompression())
Expand Down Expand Up @@ -98,9 +93,51 @@ func compress(w io.Writer) io.WriteCloser {
return writeCloser{wr, &writersPool}
}

// trackingReadersPool keeps track of the average buffer capacity of the last 10
// returned readers. It uses this information to preallocate proper buffers for new readers.
type trackingReadersPool struct {
	pool sync.Pool

	// capMu guards capIdx and caps below.
	capMu sync.Mutex
	// capIdx is the current position in the caps ring buffer.
	capIdx int
	// caps is a ring buffer holding the buffer capacities of the last 10 readers returned via Put.
	caps [10]int
	// runCapAvg caches the running average of caps so estimatedBufferCapacity
	// can read it without taking capMu.
	runCapAvg atomic.Int64
}

// Get returns a pooled s2.Reader, or constructs a fresh one whose scratch
// buffer is sized from the recent-capacity estimate when the pool is empty.
func (p *trackingReadersPool) Get() *s2.Reader {
	if cached := p.pool.Get(); cached != nil {
		return cached.(*s2.Reader)
	}
	return s2.NewReader(nil, s2.ReaderAllocBlock(p.estimatedBufferCapacity()), s2.ReaderMaxBlockSize(64<<10))
}

// Put records r's buffer capacity in the sample window, refreshes the running
// average, and returns r to the underlying pool for reuse.
//
// NOTE(review): the previous implementation added int64(1 / 10 * (win - wout))
// to runCapAvg; `1 / 10` is integer constant division and always evaluates to
// 0, so the average never moved. It also read the outgoing sample from the
// pre-advance index (the newest sample) instead of the slot being evicted.
// Recomputing the exact window average here fixes both; the window is tiny
// (len(p.caps) == 10) so the extra loop is negligible.
func (p *trackingReadersPool) Put(r *s2.Reader) {
	p.capMu.Lock()
	defer p.capMu.Unlock()

	// Advance to the oldest slot and overwrite it with the newest sample.
	p.capIdx = (p.capIdx + 1) % len(p.caps)
	p.caps[p.capIdx] = r.GetBufferCapacity()

	// Publish the exact average of the window for lock-free readers.
	sum := 0
	for _, c := range p.caps {
		sum += c
	}
	p.runCapAvg.Store(int64(sum / len(p.caps)))

	p.pool.Put(r)
}

// estimatedBufferCapacity suggests an allocation size for a new reader's
// scratch buffer, never going below a 1 KiB floor.
func (p *trackingReadersPool) estimatedBufferCapacity() int {
	const minCapacity = 1 << 10
	if avg := int(p.runCapAvg.Load()); avg >= minCapacity {
		return avg
	}
	return minCapacity
}

var readersPool = &trackingReadersPool{}

// reader couples a pooled s2.Reader with the tracking pool it must be
// returned to after the caller is done consuming the stream.
type reader struct {
	reader *s2.Reader
	pool   *trackingReadersPool
}

func (r reader) ReadByte() (n byte, err error) {
Expand All @@ -113,9 +150,9 @@ func (r reader) ReadByte() (n byte, err error) {
}

// decompressByteReader wraps r in a pooled s2 decompressor and exposes it as
// an io.ByteReader; the decoder is handed back to readersPool via the
// returned reader wrapper.
func decompressByteReader(r io.Reader) io.ByteReader {
	decoder := readersPool.Get()
	decoder.Reset(r)
	return reader{reader: decoder, pool: readersPool}
}

// estimateSnappyStreamSize estimates the number of bytes
Expand Down

0 comments on commit 0fcdfb9

Please sign in to comment.