Commit

Add batched loading of series in the store-gateway

This is a POC and is not meant to be merged yet.

Overview

Instead of loading all series (label sets and chunks) into memory before
responding to a Series() RPC, we can batch them and load a fixed number of
series at a time. This makes the memory utilization of the store-gateway more
predictable. The tradeoff is having to do one trip to the index cache
and the bucket for each batch, which will affect the overall latency of requests.
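
Conceptually, the matching postings are split into fixed-size batches, and the
label sets and chunks of one batch are loaded and returned before the next
batch is fetched. A minimal sketch of that splitting (illustrative only;
splitPostings is a hypothetical helper and not part of this change, while
storage.SeriesRef is the Prometheus type used by the real code):

package storegateway

import "github.com/prometheus/prometheus/storage"

// splitPostings cuts the matching postings into slices of at most batchSize
// entries. The store-gateway would load and return the series of one slice
// before touching the next, so only one batch is resident in memory at a time.
func splitPostings(postings []storage.SeriesRef, batchSize int) [][]storage.SeriesRef {
	var batches [][]storage.SeriesRef
	for start := 0; start < len(postings); start += batchSize {
		end := start + batchSize
		if end > len(postings) {
			end = len(postings)
		}
		batches = append(batches, postings[start:end])
	}
	return batches
}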

How to use

Batched series loading is disabled by default. This change adds two flags to
control it: whether it's enabled, via
`-blocks-storage.bucket-store.batched-series-loading` (default `false`), and how
many series go into each batch, via
`-blocks-storage.bucket-store.batch-series-size` (default `65536`).
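
For example, to try it out with smaller batches, the store-gateway could be
started with `-blocks-storage.bucket-store.batched-series-loading=true` and
`-blocks-storage.bucket-store.batch-series-size=10000` (the batch size of 10000
here is only an illustration, not a recommended value).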

Limiting

Ideally we want to put a limit on the number of bytes that we load in each
batch instead of the number of series. For now, limiting the number of series
should still give us some resilience against "big" requests, while remaining
vulnerable to a flurry of many requests.
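
As a rough sketch of what byte-based batching could look like (entirely
hypothetical and not part of this commit; nextBatchEnd and estimateSeriesBytes
do not exist in the codebase), a batch would be cut as soon as the estimated
size of the buffered series crosses a byte budget:

package storegateway

import "github.com/prometheus/prometheus/storage"

// nextBatchEnd returns the exclusive end index of the next batch, cutting the
// batch once the estimated size of its series exceeds maxBytes. At least one
// series is always included so progress is guaranteed.
func nextBatchEnd(postings []storage.SeriesRef, start int, maxBytes int64, estimateSeriesBytes func(storage.SeriesRef) int64) int {
	total := int64(0)
	end := start
	for end < len(postings) {
		total += estimateSeriesBytes(postings[end])
		if end > start && total > maxBytes {
			break // the series at `end` would exceed the budget, leave it for the next batch
		}
		end++
	}
	return end
}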

Testing

I've changed all tests within pkg/storegateway to use this new loading
strategy. This should give confidence that it is producing correct
results. Further work should improve testing around resource utilization
(i.e. that batches are indeed freed one after the other)
and should cover both the batched and non-batched strategies.

This commit has TODOs, which should be addressed before merging this.

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
dimitarvdimitrov committed Nov 2, 2022
1 parent 3508a0b commit e763037
Showing 10 changed files with 510 additions and 39 deletions.
22 changes: 22 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -5693,6 +5693,28 @@
"fieldFlag": "blocks-storage.bucket-store.max-concurrent-reject-over-limit",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "batched_series_loading",
"required": false,
"desc": "If enabled, store-gateway will load series from the store in batches instead of loading them all in memory.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "blocks-storage.bucket-store.batched-series-loading",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "batch_series_size",
"required": false,
"desc": "How many series to fetch per batch when batched-series-loading is enabled.",
"fieldValue": null,
"fieldDefaultValue": 65536,
"fieldFlag": "blocks-storage.bucket-store.batch-series-size",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
4 changes: 4 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -265,6 +265,10 @@ Usage of ./cmd/mimir/mimir:
User assigned identity. If empty, then System assigned identity is used.
-blocks-storage.backend string
Backend storage to use. Supported backends are: s3, gcs, azure, swift, filesystem. (default "filesystem")
-blocks-storage.bucket-store.batch-series-size int
[experimental] How many series to fetch per batch when batched-series-loading is enabled. (default 65536)
-blocks-storage.bucket-store.batched-series-loading
[experimental] If enabled, store-gateway will load series from the store in batches instead of loading them all in memory.
-blocks-storage.bucket-store.block-sync-concurrency int
Maximum number of concurrent blocks synching per tenant. (default 20)
-blocks-storage.bucket-store.bucket-index.enabled
@@ -3018,6 +3018,16 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.max-concurrent-reject-over-limit
[max_concurrent_reject_over_limit: <boolean> | default = false]
# (experimental) If enabled, store-gateway will load series from the store in
# batches instead of loading them all in memory.
# CLI flag: -blocks-storage.bucket-store.batched-series-loading
[batched_series_loading: <boolean> | default = false]
# (experimental) How many series to fetch per batch when
# batched-series-loading is enabled.
# CLI flag: -blocks-storage.bucket-store.batch-series-size
[batch_series_size: <int> | default = 65536]
tsdb:
# Directory to store TSDBs (including WAL) in the ingesters. This directory is
# required to be persisted between restarts.
4 changes: 4 additions & 0 deletions pkg/storage/tsdb/config.go
@@ -302,6 +302,8 @@ type BucketStoreConfig struct {

// Controls what to do when MaxConcurrent is exceeded: fail immediately or wait for a slot to run.
MaxConcurrentRejectOverLimit bool `yaml:"max_concurrent_reject_over_limit" category:"experimental"`
BatchedSeriesLoading bool `yaml:"batched_series_loading" category:"experimental"`
BatchSeriesSize int `yaml:"batch_series_size" category:"experimental"`
}

// RegisterFlags registers the BucketStore flags
@@ -331,6 +333,8 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", true, "If enabled, store-gateway will lazy load an index-header only once required by a query.")
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 60*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity.")
f.Uint64Var(&cfg.PartitionerMaxGapBytes, "blocks-storage.bucket-store.partitioner-max-gap-bytes", DefaultPartitionerMaxGapSize, "Max size - in bytes - of a gap for which the partitioner aggregates together two bucket GET object requests.")
f.BoolVar(&cfg.BatchedSeriesLoading, "blocks-storage.bucket-store.batched-series-loading", false, "If enabled, store-gateway will load series from the store in batches instead of loading them all in memory.")
f.IntVar(&cfg.BatchSeriesSize, "blocks-storage.bucket-store.batch-series-size", 65536, "How many series to fetch per batch when batched-series-loading is enabled.")
}

// Validate the config.
255 changes: 255 additions & 0 deletions pkg/storegateway/batch_series.go
@@ -0,0 +1,255 @@
// SPDX-License-Identifier: AGPL-3.0-only

package storegateway

import (
"context"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/hashcache"

"github.com/grafana/mimir/pkg/storage/sharding"
"github.com/grafana/mimir/pkg/storegateway/storepb"
)

type batchedSeriesSet struct {
ctx context.Context
postings []storage.SeriesRef

batchSize int
i int // where we're at in the labels and chunks
iOffset int // if i==0, which index in postings does this correspond to
preloaded []seriesEntry
stats *queryStats
err error

indexr *bucketIndexReader // Index reader for block.
chunkr *bucketChunkReader // Chunk reader for block.
matchers []*labels.Matcher // Series matchers.
shard *sharding.ShardSelector // Shard selector.
seriesHashCache *hashcache.BlockSeriesHashCache // Block-specific series hash cache (used only if shard selector is specified).
chunksLimiter ChunksLimiter // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter // Rate limiter for loading series.
skipChunks bool // If true chunks are not loaded and minTime/maxTime are ignored.
minTime, maxTime int64 // Series must have data in this time range to be returned (ignored if skipChunks=true).
loadAggregates []storepb.Aggr // List of aggregates to load when loading chunks.
logger log.Logger

cleanupFuncs []func()
}

func batchedBlockSeries(
ctx context.Context,
batchSize int,
indexr *bucketIndexReader, // Index reader for block.
chunkr *bucketChunkReader, // Chunk reader for block.
matchers []*labels.Matcher, // Series matchers.
shard *sharding.ShardSelector, // Shard selector.
seriesHashCache *hashcache.BlockSeriesHashCache, // Block-specific series hash cache (used only if shard selector is specified).
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
skipChunks bool, // If true chunks are not loaded and minTime/maxTime are ignored.
minTime, maxTime int64, // Series must have data in this time range to be returned (ignored if skipChunks=true).
loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks.
logger log.Logger,
) (storepb.SeriesSet, error) {
if batchSize <= 0 {
return nil, errors.New("batch size must be a positive number")
}
if skipChunks {
res, ok := fetchCachedSeries(ctx, indexr.block.userID, indexr.block.indexCache, indexr.block.meta.ULID, matchers, shard, logger)
if ok {
return newBucketSeriesSet(res), nil
}
}

ps, err := indexr.ExpandedPostings(ctx, matchers)
if err != nil {
return nil, errors.Wrap(err, "expanded matching posting")
}

// We can't compute the series hash yet because we're still missing the series labels.
// However, if the hash is already in the cache, then we can remove all postings for series
// not belonging to the shard.
var seriesCacheStats queryStats
if shard != nil {
ps, seriesCacheStats = filterPostingsByCachedShardHash(ps, shard, seriesHashCache)
}

return &batchedSeriesSet{
batchSize: batchSize,
preloaded: make([]seriesEntry, 0, batchSize),
stats: &seriesCacheStats,
iOffset: -batchSize,
ctx: ctx,
postings: ps,
indexr: indexr,
chunkr: chunkr,
matchers: matchers,
shard: shard,
seriesHashCache: seriesHashCache,
chunksLimiter: chunksLimiter,
seriesLimiter: seriesLimiter,
skipChunks: skipChunks,
minTime: minTime,
maxTime: maxTime,
loadAggregates: loadAggregates,
logger: logger,
}, nil
}

func (s *batchedSeriesSet) Next() bool {
if s.iOffset+s.i >= len(s.postings)-1 || s.err != nil {
return false
}
s.i++
if s.i >= len(s.preloaded) {
return s.preload()
}
return true
}

func (s *batchedSeriesSet) preload() bool {
s.resetPreloaded()
s.iOffset += s.batchSize
if s.iOffset > len(s.postings) {
return false
}

end := s.iOffset + s.batchSize
if end > len(s.postings) {
end = len(s.postings)
}
nextBatch := s.postings[s.iOffset:end]

if err := s.indexr.PreloadSeries(s.ctx, nextBatch); err != nil {
s.err = errors.Wrap(err, "preload series")
return false
}

var (
symbolizedLset []symbolizedLabel
chks []chunks.Meta
)
for _, id := range nextBatch {
ok, err := s.indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, s.skipChunks, s.minTime, s.maxTime)
if err != nil {
s.err = errors.Wrap(err, "read series")
return false
}
if !ok {
// No matching chunks for this time range; skip the series (nextBatch is not extended to make up for skipped series).
continue
}

lset, err := s.indexr.LookupLabelsSymbols(symbolizedLset)
if err != nil {
s.err = errors.Wrap(err, "lookup labels symbols")
return false
}

// Skip the series if it doesn't belong to the shard.
if s.shard != nil {
hash, ok := s.seriesHashCache.Fetch(id)
s.stats.seriesHashCacheRequests++

if !ok {
hash = lset.Hash()
s.seriesHashCache.Store(id, hash)
} else {
s.stats.seriesHashCacheHits++
}

if hash%s.shard.ShardCount != s.shard.ShardIndex {
continue
}
}

// Check series limit after filtering out series not belonging to the requested shard (if any).
if err := s.seriesLimiter.Reserve(1); err != nil {
s.err = errors.Wrap(err, "exceeded series limit")
return false
}

entry := seriesEntry{lset: lset}

if !s.skipChunks {
// Schedule loading chunks.
entry.refs = make([]chunks.ChunkRef, 0, len(chks))
entry.chks = make([]storepb.AggrChunk, 0, len(chks))
for j, meta := range chks {
// entry is appended to s.preloaded, but not on every outer loop iteration,
// therefore len(s.preloaded) is the index we need here, not the outer loop iteration number.
if err := s.chunkr.addLoad(meta.Ref, len(s.preloaded), j); err != nil {
s.err = errors.Wrap(err, "add chunk load")
return false
}
entry.chks = append(entry.chks, storepb.AggrChunk{
MinTime: meta.MinTime,
MaxTime: meta.MaxTime,
})
entry.refs = append(entry.refs, meta.Ref)
}

// Ensure sample limit through chunksLimiter if we return chunks.
if err := s.chunksLimiter.Reserve(uint64(len(entry.chks))); err != nil {
s.err = errors.Wrap(err, "exceeded chunks limit")
return false
}
}

s.preloaded = append(s.preloaded, entry)
}

if len(s.preloaded) == 0 {
return s.preload() // we didn't find any suitable series in this batch, try with the next one
}

if s.skipChunks {
storeCachedSeries(s.ctx, s.indexr.block.indexCache, s.indexr.block.userID, s.indexr.block.meta.ULID, s.matchers, s.shard, s.preloaded, s.logger)
return true
}

if err := s.chunkr.load(s.preloaded, s.loadAggregates); err != nil {
s.err = errors.Wrap(err, "load chunks")
return false
}

s.stats = s.stats.merge(s.indexr.stats).merge(s.chunkr.stats)
return true
}

func (s *batchedSeriesSet) resetPreloaded() {
s.preloaded = s.preloaded[:0]
s.loadAggregates = s.loadAggregates[:0]
s.cleanupFuncs = append(s.cleanupFuncs, s.indexr.unload)
if s.chunkr != nil { // can be nil when the client didn't want to load chunks
s.chunkr.reset()
s.cleanupFuncs = append(s.cleanupFuncs, s.chunkr.unload)
}
s.i = 0
}

func (s *batchedSeriesSet) At() (labels.Labels, []storepb.AggrChunk) {
if s.i >= len(s.preloaded) {
return nil, nil
}
return s.preloaded[s.i].lset, s.preloaded[s.i].chks
}

func (s *batchedSeriesSet) Err() error {
return s.err
}

func (s *batchedSeriesSet) CleanupFunc() func() {
return func() {
for _, cleanup := range s.cleanupFuncs {
cleanup()
}
s.cleanupFuncs = s.cleanupFuncs[:0]
}
}
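
For reference, this is roughly how the returned set is meant to be consumed
(a hedged sketch, not part of the commit: drainSeriesSet is a hypothetical
helper, and cleanup is assumed to be the function returned by
(*batchedSeriesSet).CleanupFunc()):

package storegateway

import "github.com/grafana/mimir/pkg/storegateway/storepb"

// drainSeriesSet iterates a batched series set. Internally the set loads one
// batch of series (labels and chunks) from the index cache and the bucket
// whenever the previous batch is exhausted, so only one batch is held in memory.
func drainSeriesSet(set storepb.SeriesSet, cleanup func()) error {
	defer cleanup() // release the per-batch index and chunk readers
	for set.Next() {
		lset, chks := set.At()
		// The Series() RPC would convert lset and chks into a storepb.Series
		// and send it on the gRPC stream here.
		_, _ = lset, chks
	}
	return set.Err()
}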
