Chore: remove unused min/max in chunk merge iterator #8464

Merged 2 commits on Jun 21, 2024
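The change itself is mechanical: NewChunkMergeIterator took two trailing model.Time bounds that its body never read (they were blank identifiers, `_, _ model.Time`), so this PR drops them from the signature and updates every caller. A minimal sketch of a call site after the change, assuming Mimir's import paths (the paths and the drain helper are my illustration, not code from this PR):

```go
package main

import (
	"fmt"

	"github.com/grafana/mimir/pkg/querier/batch"
	"github.com/grafana/mimir/pkg/storage/chunk"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// drain counts the samples produced by merging the given chunks.
func drain(chunks []chunk.Chunk) int {
	// Before this PR: batch.NewChunkMergeIterator(nil, chunks, 0, 0),
	// where the two zeros were ignored model.Time arguments.
	it := batch.NewChunkMergeIterator(nil, chunks)

	n := 0
	for valType := it.Next(); valType != chunkenc.ValNone; valType = it.Next() {
		n++ // every sample counts, whether float or (float) histogram
	}
	return n
}

func main() {
	fmt.Println(drain(nil)) // an empty chunk list merges to an empty iterator
}
```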
3 changes: 1 addition & 2 deletions pkg/querier/batch/batch.go
@@ -8,7 +8,6 @@ package batch
import (
"fmt"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc"

@@ -56,7 +55,7 @@ type iterator interface {
}

// NewChunkMergeIterator returns a chunkenc.Iterator that merges Mimir chunks together.
-func NewChunkMergeIterator(it chunkenc.Iterator, chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator {
+func NewChunkMergeIterator(it chunkenc.Iterator, chunks []chunk.Chunk) chunkenc.Iterator {
converted := make([]GenericChunk, len(chunks))
for i, c := range chunks {
converted[i] = NewGenericChunk(int64(c.From), int64(c.Through), c.Data.NewIterator)
4 changes: 2 additions & 2 deletions pkg/querier/batch/batch_test.go
@@ -45,7 +45,7 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
fh *histogram.FloatHistogram
)
for n := 0; n < b.N; n++ {
-it = NewChunkMergeIterator(it, chunks, 0, 0)
+it = NewChunkMergeIterator(it, chunks)
for valType := it.Next(); valType != chunkenc.ValNone; valType = it.Next() {
switch valType {
case chunkenc.ValFloat:
@@ -74,7 +74,7 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
chunkTwo := mkChunk(t, model.Time(10*step/time.Millisecond), 1, chunk.PrometheusXorChunk)
chunks := []chunk.Chunk{chunkOne, chunkTwo}

-sut := NewChunkMergeIterator(nil, chunks, 0, 0)
+sut := NewChunkMergeIterator(nil, chunks)

// Following calls mimics Prometheus's query engine behaviour for VectorSelector.
require.Equal(t, chunkenc.ValFloat, sut.Next())
4 changes: 0 additions & 4 deletions pkg/querier/distributor_queryable.go
@@ -146,8 +146,6 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, minT, maxT int
serieses = append(serieses, &chunkSeries{
labels: ls,
chunks: chunks,
-mint: minT,
-maxt: maxT,
})
}

@@ -158,8 +156,6 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, minT, maxT int
if len(results.StreamingSeries) > 0 {
streamingSeries := make([]storage.Series, 0, len(results.StreamingSeries))
streamingChunkSeriesConfig := &streamingChunkSeriesContext{
-mint: minT,
-maxt: maxT,
queryMetrics: q.queryMetrics,
queryStats: stats.FromContext(ctx),
}
4 changes: 1 addition & 3 deletions pkg/querier/distributor_queryable_streaming.go
@@ -5,7 +5,6 @@ package querier
import (
"fmt"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"

@@ -16,7 +15,6 @@ import (
)

type streamingChunkSeriesContext struct {
-mint, maxt int64
queryMetrics *stats.QueryMetrics
queryStats *stats.Stats
}
@@ -76,5 +74,5 @@ func (s *streamingChunkSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator
return series.NewErrIterator(err)
}

-return batch.NewChunkMergeIterator(it, chunks, model.Time(s.context.mint), model.Time(s.context.maxt))
+return batch.NewChunkMergeIterator(it, chunks)
}
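Note the `it` parameter threaded through `Iterator` above: it follows Prometheus's iterator-recycling convention, where the caller hands back its previous iterator so the implementation can reuse buffers instead of allocating. A sketch of a series type wired up the same way as streamingChunkSeries after this PR (the package, type, and field names are illustrative, not Mimir's):

```go
package mergeexample

import (
	"github.com/grafana/mimir/pkg/querier/batch"
	"github.com/grafana/mimir/pkg/storage/chunk"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// mergedSeries mirrors the post-PR shape of chunkSeries and
// streamingChunkSeries: just labels and chunks, no time bounds.
type mergedSeries struct {
	lbls   labels.Labels
	chunks []chunk.Chunk
}

// Compile-time check that mergedSeries satisfies storage.Series.
var _ storage.Series = (*mergedSeries)(nil)

func (s *mergedSeries) Labels() labels.Labels { return s.lbls }

// Iterator accepts a possibly-nil previous iterator for reuse and passes it
// straight through as NewChunkMergeIterator's first argument.
func (s *mergedSeries) Iterator(reuse chunkenc.Iterator) chunkenc.Iterator {
	return batch.NewChunkMergeIterator(reuse, s.chunks)
}
```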
12 changes: 1 addition & 11 deletions pkg/querier/distributor_queryable_streaming_test.go
@@ -25,10 +25,6 @@
)

func TestStreamingChunkSeries_HappyPath(t *testing.T) {
-const (
-minT = 1000
-maxT = 6000
-)
chunkUniqueToFirstSource := createTestChunk(t, 1500, 1.23)
chunkUniqueToSecondSource := createTestChunk(t, 2000, 4.56)
chunkPresentInBothSources := createTestChunk(t, 2500, 7.89)
@@ -42,8 +38,6 @@ func TestStreamingChunkSeries_HappyPath(t *testing.T) {
{SeriesIndex: 0, StreamReader: createTestStreamReader([]client.QueryStreamSeriesChunks{{SeriesIndex: 0, Chunks: []client.Chunk{chunkUniqueToSecondSource, chunkPresentInBothSources}}})},
},
context: &streamingChunkSeriesContext{
-mint: minT,
-maxt: maxT,
queryMetrics: stats.NewQueryMetrics(reg),
queryStats: queryStats,
},
@@ -54,7 +48,7 @@ func TestStreamingChunkSeries_HappyPath(t *testing.T) {

expectedChunks, err := client.FromChunks(series.labels, []client.Chunk{chunkUniqueToFirstSource, chunkUniqueToSecondSource, chunkPresentInBothSources})
require.NoError(t, err)
-assertChunkIteratorsEqual(t, iterator, batch.NewChunkMergeIterator(nil, expectedChunks, minT, maxT))
+assertChunkIteratorsEqual(t, iterator, batch.NewChunkMergeIterator(nil, expectedChunks))

m, err := metrics.NewMetricFamilyMapFromGatherer(reg)
require.NoError(t, err)
@@ -106,8 +100,6 @@ func TestStreamingChunkSeries_StreamReaderReturnsError(t *testing.T) {
{SeriesIndex: 0, StreamReader: createTestStreamReader([]client.QueryStreamSeriesChunks{})},
},
context: &streamingChunkSeriesContext{
-mint: 1000,
-maxt: 6000,
queryMetrics: stats.NewQueryMetrics(reg),
queryStats: queryStats,
},
@@ -125,8 +117,6 @@ func TestStreamingChunkSeries_CreateIteratorTwice(t *testing.T) {
{SeriesIndex: 0, StreamReader: createTestStreamReader([]client.QueryStreamSeriesChunks{{SeriesIndex: 0, Chunks: []client.Chunk{createTestChunk(t, 1500, 1.23)}}})},
},
context: &streamingChunkSeriesContext{
-mint: 1000,
-maxt: 6000,
queryMetrics: stats.NewQueryMetrics(prometheus.NewPedanticRegistry()),
queryStats: &stats.Stats{},
},
12 changes: 4 additions & 8 deletions pkg/querier/partitioner.go
@@ -6,7 +6,6 @@
package querier

import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
@@ -17,7 +16,7 @@ import (
)

// Series in the returned set are sorted alphabetically by labels.
-func partitionChunks(chunks []chunk.Chunk, mint, maxt int64) storage.SeriesSet {
+func partitionChunks(chunks []chunk.Chunk) storage.SeriesSet {
chunksBySeries := map[string][]chunk.Chunk{}
var buf [1024]byte
for _, c := range chunks {
@@ -30,8 +29,6 @@ func partitionChunks(chunks []chunk.Chunk, mint, maxt int64) storage.SeriesSet {
series = append(series, &chunkSeries{
labels: chunksBySeries[i][0].Metric,
chunks: chunksBySeries[i],
-mint: mint,
-maxt: maxt,
})
}

@@ -40,9 +37,8 @@ func partitionChunks(chunks []chunk.Chunk, mint, maxt int64) storage.SeriesSet {

// Implements SeriesWithChunks
type chunkSeries struct {
-labels     labels.Labels
-chunks     []chunk.Chunk
-mint, maxt int64
+labels labels.Labels
+chunks []chunk.Chunk
}

func (s *chunkSeries) Labels() labels.Labels {
@@ -51,7 +47,7 @@ func (s *chunkSeries) Labels() labels.Labels {

// Iterator returns a new iterator of the data of the series.
func (s *chunkSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
-return batch.NewChunkMergeIterator(it, s.chunks, model.Time(s.mint), model.Time(s.maxt))
+return batch.NewChunkMergeIterator(it, s.chunks)
}

// Chunks implements SeriesWithChunks interface.
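As the comment kept above says, the set partitionChunks returns is sorted alphabetically by labels, which is what lets mergeSeriesSets (in pkg/querier/querier.go below) hand it to a merge series set without re-sorting. A test-style sketch of that contract, only meaningful inside the querier package since partitionChunks is unexported (the helper name and the testify/labels calls are my assumptions about a reasonable check, not code from this PR):

```go
// requireSortedByLabels walks a SeriesSet produced by partitionChunks and
// asserts the series arrive in non-decreasing label order.
// Assumed imports: testing, github.com/stretchr/testify/require,
// github.com/prometheus/prometheus/model/labels.
func requireSortedByLabels(t *testing.T, chunks []chunk.Chunk) {
	set := partitionChunks(chunks)

	var prev labels.Labels
	first := true
	for set.Next() {
		cur := set.At().Labels()
		if !first {
			require.LessOrEqual(t, labels.Compare(prev, cur), 0)
		}
		prev, first = cur, false
	}
	require.NoError(t, set.Err())
}
```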
2 changes: 1 addition & 1 deletion pkg/querier/partitioner_test.go
@@ -73,7 +73,7 @@ func testPartitionChunksOutputIsSortedByLabels(t *testing.T, encoding chunk.Encoding) {
allChunks = append(allChunks, ch)
}

-res := partitionChunks(allChunks, 0, 1000)
+res := partitionChunks(allChunks)

// collect labels from each series
var seriesLabels []labels.Labels
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
@@ -516,7 +516,7 @@ func (mq multiQuerier) mergeSeriesSets(sets []storage.SeriesSet) storage.SeriesSet {
}

// partitionChunks returns set with sorted series, so it can be used by NewMergeSeriesSet
-chunksSet := partitionChunks(chunks, mq.minT, mq.maxT)
+chunksSet := partitionChunks(chunks)

if len(otherSets) == 0 {
return chunksSet