streaming store-gateway: Polish batchSetsForBlocks (#3651)
* Polish batchSetsForBlocks

* rename to batchedSeriesSetForBlocks
* remove unused cleanups
* move to bucket.go
* some formatting
* also remove cleanups from synchronousSeriesSet

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Rename batchedSeriesSetForBlocks to streamingSeriesSetForBlocks

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

dimitarvdimitrov authored Dec 6, 2022
1 parent 18add51 commit 56a6af6
Showing 3 changed files with 97 additions and 113 deletions.
91 changes: 0 additions & 91 deletions pkg/storegateway/batch_series.go

This file was deleted.

3 changes: 0 additions & 3 deletions pkg/storegateway/batch_series_test.go

This file was deleted.

116 changes: 97 additions & 19 deletions pkg/storegateway/bucket.go
@@ -893,7 +893,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 	var (
 		seriesSets storepb.SeriesSet
 		resHints   = &hintspb.SeriesResponseHints{}
-		cleanup    func()
 	)
 
 	if s.maxSeriesPerBatch <= 0 {
@@ -906,17 +905,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 			defer chunksPool.Release()
 		}
 
-		seriesSets, cleanup, err = s.synchronousSeriesSet(ctx, req, stats, blocks, indexReaders, chunkr, chunksPool, resHints, shardSelector, matchers, chunksLimiter, seriesLimiter)
+		seriesSets, err = s.synchronousSeriesSet(ctx, req, stats, blocks, indexReaders, chunkr, chunksPool, resHints, shardSelector, matchers, chunksLimiter, seriesLimiter)
 	} else {
 		var readers *chunkReaders
 		if !req.SkipChunks {
 			readers = newChunkReaders(chunkr)
 		}
 
-		seriesSets, resHints, cleanup, err = s.batchSetsForBlocks(ctx, req, blocks, indexReaders, readers, s.chunkPool, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
-	}
-	if cleanup != nil {
-		defer cleanup()
+		seriesSets, resHints, err = s.streamingSeriesSetForBlocks(ctx, req, blocks, indexReaders, readers, s.chunkPool, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
 	}
 
 	if err != nil {
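The change above drops the `cleanup` return value from both code paths: pooled resources are now released with `defer` at the point they are acquired (as `defer chunksPool.Release()` already does) instead of through a cleanup function threaded back to the caller. A minimal sketch of that shape, where `resource`, `open`, and `openWithCleanup` are hypothetical stand-ins for the store-gateway types:

```go
package main

import "fmt"

type resource struct{ name string }

func (r *resource) Release() { fmt.Println("released", r.name) }

// Before: the constructor returned a cleanup alongside the result, and every
// caller had to remember `if cleanup != nil { defer cleanup() }`.
func openWithCleanup() (*resource, func()) {
	r := &resource{name: "chunks pool"}
	return r, r.Release
}

// After: the caller owns the resource and defers its release directly, so no
// cleanup value needs to travel through the return signatures.
func open() *resource {
	return &resource{name: "chunks pool"}
}

func main() {
	r := open()
	defer r.Release()
	fmt.Println("using", r.name)
}
```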
@@ -1009,11 +1005,10 @@ func (s *BucketStore) synchronousSeriesSet(
 	matchers []*labels.Matcher,
 	chunksLimiter ChunksLimiter,
 	seriesLimiter SeriesLimiter,
-) (storepb.SeriesSet, func(), error) {
+) (storepb.SeriesSet, error) {
 	var (
-		resMtx   sync.Mutex
-		res      []storepb.SeriesSet
-		cleanups []func()
+		resMtx sync.Mutex
+		res    []storepb.SeriesSet
 	)
 	g, ctx := errgroup.WithContext(ctx)
 
Expand Down Expand Up @@ -1061,13 +1056,6 @@ func (s *BucketStore) synchronousSeriesSet(
})
}

cleanup := func() {
// Iterate from last to first so that we mimic defer semantics
for i := len(cleanups) - 1; i >= 0; i-- {
cleanups[i]()
}
}

// Wait until data is fetched from all blocks
begin := time.Now()
err := g.Wait()
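The deleted closure ran the accumulated cleanups from last to first to mimic `defer` semantics; per the commit message these cleanups were unused, so the whole mechanism could go. For reference, a self-contained sketch of the general idiom, independent of the store-gateway types:

```go
package main

import "fmt"

func main() {
	var cleanups []func()
	for _, name := range []string{"index reader", "chunk reader", "chunks pool"} {
		name := name // capture the loop variable (pre-Go 1.22 idiom)
		cleanups = append(cleanups, func() { fmt.Println("closing", name) })
	}

	// Iterate from last to first so that later acquisitions are released
	// first, exactly as stacked defers would behave.
	for i := len(cleanups) - 1; i >= 0; i-- {
		cleanups[i]()
	}
}
```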
@@ -1076,7 +1064,7 @@ func (s *BucketStore) synchronousSeriesSet(
 		if s, ok := status.FromError(errors.Cause(err)); ok {
 			code = s.Code()
 		}
-		return nil, cleanup, status.Error(code, err.Error())
+		return nil, status.Error(code, err.Error())
 	}
 
 	getAllDuration := time.Since(begin)
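The error path kept here unwraps the error with `errors.Cause` (github.com/pkg/errors) and, if the cause carries a gRPC status, propagates that status code. A compact sketch of the mapping; the `codes.Internal` default is an assumption, since the hunk does not show where `code` is initialized:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func toGRPCError(err error) error {
	code := codes.Internal // assumed default; not shown in the hunk
	if s, ok := status.FromError(errors.Cause(err)); ok {
		code = s.Code()
	}
	return status.Error(code, err.Error())
}

func main() {
	wrapped := errors.Wrap(status.Error(codes.ResourceExhausted, "limit hit"), "fetch series")
	fmt.Println(toGRPCError(wrapped)) // keeps ResourceExhausted, adds the wrap context
}
```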
@@ -1087,7 +1075,97 @@ func (s *BucketStore) synchronousSeriesSet(
 	s.metrics.seriesGetAllDuration.Observe(getAllDuration.Seconds())
 	s.metrics.seriesBlocksQueried.Observe(float64(len(res)))
 
-	return storepb.MergeSeriesSets(res...), cleanup, err
+	return storepb.MergeSeriesSets(res...), err
 }
 
+func (s *BucketStore) streamingSeriesSetForBlocks(
+	ctx context.Context,
+	req *storepb.SeriesRequest,
+	blocks []*bucketBlock,
+	indexReaders map[ulid.ULID]*bucketIndexReader,
+	chunkReaders *chunkReaders,
+	chunksPool pool.Bytes,
+	shardSelector *sharding.ShardSelector,
+	matchers []*labels.Matcher,
+	chunksLimiter ChunksLimiter,
+	seriesLimiter SeriesLimiter,
+	stats *safeQueryStats,
+) (storepb.SeriesSet, *hintspb.SeriesResponseHints, error) {
+	var (
+		resHints = &hintspb.SeriesResponseHints{}
+		mtx      = sync.Mutex{}
+		batches  = make([]seriesChunkRefsSetIterator, 0, len(blocks))
+		g, _     = errgroup.WithContext(ctx)
+	)
+
+	for _, b := range blocks {
+		b := b
+
+		// Keep track of queried blocks.
+		resHints.AddQueriedBlock(b.meta.ULID)
+		indexr := indexReaders[b.meta.ULID]
+
+		// If query sharding is enabled we have to get the block-specific series hash cache
+		// which is used by blockSeries().
+		var blockSeriesHashCache *hashcache.BlockSeriesHashCache
+		if shardSelector != nil {
+			blockSeriesHashCache = s.seriesHashCache.GetBlockCache(b.meta.ULID.String())
+		}
+		g.Go(func() error {
+			var (
+				part seriesChunkRefsSetIterator
+				err  error
+			)
+
+			part, err = openBlockSeriesChunkRefsSetsIterator(
+				ctx,
+				s.maxSeriesPerBatch,
+				indexr,
+				b.meta,
+				matchers,
+				shardSelector,
+				blockSeriesHashCache,
+				chunksLimiter,
+				seriesLimiter,
+				req.SkipChunks,
+				req.MinTime, req.MaxTime,
+				stats,
+				s.logger,
+			)
+			if err != nil {
+				return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
+			}
+
+			mtx.Lock()
+			batches = append(batches, part)
+			mtx.Unlock()
+
+			return nil
+		})
+	}
+
+	begin := time.Now()
+	err := g.Wait()
+	if err != nil {
+		return nil, nil, err
+	}
+
+	getAllDuration := time.Since(begin)
+	stats.update(func(stats *queryStats) {
+		stats.blocksQueried = len(batches)
+		stats.getAllDuration = getAllDuration
+	})
+	s.metrics.seriesGetAllDuration.Observe(getAllDuration.Seconds())
+	s.metrics.seriesBlocksQueried.Observe(float64(len(batches)))
+
+	mergedBatches := mergedSeriesChunkRefsSetIterators(s.maxSeriesPerBatch, batches...)
+	var set storepb.SeriesSet
+	if chunkReaders != nil {
+		set = newSeriesSetWithChunks(ctx, *chunkReaders, chunksPool, mergedBatches, stats)
+	} else {
+		set = newSeriesSetWithoutChunks(ctx, mergedBatches)
+	}
+	return set, resHints, nil
+}
+
 func (s *BucketStore) recordSeriesCallResult(safeStats *safeQueryStats) {
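The new `streamingSeriesSetForBlocks` fans out one goroutine per block with an `errgroup`, opens a batched series-chunk-refs iterator for each block, appends the iterators to a shared slice under a mutex, waits for the whole group, and only then merges the per-block iterators into a single ordered stream (wrapped with or without chunk loading). A simplified, self-contained sketch of that concurrency pattern; `blockIterator` and `openBlockIterator` are illustrative stand-ins for `seriesChunkRefsSetIterator` and `openBlockSeriesChunkRefsSetsIterator`:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

type blockIterator struct{ block string }

// openBlockIterator stands in for openBlockSeriesChunkRefsSetsIterator: it can
// fail per block, and any failure aborts the whole group.
func openBlockIterator(ctx context.Context, block string) (*blockIterator, error) {
	return &blockIterator{block: block}, nil
}

func iteratorsForBlocks(ctx context.Context, blocks []string) ([]*blockIterator, error) {
	var (
		mtx     sync.Mutex
		batches = make([]*blockIterator, 0, len(blocks))
	)
	g, ctx := errgroup.WithContext(ctx)

	for _, b := range blocks {
		b := b // capture for the goroutine (pre-Go 1.22 idiom)
		g.Go(func() error {
			it, err := openBlockIterator(ctx, b)
			if err != nil {
				return fmt.Errorf("fetch series for block %s: %w", b, err)
			}
			// The slice is shared across goroutines, so guard the append.
			mtx.Lock()
			batches = append(batches, it)
			mtx.Unlock()
			return nil
		})
	}

	// Wait until an iterator has been opened for every block; the caller then
	// merges them into one ordered stream.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return batches, nil
}

func main() {
	its, err := iteratorsForBlocks(context.Background(), []string{"01ABC", "01DEF"})
	if err != nil {
		panic(err)
	}
	fmt.Println("opened", len(its), "block iterators")
}
```

Note that the merge happens only after `g.Wait()` returns: the fan-out parallelizes opening the per-block iterators, while the streaming itself comes from consuming those iterators batch by batch afterwards.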
