Skip to content

Commit

Permalink
Streaming store-gateway: Move seriesHasher (#3643)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimitarvdimitrov authored Dec 5, 2022
1 parent f3c733e commit 4f89734
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 31 deletions.
31 changes: 0 additions & 31 deletions pkg/storegateway/batch_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/hashcache"
"golang.org/x/sync/errgroup"

Expand All @@ -20,36 +19,6 @@ import (
"github.com/grafana/mimir/pkg/util/pool"
)

type seriesHasher interface {
Hash(seriesID storage.SeriesRef, lset labels.Labels, stats *queryStats) uint64
}

type cachedSeriesHasher struct {
cache *hashcache.BlockSeriesHashCache
}

func (b cachedSeriesHasher) Hash(id storage.SeriesRef, lset labels.Labels, stats *queryStats) uint64 {
stats.seriesHashCacheRequests++

hash, ok := b.cache.Fetch(id)
if !ok {
hash = lset.Hash()
b.cache.Store(id, hash)
} else {
stats.seriesHashCacheHits++
}
return hash
}

func shardOwned(shard *sharding.ShardSelector, hasher seriesHasher, id storage.SeriesRef, lset labels.Labels, stats *queryStats) bool {
if shard == nil {
return true
}
hash := hasher.Hash(id, lset, stats)

return hash%shard.ShardCount == shard.ShardIndex
}

func (s *BucketStore) batchSetsForBlocks(ctx context.Context, req *storepb.SeriesRequest, blocks []*bucketBlock, indexReaders map[ulid.ULID]*bucketIndexReader, chunkReaders *chunkReaders, chunksPool pool.Bytes, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, stats *safeQueryStats) (storepb.SeriesSet, *hintspb.SeriesResponseHints, func(), error) {
resHints := &hintspb.SeriesResponseHints{}
mtx := sync.Mutex{}
Expand Down
30 changes: 30 additions & 0 deletions pkg/storegateway/series_refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,36 @@ func metasToChunks(blockID ulid.ULID, metas []chunks.Meta) []seriesChunkRef {
return chks
}

type seriesHasher interface {
Hash(seriesID storage.SeriesRef, lset labels.Labels, stats *queryStats) uint64
}

type cachedSeriesHasher struct {
cache *hashcache.BlockSeriesHashCache
}

func (b cachedSeriesHasher) Hash(id storage.SeriesRef, lset labels.Labels, stats *queryStats) uint64 {
stats.seriesHashCacheRequests++

hash, ok := b.cache.Fetch(id)
if !ok {
hash = lset.Hash()
b.cache.Store(id, hash)
} else {
stats.seriesHashCacheHits++
}
return hash
}

func shardOwned(shard *sharding.ShardSelector, hasher seriesHasher, id storage.SeriesRef, lset labels.Labels, stats *queryStats) bool {
if shard == nil {
return true
}
hash := hasher.Hash(id, lset, stats)

return hash%shard.ShardCount == shard.ShardIndex
}

type postingsSetsIterator struct {
postings []storage.SeriesRef

Expand Down

0 comments on commit 4f89734

Please sign in to comment.