Skip to content

Commit

Permalink
Store: smaller block size for snappy decoding postings
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Jun 21, 2023
1 parent 2ef6c76 commit 9c0476f
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 20 deletions.
16 changes: 0 additions & 16 deletions pkg/extgrpc/snappy/snappy.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
return writeCloser{wr, &c.writersPool}, nil
}

func (c *compressor) DecompressByteReader(r io.Reader) (io.ByteReader, error) {
dr := c.readersPool.Get().(*snappy.Reader)
dr.Reset(r)
return reader{dr, &c.readersPool}, nil
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
dr := c.readersPool.Get().(*snappy.Reader)
dr.Reset(r)
Expand Down Expand Up @@ -96,13 +90,3 @@ func (r reader) Read(p []byte) (n int, err error) {
}
return n, err
}

func (r reader) ReadByte() (n byte, err error) {
n, err = r.reader.ReadByte()
if err == io.EOF {
r.reader.Reset(nil)
r.pool.Put(r.reader)
}
return n, err

}
68 changes: 64 additions & 4 deletions pkg/store/postings_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/index"
extsnappy "github.com/thanos-io/thanos/pkg/extgrpc/snappy"
)

// This file implements encoding and decoding of postings using diff (or delta) + varint
Expand Down Expand Up @@ -58,6 +57,67 @@ func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool {
return bytes.HasPrefix(input, []byte(codecHeaderStreamedSnappy))
}

var (
readersPool = sync.Pool{
New: func() interface{} {
return s2.NewReader(nil, s2.ReaderMaxBlockSize(4<<10))
},
}

writersPool = sync.Pool{
New: func() interface{} {
return s2.NewWriter(nil)
},
}
)

type writeCloser struct {
writer *s2.Writer
pool *sync.Pool
}

func (w writeCloser) Write(p []byte) (n int, err error) {
return w.writer.Write(p)
}

func (w writeCloser) Close() error {
defer func() {
w.writer.Reset(nil)
w.pool.Put(w.writer)
}()

if w.writer != nil {
return w.writer.Close()
}
return nil
}

func compress(w io.Writer) (io.WriteCloser, error) {
wr := writersPool.Get().(*s2.Writer)
wr.Reset(w)
return writeCloser{wr, &writersPool}, nil
}

type reader struct {
reader *s2.Reader
pool *sync.Pool
}

func (r reader) ReadByte() (n byte, err error) {
n, err = r.reader.ReadByte()
if err == io.EOF {
r.reader.Reset(nil)
r.pool.Put(r.reader)
}
return n, err
}

func decompressByteReader(r io.Reader) (io.ByteReader, error) {
dr := readersPool.Get().(*s2.Reader)
dr.Reset(r)
return reader{dr, &readersPool}, nil
}

// estimateSnappyStreamSize estimates the number of bytes
// needed for encoding length postings. Note that in reality
// the number of bytes needed could be much bigger if postings
Expand Down Expand Up @@ -98,7 +158,7 @@ func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error

uvarintEncodeBuf := make([]byte, binary.MaxVarintLen64)

sw, err := extsnappy.Compressor.Compress(compressedBuf)
sw, err := compress(compressedBuf)
if err != nil {
return nil, fmt.Errorf("creating snappy compressor: %w", err)
}
Expand Down Expand Up @@ -145,7 +205,7 @@ type streamedDiffVarintPostings struct {
}

func newStreamedDiffVarintPostings(input []byte) (closeablePostings, error) {
r, err := extsnappy.Compressor.DecompressByteReader(bytes.NewBuffer(input))
r, err := decompressByteReader(bytes.NewBuffer(input))
if err != nil {
return nil, fmt.Errorf("decompressing snappy postings: %w", err)
}
Expand Down Expand Up @@ -348,7 +408,7 @@ func snappyStreamedEncode(postingsLength int, diffVarintPostings []byte) ([]byte
return nil, fmt.Errorf("short-write streamed snappy header")
}

sw, err := extsnappy.Compressor.Compress(compressedBuf)
sw, err := compress(compressedBuf)
if err != nil {
return nil, fmt.Errorf("creating snappy compressor: %w", err)
}
Expand Down

0 comments on commit 9c0476f

Please sign in to comment.