Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: fix marshaling with sync.Pool #4593

Merged
merged 3 commits into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 33 additions & 20 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,32 +497,45 @@ func (m *SeriesResponse) Marshal() (dAtA []byte, err error) {

var respBuf []byte

// No pool defined, allocate directly.
// Slow path with no sync.Pool.
if m.respPool == nil {
respBuf = make([]byte, size)
} else {
if m.respBuf == nil {
poolBuf := m.respPool.Get()
if poolBuf == nil {
respBuf = make([]byte, size)
m.respBuf = &respBuf
} else {
m.respBuf = poolBuf.(*[]byte)
respBuf = *m.respBuf
}
m.respBuf = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line necessary?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I don't think so


n, err := m.MarshalToSizedBuffer(respBuf)
if err != nil {
return nil, err
}
return respBuf[len(respBuf)-n:], nil
}

// Fast path with sync.Pool.
// m.respBuf must not be nil so that it would be returned to the pool.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to non nil if we do the work below? Can we always do that and remove any outside call pool Get?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understood this comment. The callers using the old function pass nil thus we need to handle this case - it is not guaranteed that the pool exists

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, i thought we can simplify, and allow this method to Get, never pass it from outside - not a biggie


// If no pre-allocated buffer has been passed then try to get a new one.
if m.respBuf == nil {
poolBuf := m.respPool.Get()
// No previous buffer found in the pool, try to allocate.
if poolBuf == nil {
respBuf = make([]byte, size)
} else {
if cap(*m.respBuf) < size {
if m.respPool != nil {
m.respPool.Put(m.respBuf)
}
respBuf = make([]byte, size)
m.respBuf = &respBuf
} else {
respBuf = *m.respBuf
}
// Found something, let's see if it is big enough.
respBuf = *(poolBuf.(*[]byte))
}
} else {
respBuf = *m.respBuf
}

// Last sanity check of the size before the marshaling.
if cap(respBuf) < size {
if m.respPool != nil {
m.respPool.Put(&respBuf)
}
respBuf = make([]byte, size)
}
m.respBuf = &respBuf

// Possibly trim it so that there wouldn't be left-over "garbage" in the slice.
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
marshalBuf := respBuf[:size]
n, err := m.MarshalToSizedBuffer(marshalBuf)
if err != nil {
Expand Down
45 changes: 45 additions & 0 deletions pkg/store/storepb/custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"path/filepath"
"sort"
"sync"
"testing"

"github.com/pkg/errors"
Expand Down Expand Up @@ -525,3 +526,47 @@ func TestMatchersToString_Translate(t *testing.T) {

}
}

// Tests whether the Marshal() function properly checks the size
// of a slice returned from a sync.Pool.
// Reference: https://github.com/thanos-io/thanos/issues/4591.
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
func TestMarshalChecksSize(t *testing.T) {
var s Series

rawData := []rawSeries{
{
lset: labels.Labels{labels.Label{Name: "a", Value: "c"}},
chunks: [][]sample{
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 1, v: 1}, {t: 2, v: 2}, {t: 3, v: 3}, {t: 4, v: 4}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 22}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 15, v: 15}, {t: 16, v: 16}, {t: 17, v: 17}, {t: 18, v: 18}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
},
},
}

listSS := newListSeriesSet(t, rawData)
lset, chks := listSS.At()

s.Chunks = chks
s.Labels = labelpb.ZLabelsFromPromLabels(lset)

resp := NewSeriesResponse(&s)

smallPool := sync.Pool{
New: func() interface{} {
b := make([]byte, 1)
return &b
},
}
resp.respPool = &smallPool

d, err := resp.Marshal()
testutil.Ok(t, err)
testutil.Assert(t, d != nil)

resp.Close()
}