diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index 6c806cb2b4..efe711d593 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -497,32 +497,45 @@ func (m *SeriesResponse) Marshal() (dAtA []byte, err error) { var respBuf []byte - // No pool defined, allocate directly. + // Slow path with no sync.Pool. if m.respPool == nil { respBuf = make([]byte, size) - } else { - if m.respBuf == nil { - poolBuf := m.respPool.Get() - if poolBuf == nil { - respBuf = make([]byte, size) - m.respBuf = &respBuf - } else { - m.respBuf = poolBuf.(*[]byte) - respBuf = *m.respBuf - } + + n, err := m.MarshalToSizedBuffer(respBuf) + if err != nil { + return nil, err + } + return respBuf[len(respBuf)-n:], nil + } + + // Fast path with sync.Pool. + // m.respBuf must not be nil so that it would be returned to the pool. + + // If no pre-allocated buffer has been passed then try to get a new one. + if m.respBuf == nil { + poolBuf := m.respPool.Get() + // No previous buffer found in the pool, try to allocate. + if poolBuf == nil { + respBuf = make([]byte, size) } else { - if cap(*m.respBuf) < size { - if m.respPool != nil { - m.respPool.Put(m.respBuf) - } - respBuf = make([]byte, size) - m.respBuf = &respBuf - } else { - respBuf = *m.respBuf - } + // Found something, let's see if it is big enough. + respBuf = *(poolBuf.(*[]byte)) } + } else { + respBuf = *m.respBuf + } + + // Last sanity check of the size before the marshaling. + if cap(respBuf) < size { + if m.respPool != nil { + m.respPool.Put(&respBuf) + } + respBuf = make([]byte, size) } + m.respBuf = &respBuf + // Possibly trim it so that there wouldn't be left-over "garbage" in the slice. + // TODO: check if it is needed to always trim this. marshalBuf := respBuf[:size] n, err := m.MarshalToSizedBuffer(marshalBuf) if err != nil { diff --git a/pkg/store/storepb/custom_test.go b/pkg/store/storepb/custom_test.go index 35901d52bc..db4d34f2d0 100644 --- a/pkg/store/storepb/custom_test.go +++ b/pkg/store/storepb/custom_test.go @@ -7,6 +7,7 @@ import ( "fmt" "path/filepath" "sort" + "sync" "testing" "github.com/pkg/errors" @@ -525,3 +526,47 @@ func TestMatchersToString_Translate(t *testing.T) { } } + +// Tests whether the Marshal() function properly checks the size +// of a slice returned from a sync.Pool. +// Regression test against https://github.com/thanos-io/thanos/issues/4591. +func TestMarshalChecksSize(t *testing.T) { + var s Series + + rawData := []rawSeries{ + { + lset: labels.Labels{labels.Label{Name: "a", Value: "c"}}, + chunks: [][]sample{ + {{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}}, + {{t: 1, v: 1}, {t: 2, v: 2}, {t: 3, v: 3}, {t: 4, v: 4}}, + {{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 22}, {t: 24, v: 24}}, + {{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}}, + {{t: 15, v: 15}, {t: 16, v: 16}, {t: 17, v: 17}, {t: 18, v: 18}}, + {{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}}, + {{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}}, + }, + }, + } + + listSS := newListSeriesSet(t, rawData) + lset, chks := listSS.At() + + s.Chunks = chks + s.Labels = labelpb.ZLabelsFromPromLabels(lset) + + resp := NewSeriesResponse(&s) + + smallPool := sync.Pool{ + New: func() interface{} { + b := make([]byte, 1) + return &b + }, + } + resp.respPool = &smallPool + + d, err := resp.Marshal() + testutil.Ok(t, err) + testutil.Assert(t, d != nil) + + resp.Close() +}