Skip to content

Commit

Permalink
store: fix marshaling with sync.Pool (#4593)
Browse files Browse the repository at this point in the history
* store: fix marshaling with sync.Pool

Do not forget to check the length of a slice returned by sync.Pool.
Annotate the whole function with comments to aid understanding of it.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* store: fix according to comments

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* store: improve comment as per Bartek's suggestion

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS authored Aug 24, 2021
1 parent 8ab7fa2 commit 8b4c3c9
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 20 deletions.
53 changes: 33 additions & 20 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,32 +497,45 @@ func (m *SeriesResponse) Marshal() (dAtA []byte, err error) {

var respBuf []byte

// No pool defined, allocate directly.
// Slow path with no sync.Pool.
if m.respPool == nil {
respBuf = make([]byte, size)
} else {
if m.respBuf == nil {
poolBuf := m.respPool.Get()
if poolBuf == nil {
respBuf = make([]byte, size)
m.respBuf = &respBuf
} else {
m.respBuf = poolBuf.(*[]byte)
respBuf = *m.respBuf
}

n, err := m.MarshalToSizedBuffer(respBuf)
if err != nil {
return nil, err
}
return respBuf[len(respBuf)-n:], nil
}

// Fast path with sync.Pool.
// m.respBuf must not be nil so that it would be returned to the pool.

// If no pre-allocated buffer has been passed then try to get a new one.
if m.respBuf == nil {
poolBuf := m.respPool.Get()
// No previous buffer found in the pool, try to allocate.
if poolBuf == nil {
respBuf = make([]byte, size)
} else {
if cap(*m.respBuf) < size {
if m.respPool != nil {
m.respPool.Put(m.respBuf)
}
respBuf = make([]byte, size)
m.respBuf = &respBuf
} else {
respBuf = *m.respBuf
}
// Found something, let's see if it is big enough.
respBuf = *(poolBuf.(*[]byte))
}
} else {
respBuf = *m.respBuf
}

// Last sanity check of the size before the marshaling.
if cap(respBuf) < size {
if m.respPool != nil {
m.respPool.Put(&respBuf)
}
respBuf = make([]byte, size)
}
m.respBuf = &respBuf

// Possibly trim it so that there wouldn't be left-over "garbage" in the slice.
// TODO: check if it is needed to always trim this.
marshalBuf := respBuf[:size]
n, err := m.MarshalToSizedBuffer(marshalBuf)
if err != nil {
Expand Down
45 changes: 45 additions & 0 deletions pkg/store/storepb/custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"path/filepath"
"sort"
"sync"
"testing"

"github.com/pkg/errors"
Expand Down Expand Up @@ -525,3 +526,47 @@ func TestMatchersToString_Translate(t *testing.T) {

}
}

// Tests whether the Marshal() function properly checks the size
// of a slice returned from a sync.Pool.
// Regression test against https://github.com/thanos-io/thanos/issues/4591.
func TestMarshalChecksSize(t *testing.T) {
var s Series

rawData := []rawSeries{
{
lset: labels.Labels{labels.Label{Name: "a", Value: "c"}},
chunks: [][]sample{
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 1, v: 1}, {t: 2, v: 2}, {t: 3, v: 3}, {t: 4, v: 4}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 22}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 15, v: 15}, {t: 16, v: 16}, {t: 17, v: 17}, {t: 18, v: 18}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
},
},
}

listSS := newListSeriesSet(t, rawData)
lset, chks := listSS.At()

s.Chunks = chks
s.Labels = labelpb.ZLabelsFromPromLabels(lset)

resp := NewSeriesResponse(&s)

smallPool := sync.Pool{
New: func() interface{} {
b := make([]byte, 1)
return &b
},
}
resp.respPool = &smallPool

d, err := resp.Marshal()
testutil.Ok(t, err)
testutil.Assert(t, d != nil)

resp.Close()
}

0 comments on commit 8b4c3c9

Please sign in to comment.