[v2] store: reuse buffers for serializing Series() responses (thanos-io#4535)

* store: reuse buffers for serializing Series() responses

Compared with v1:
* Uses less CPU because Size() is called only once, just as before;

* Uses `protoc-go-inject-field` to inject extra fields into SeriesResponse
which are used for marshaling the responses;

* Uses []byte instead of bytes.Buffer because bytes.Buffer allocates 2x the
amount of memory, and in our case the responses are more or less the same
size, so less memory is wasted;

I think this version is more maintainable since the details of the
allocation are hidden from the Series() function itself; it all happens
in the Marshal() function.
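
The core of the change is easiest to see as a small sketch (the names below are illustrative, not the exact Thanos code; the real implementation is the SeriesResponse.Marshal()/Close() pair added to pkg/store/storepb/custom.go in this commit):

```go
package sketch

import "sync"

// message stands in for a gogoprotobuf-generated message that exposes
// Size() and MarshalToSizedBuffer().
type message interface {
	Size() int
	MarshalToSizedBuffer([]byte) (int, error)
}

// pooledMarshal marshals msg into the buffer pointed to by buf, taking a
// previously released buffer from pool when possible and allocating only
// when the current buffer is too small. Returning the buffer to the pool is
// the caller's job (the real code does it in SeriesResponse.Close()).
func pooledMarshal(msg message, pool *sync.Pool, buf *[]byte) ([]byte, error) {
	size := msg.Size()

	if *buf == nil {
		if b, ok := pool.Get().(*[]byte); ok && cap(*b) >= size {
			*buf = *b
		} else {
			*buf = make([]byte, size)
		}
	} else if cap(*buf) < size {
		// Too small for this response: drop it and grow.
		*buf = make([]byte, size)
	}

	data := (*buf)[:size] // re-slice up to cap; len may be smaller than size
	n, err := msg.MarshalToSizedBuffer(data)
	if err != nil {
		return nil, err
	}
	// gogoprotobuf fills the buffer from the end backwards.
	return data[len(data)-n:], nil
}
```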

Benchmarks:
```
name                                                           old time/op    new time/op    delta
BucketSeries/1000000SeriesWith1Samples/1of1000000-16             97.8ms ± 6%    89.4ms ± 8%   -8.63%  (p=0.000 n=49+45)
BucketSeries/1000000SeriesWith1Samples/10of1000000-16            92.7ms ±12%    88.6ms ± 7%   -4.42%  (p=0.000 n=48+47)
BucketSeries/1000000SeriesWith1Samples/1000000of1000000-16        1.14s ± 4%     1.11s ± 3%   -2.89%  (p=0.000 n=50+49)
BucketSeries/100000SeriesWith100Samples/1of10000000-16           6.58ms ± 2%    6.70ms ± 9%     ~     (p=0.125 n=46+50)
BucketSeries/100000SeriesWith100Samples/100of10000000-16         6.73ms ± 5%    6.82ms ± 9%     ~     (p=0.575 n=49+50)
BucketSeries/100000SeriesWith100Samples/10000000of10000000-16     123ms ± 5%     119ms ± 7%   -2.92%  (p=0.000 n=48+50)
BucketSeries/1SeriesWith10000000Samples/1of10000000-16            131µs ± 6%     129µs ± 6%   -0.85%  (p=0.027 n=47+47)
BucketSeries/1SeriesWith10000000Samples/100of10000000-16          129µs ± 2%     129µs ± 8%     ~     (p=0.358 n=44+49)
BucketSeries/1SeriesWith10000000Samples/10000000of10000000-16    38.6ms ± 9%    34.7ms ±10%  -10.11%  (p=0.000 n=49+50)

name                                                           old alloc/op   new alloc/op   delta
BucketSeries/1000000SeriesWith1Samples/1of1000000-16             62.0MB ± 0%    62.0MB ± 0%     ~     (p=0.529 n=47+50)
BucketSeries/1000000SeriesWith1Samples/10of1000000-16            62.1MB ± 0%    62.1MB ± 0%     ~     (p=0.334 n=49+50)
BucketSeries/1000000SeriesWith1Samples/1000000of1000000-16       1.37GB ± 0%    1.27GB ± 0%   -7.03%  (p=0.000 n=49+50)
BucketSeries/100000SeriesWith100Samples/1of10000000-16           4.85MB ± 0%    4.85MB ± 0%     ~     (p=0.365 n=50+50)
BucketSeries/100000SeriesWith100Samples/100of10000000-16         4.86MB ± 0%    4.86MB ± 0%     ~     (p=0.579 n=50+50)
BucketSeries/100000SeriesWith100Samples/10000000of10000000-16     157MB ± 4%     130MB ± 5%  -16.99%  (p=0.000 n=50+50)
BucketSeries/1SeriesWith10000000Samples/1of10000000-16            213kB ± 0%     213kB ± 0%   +0.14%  (p=0.000 n=50+48)
BucketSeries/1SeriesWith10000000Samples/100of10000000-16          213kB ± 0%     213kB ± 0%   +0.14%  (p=0.000 n=50+50)
BucketSeries/1SeriesWith10000000Samples/10000000of10000000-16     115MB ± 0%      62MB ± 8%  -45.98%  (p=0.000 n=49+50)

name                                                           old allocs/op  new allocs/op  delta
BucketSeries/1000000SeriesWith1Samples/1of1000000-16              9.69k ± 0%     9.70k ± 1%     ~     (p=0.143 n=49+50)
BucketSeries/1000000SeriesWith1Samples/10of1000000-16             9.79k ± 0%     9.79k ± 0%     ~     (p=0.845 n=49+49)
BucketSeries/1000000SeriesWith1Samples/1000000of1000000-16        11.0M ± 0%     10.0M ± 0%   -9.06%  (p=0.000 n=49+50)
BucketSeries/100000SeriesWith100Samples/1of10000000-16            1.10k ± 0%     1.10k ± 0%   +0.27%  (p=0.000 n=50+50)
BucketSeries/100000SeriesWith100Samples/100of10000000-16          1.14k ± 0%     1.14k ± 0%     ~     (p=0.622 n=50+50)
BucketSeries/100000SeriesWith100Samples/10000000of10000000-16     1.10M ± 0%     1.00M ± 0%   -9.04%  (p=0.000 n=49+50)
BucketSeries/1SeriesWith10000000Samples/1of10000000-16              200 ± 0%       202 ± 0%   +1.00%  (p=0.000 n=48+50)
BucketSeries/1SeriesWith10000000Samples/100of10000000-16            200 ± 0%       202 ± 0%   +1.00%  (p=0.000 n=49+50)
BucketSeries/1SeriesWith10000000Samples/10000000of10000000-16      167k ± 0%      167k ± 0%   +0.00%  (p=0.000 n=50+50)
```

CPU usage is more or less the same once you take the variance into account;
the allocated memory is the real difference and, as expected, the benchmarks
show the most improvement in the cases that return lots of series.
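
On the caller side the lifecycle is: build one pooled response, swap its Result for every series, and Close() it when the stream ends. Roughly (the helper below is an illustrative sketch, not code from the commit; only the storepb identifiers come from the diff):

```go
package sketch

import (
	"sync"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// sendAll streams every series through a single pooled SeriesResponse,
// mirroring how BucketStore.Series() now reuses one response value instead
// of allocating a fresh one per srv.Send().
func sendAll(srv storepb.Store_SeriesServer, pool *sync.Pool, series []storepb.Series) error {
	var resp *storepb.SeriesResponse
	defer func() {
		if resp != nil {
			resp.Close() // hand the marshaling buffer back to the pool
		}
	}()

	for i := range series {
		if resp == nil {
			resp = storepb.NewSeriesResponseWithPool(&series[i], pool)
		} else {
			resp.Result = &storepb.SeriesResponse_Series{Series: &series[i]}
		}
		if err := srv.Send(resp); err != nil {
			return err
		}
	}
	return nil
}
```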

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* *: linter fixes

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* Makefile: run formatter after proto

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* store: use old behaviour when nil has been passed

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* store: clean up custom.go

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* store: add different constructors for responses with sync.Pool

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* store: "hide" *[]byte in the SeriesResponse

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* CHANGELOG: update

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
GiedriusS authored Aug 16, 2021
1 parent 2be2db7 commit 7a8d189
Showing 14 changed files with 309 additions and 179 deletions.
6 changes: 6 additions & 0 deletions .bingo/Variables.mk
@@ -135,6 +135,12 @@ $(PROTOC_GEN_GOGOFAST): $(BINGO_DIR)/protoc-gen-gogofast.mod
@echo "(re)installing $(GOBIN)/protoc-gen-gogofast-v1.3.2"
@cd $(BINGO_DIR) && $(GO) build -mod=mod -modfile=protoc-gen-gogofast.mod -o=$(GOBIN)/protoc-gen-gogofast-v1.3.2 "github.com/gogo/protobuf/protoc-gen-gogofast"

PROTOC_GO_INJECT_FIELD := $(GOBIN)/protoc-go-inject-field-v0.0.0-20170110051745-00204be12496
$(PROTOC_GO_INJECT_FIELD): $(BINGO_DIR)/protoc-go-inject-field.mod
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
@echo "(re)installing $(GOBIN)/protoc-go-inject-field-v0.0.0-20170110051745-00204be12496"
@cd $(BINGO_DIR) && $(GO) build -mod=mod -modfile=protoc-go-inject-field.mod -o=$(GOBIN)/protoc-go-inject-field-v0.0.0-20170110051745-00204be12496 "github.com/favadi/protoc-go-inject-field"

SHFMT := $(GOBIN)/shfmt-v3.1.2
$(SHFMT): $(BINGO_DIR)/shfmt.mod
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
5 changes: 5 additions & 0 deletions .bingo/protoc-go-inject-field.mod
@@ -0,0 +1,5 @@
module _ // Auto generated by https://github.com/bwplotka/bingo. DO NOT EDIT

go 1.16

require github.com/favadi/protoc-go-inject-field v0.0.0-20170110051745-00204be12496
2 changes: 2 additions & 0 deletions .bingo/variables.env
@@ -46,5 +46,7 @@ PROMU="${GOBIN}/promu-v0.5.0"

PROTOC_GEN_GOGOFAST="${GOBIN}/protoc-gen-gogofast-v1.3.2"

PROTOC_GO_INJECT_FIELD="${GOBIN}/protoc-go-inject-field-v0.0.0-20170110051745-00204be12496"

SHFMT="${GOBIN}/shfmt-v3.1.2"

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Changed
- [#4519](https://github.com/thanos-io/thanos/pull/4519) Query: switch to miekgdns DNS resolver as the default one.
- [#4535](https://github.com/thanos-io/thanos/pull/4535) Store: Reuse same buffer for Series() responses. On bigger queries this reduces Thanos Store memory usage by up to 50%.

## [v0.22.0](https://github.com/thanos-io/thanos/tree/release-0.22) - 2021.07.22

5 changes: 3 additions & 2 deletions Makefile
@@ -199,8 +199,8 @@ go-format: $(GOIMPORTS)

.PHONY: proto
proto: ## Generates Go files from Thanos proto files.
proto: check-git $(GOIMPORTS) $(PROTOC) $(PROTOC_GEN_GOGOFAST)
@GOIMPORTS_BIN="$(GOIMPORTS)" PROTOC_BIN="$(PROTOC)" PROTOC_GEN_GOGOFAST_BIN="$(PROTOC_GEN_GOGOFAST)" scripts/genproto.sh
proto: check-git $(GOIMPORTS) $(PROTOC) $(PROTOC_GEN_GOGOFAST) $(PROTOC_GO_INJECT_FIELD)
@PROTOC_GO_INJECT_FIELD_BIN="$(PROTOC_GO_INJECT_FIELD)" GOIMPORTS_BIN="$(GOIMPORTS)" PROTOC_BIN="$(PROTOC)" PROTOC_GEN_GOGOFAST_BIN="$(PROTOC_GEN_GOGOFAST)" scripts/genproto.sh

.PHONY: tarballs-release
tarballs-release: ## Build tarballs.
@@ -314,6 +314,7 @@ sync/atomic=go.uber.org/atomic" ./...
@go run ./scripts/copyright
@echo ">> ensuring generated proto files are up to date"
@$(MAKE) proto
@$(MAKE) format
$(call require_clean_work_tree,'detected files without copyright, run make lint and commit changes')

.PHONY: shell-lint
33 changes: 31 additions & 2 deletions pkg/store/bucket.go
@@ -296,6 +296,9 @@ type BucketStore struct {

// Enables hints in the Series() response.
enableSeriesResponseHints bool

// respPool is a sync.Pool for marshaling Series() responses.
respPool sync.Pool
}

type noopCache struct{}
@@ -396,6 +399,7 @@ func NewBucketStore(
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
respPool: sync.Pool{},
}

for _, option := range options {
@@ -1093,8 +1097,17 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.seriesGetAllDuration.Observe(stats.getAllDuration.Seconds())
s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried))
}

var resp *storepb.SeriesResponse
defer func(r **storepb.SeriesResponse) {
if *r != nil {
(*r).Close()
}
}(&resp)

// Merge the sub-results from each selected block.
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {

begin := time.Now()

// NOTE: We "carefully" assume series and chunks are sorted within each SeriesSet. This should be guaranteed by
@@ -1115,7 +1128,15 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))
}
series.Labels = labelpb.ZLabelsFromPromLabels(lset)
if err = srv.Send(storepb.NewSeriesResponse(&series)); err != nil {

if resp == nil {
resp = storepb.NewSeriesResponseWithPool(&series, &s.respPool)
} else {
resp.Result = &storepb.SeriesResponse_Series{
Series: &series,
}
}
if err = srv.Send(resp); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
return
}
@@ -1138,7 +1159,15 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
return
}

if err = srv.Send(storepb.NewHintsSeriesResponse(anyHints)); err != nil {
if resp == nil {
resp = storepb.NewHintsSeriesResponseWithPool(anyHints, &s.respPool)
} else {
resp.Result = &storepb.SeriesResponse_Hints{
Hints: anyHints,
}
}

if err = srv.Send(resp); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "send series response hints").Error())
return
}
1 change: 1 addition & 0 deletions pkg/store/bucket_test.go
@@ -1482,6 +1482,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
queryGate: gate.NewNoop(),
chunksLimiterFactory: NewChunksLimiterFactory(0),
seriesLimiterFactory: NewSeriesLimiterFactory(0),
respPool: sync.Pool{},
}

t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {
4 changes: 4 additions & 0 deletions pkg/store/hintspb/custom.go
@@ -5,6 +5,10 @@ package hintspb

import "github.com/oklog/ulid"

func (m *SeriesResponseHints) Len() int {
return len(m.QueriedBlocks)
}

func (m *SeriesResponseHints) AddQueriedBlock(id ulid.ULID) {
m.QueriedBlocks = append(m.QueriedBlocks, Block{
Id: id.String(),
4 changes: 0 additions & 4 deletions pkg/store/proxy_test.go
@@ -1547,17 +1547,13 @@ type storeSeriesServer struct {
SeriesSet []storepb.Series
Warnings []string
HintsSet []*types.Any

Size int64
}

func newStoreSeriesServer(ctx context.Context) *storeSeriesServer {
return &storeSeriesServer{ctx: ctx}
}

func (s *storeSeriesServer) Send(r *storepb.SeriesResponse) error {
s.Size += int64(r.Size())

if r.GetWarning() != "" {
s.Warnings = append(s.Warnings, r.GetWarning())
return nil
154 changes: 154 additions & 0 deletions pkg/store/storepb/custom.go
@@ -9,6 +9,7 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
Expand Down Expand Up @@ -41,6 +42,24 @@ func NewSeriesResponse(series *Series) *SeriesResponse {
}
}

func NewSeriesResponseWithPool(series *Series, respPool *sync.Pool) *SeriesResponse {
return &SeriesResponse{
respPool: respPool,
Result: &SeriesResponse_Series{
Series: series,
},
}
}

func NewHintsSeriesResponseWithPool(hints *types.Any, respPool *sync.Pool) *SeriesResponse {
return &SeriesResponse{
respPool: respPool,
Result: &SeriesResponse_Hints{
Hints: hints,
},
}
}

func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse {
return &SeriesResponse{
Result: &SeriesResponse_Hints{
@@ -456,3 +475,138 @@ func CompareLabels(a, b []Label) int {
func LabelsToPromLabelsUnsafe(lset []Label) labels.Labels {
return labelpb.ZLabelsToPromLabels(lset)
}

// Type alias because protoc-go-inject-field does not support
// managing imports.
type syncPool = sync.Pool

// Close returns the memory used for marshaling, if any.
func (m *SeriesResponse) Close() {
if m == nil || m.respBuf == nil {
return
}

m.respPool.Put(m.respBuf)
m.respBuf = nil
}

// The following were copied/pasted from gogoprotobuf generated code with changes
// to make it work with sync.Pool / []byte slice.
func (m *SeriesResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()

var respBuf []byte

// No pool defined, allocate directly.
if m.respPool == nil {
respBuf = make([]byte, size)
} else {
if m.respBuf == nil {
poolBuf := m.respPool.Get()
if poolBuf == nil {
respBuf = make([]byte, size)
m.respBuf = &respBuf
} else {
m.respBuf = poolBuf.(*[]byte)
respBuf = *m.respBuf
}
} else {
if cap(*m.respBuf) < size {
if m.respPool != nil {
m.respPool.Put(m.respBuf)
}
respBuf = make([]byte, size)
m.respBuf = &respBuf
} else {
respBuf = *m.respBuf
}
}
}

marshalBuf := respBuf[:size]
n, err := m.MarshalToSizedBuffer(marshalBuf)
if err != nil {
return nil, err
}
return marshalBuf[len(marshalBuf)-n:], nil
}

func (m *SeriesResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}

type marshaler interface {
MarshalTo([]byte) (int, error)
}

func (m *SeriesResponse) MarshalToSizedBuffer(data []byte) (int, error) {
i := len(data)

if m.Result != nil {
size := m.Result.Size()
i -= size

if _, err := m.Result.(marshaler).MarshalTo(data[i:]); err != nil {
return 0, err
}
}
return len(data) - i, nil
}

func (m *SeriesResponse_Series) MarshalTo(data []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(data[:size])
}

func (m *SeriesResponse_Series) MarshalToSizedBuffer(data []byte) (int, error) {
i := len(data)
if m.Series != nil {
{
size, err := m.Series.MarshalToSizedBuffer(data[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRpc(data, i, uint64(size))
}
i--
data[i] = 0xa
}
return len(data) - i, nil
}
func (m *SeriesResponse_Warning) MarshalTo(data []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(data[:size])
}

func (m *SeriesResponse_Warning) MarshalToSizedBuffer(data []byte) (int, error) {
i := len(data)
i -= len(m.Warning)
copy(data[i:], m.Warning)
i = encodeVarintRpc(data, i, uint64(len(m.Warning)))
i--
data[i] = 0x12
return len(data) - i, nil
}
func (m *SeriesResponse_Hints) MarshalTo(data []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(data[:size])
}

func (m *SeriesResponse_Hints) MarshalToSizedBuffer(data []byte) (int, error) {
i := len(data)
if m.Hints != nil {
{
size, err := m.Hints.MarshalToSizedBuffer(data[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRpc(data, i, uint64(size))
}
i--
data[i] = 0x1a
}
return len(data) - i, nil
}
