Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return only samples within the queried start/end time range when executing a remote read request using SAMPLES mode #8463

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* [CHANGE] Store-gateway / querier: enable streaming chunks from store-gateways to queriers by default. #6646
* [CHANGE] Querier: honor the start/end time range specified in the read hints when executing a remote read request. #8431
* [CHANGE] Querier: return only samples within the queried start/end time range when executing a remote read request using "SAMPLES" mode. Previously, samples outside of the range could have been returned. Samples outside of the queried time range may still be returned when executing a remote read request using "STREAMED_XOR_CHUNKS" mode. #8463
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455
* [ENHANCEMENT] Compactor: Add `cortex_compactor_compaction_job_duration_seconds` and `cortex_compactor_compaction_job_blocks` histogram metrics to track duration of individual compaction jobs and number of blocks per job. #8371
* [ENHANCEMENT] Rules: Added per namespace max rules per rule group limit. The maximum number of rules per rule groups for all namespaces continues to be configured by `-ruler.max-rules-per-rule-group`, but now, this can be superseded by the new `-ruler.max-rules-per-rule-group-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8378
Expand Down
107 changes: 81 additions & 26 deletions integration/querier_remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/grafana/e2e"
e2edb "github.com/grafana/e2e/db"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
Expand All @@ -32,6 +33,10 @@ func TestQuerierRemoteRead(t *testing.T) {
flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
map[string]string{
// This test writes samples sparse in time. We don't want compaction to trigger while testing.
"-blocks-storage.tsdb.block-ranges-period": "2h",
},
)

// Start dependencies.
Expand All @@ -58,40 +63,90 @@ func TestQuerierRemoteRead(t *testing.T) {
// Wait until the querier has updated the ring.
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

runTestPushSeriesForQuerierRemoteRead(t, c, querier, "series_1", generateFloatSeries)
runTestPushSeriesForQuerierRemoteRead(t, c, querier, "hseries_1", generateHistogramSeries)
t.Run("float series", func(t *testing.T) {
runTestPushSeriesForQuerierRemoteRead(t, c, querier, "series_1", generateFloatSeries)
})

t.Run("histogram series", func(t *testing.T) {
runTestPushSeriesForQuerierRemoteRead(t, c, querier, "hseries_1", generateHistogramSeries)
})
}

func runTestPushSeriesForQuerierRemoteRead(t *testing.T, c *e2emimir.Client, querier *e2emimir.MimirService, seriesName string, genSeries generateSeriesFunc) {
// Push a series for each user to Mimir.
now := time.Now()

series, expectedVectors, _ := genSeries(seriesName, now)
res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
// Generate multiple series, sparse in time.
series1, expectedVector1, _ := genSeries(seriesName, now.Add(-10*time.Minute))
series2, expectedVector2, _ := genSeries(seriesName, now)

startMs := now.Add(-1 * time.Minute)
endMs := now.Add(time.Minute)
for _, series := range [][]prompb.TimeSeries{series1, series2} {
res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
}

client, err := e2emimir.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
httpResp, resp, _, err := client.RemoteRead(remoteReadQueryByMetricName(seriesName, startMs, endMs))
require.Equal(t, http.StatusOK, httpResp.StatusCode)
require.NoError(t, err)
tests := map[string]struct {
query *prompb.Query
expectedVector model.Vector
}{
"remote read request without hints": {
query: &prompb.Query{
Matchers: remoteReadQueryMatchersByMetricName(seriesName),
StartTimestampMs: now.Add(-1 * time.Minute).UnixMilli(),
EndTimestampMs: now.Add(+1 * time.Minute).UnixMilli(),
},
expectedVector: expectedVector2,
},
"remote read request with hints time range equal to query time range": {
query: &prompb.Query{
Matchers: remoteReadQueryMatchersByMetricName(seriesName),
StartTimestampMs: now.Add(-1 * time.Minute).UnixMilli(),
EndTimestampMs: now.Add(+1 * time.Minute).UnixMilli(),
Hints: &prompb.ReadHints{
StartMs: now.Add(-1 * time.Minute).UnixMilli(),
EndMs: now.Add(+1 * time.Minute).UnixMilli(),
},
},
expectedVector: expectedVector2,
},
"remote read request with hints time range different than query time range": {
query: &prompb.Query{
Matchers: remoteReadQueryMatchersByMetricName(seriesName),
StartTimestampMs: now.Add(-1 * time.Minute).UnixMilli(),
EndTimestampMs: now.Add(+1 * time.Minute).UnixMilli(),
Hints: &prompb.ReadHints{
StartMs: now.Add(-11 * time.Minute).UnixMilli(),
EndMs: now.Add(-9 * time.Minute).UnixMilli(),
},
},
expectedVector: expectedVector1,
},
}

// Validate the returned remote read data matches what was written
require.Len(t, resp.Timeseries, 1)
require.Len(t, resp.Timeseries[0].Labels, 1)
require.Equal(t, seriesName, resp.Timeseries[0].Labels[0].GetValue())
isSeriesFloat := len(resp.Timeseries[0].Samples) == 1
isSeriesHistogram := len(resp.Timeseries[0].Histograms) == 1
require.Equal(t, isSeriesFloat, !isSeriesHistogram)
if isSeriesFloat {
require.Equal(t, int64(expectedVectors[0].Timestamp), resp.Timeseries[0].Samples[0].Timestamp)
require.Equal(t, float64(expectedVectors[0].Value), resp.Timeseries[0].Samples[0].Value)
} else if isSeriesHistogram {
require.Equal(t, expectedVectors[0].Histogram, mimirpb.FromHistogramToPromHistogram(remote.HistogramProtoToHistogram(resp.Timeseries[0].Histograms[0])))
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
client, err := e2emimir.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
httpResp, resp, _, err := client.RemoteRead(testData.query)
require.Equal(t, http.StatusOK, httpResp.StatusCode)
require.NoError(t, err)

// Validate the returned remote read data matches what was written
require.Len(t, resp.Timeseries, 1)
require.Len(t, resp.Timeseries[0].Labels, 1)
require.Equal(t, seriesName, resp.Timeseries[0].Labels[0].GetValue())
isSeriesFloat := len(resp.Timeseries[0].Samples) > 0
isSeriesHistogram := len(resp.Timeseries[0].Histograms) > 0
require.Equal(t, isSeriesFloat, !isSeriesHistogram)
if isSeriesFloat {
require.Len(t, resp.Timeseries[0].Samples, 1)
require.Equal(t, int64(testData.expectedVector[0].Timestamp), resp.Timeseries[0].Samples[0].Timestamp)
require.Equal(t, float64(testData.expectedVector[0].Value), resp.Timeseries[0].Samples[0].Value)
} else if isSeriesHistogram {
require.Len(t, resp.Timeseries[0].Histograms, 1)
require.Equal(t, testData.expectedVector[0].Histogram, mimirpb.FromHistogramToPromHistogram(remote.HistogramProtoToHistogram(resp.Timeseries[0].Histograms[0])))
}
})
}
}

Expand Down
23 changes: 18 additions & 5 deletions pkg/querier/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func remoteReadSamples(

for i, qr := range req.Queries {
go func(i int, qr *prompb.Query) {
start, end, matchers, hints, err := queryFromRemoteReadQuery(qr)
start, end, minT, maxT, matchers, hints, err := queryFromRemoteReadQuery(qr)
if err != nil {
errCh <- err
return
Expand All @@ -101,7 +101,10 @@ func remoteReadSamples(
}

seriesSet := querier.Select(ctx, false, hints, matchers...)
resp.Results[i], err = seriesSetToQueryResult(seriesSet)

// We can over-read when querying, but we don't need to return samples
// outside the queried range, so can filter them out.
resp.Results[i], err = seriesSetToQueryResult(seriesSet, int64(minT), int64(maxT))
errCh <- err
}(i, qr)
}
Expand Down Expand Up @@ -181,7 +184,7 @@ func processReadStreamedQueryRequest(
f http.Flusher,
maxBytesInFrame int,
) error {
start, end, matchers, hints, err := queryFromRemoteReadQuery(queryReq)
start, end, _, _, matchers, hints, err := queryFromRemoteReadQuery(queryReq)
if err != nil {
return err
}
Expand All @@ -200,7 +203,7 @@ func processReadStreamedQueryRequest(
)
}

func seriesSetToQueryResult(s storage.SeriesSet) (*prompb.QueryResult, error) {
func seriesSetToQueryResult(s storage.SeriesSet, filterStartMs, filterEndMs int64) (*prompb.QueryResult, error) {
result := &prompb.QueryResult{}

var it chunkenc.Iterator
Expand All @@ -210,6 +213,11 @@ func seriesSetToQueryResult(s storage.SeriesSet) (*prompb.QueryResult, error) {
histograms := []prompb.Histogram{}
it = series.Iterator(it)
for valType := it.Next(); valType != chunkenc.ValNone; valType = it.Next() {
// Ensure the sample is within the filtered time range.
if ts := it.AtT(); ts < filterStartMs || ts > filterEndMs {
continue
}

switch valType {
case chunkenc.ValFloat:
t, v := it.At()
Expand Down Expand Up @@ -335,14 +343,17 @@ func initializedFrameBytesRemaining(maxBytesInFrame int, lbls []prompb.Label) in

// queryFromRemoteReadQuery returns the queried time range and label matchers for the given remote
// read request query.
func queryFromRemoteReadQuery(query *prompb.Query) (start, end model.Time, matchers []*labels.Matcher, hints *storage.SelectHints, err error) {
func queryFromRemoteReadQuery(query *prompb.Query) (start, end, minT, maxT model.Time, matchers []*labels.Matcher, hints *storage.SelectHints, err error) {
matchers, err = prom_remote.FromLabelMatchers(query.Matchers)
if err != nil {
return
}

start = model.Time(query.StartTimestampMs)
end = model.Time(query.EndTimestampMs)
minT = start
maxT = end

hints = &storage.SelectHints{
Start: query.StartTimestampMs,
End: query.EndTimestampMs,
Expand All @@ -352,9 +363,11 @@ func queryFromRemoteReadQuery(query *prompb.Query) (start, end model.Time, match
// the passed read hints are zero values (because unintentionally initialised but not set).
if query.Hints != nil && query.Hints.StartMs > 0 {
hints.Start = query.Hints.StartMs
minT = model.Time(hints.Start)
}
if query.Hints != nil && query.Hints.EndMs > 0 {
hints.End = query.Hints.EndMs
maxT = model.Time(hints.End)
}

return
Expand Down
59 changes: 41 additions & 18 deletions pkg/querier/remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func TestRemoteReadHandler_Samples(t *testing.T) {
query *prompb.Query
expectedQueriedStart int64
expectedQueriedEnd int64
expectedTimeseries []*prompb.TimeSeries
}{
"query without hints": {
query: &prompb.Query{
Expand All @@ -117,18 +118,44 @@ func TestRemoteReadHandler_Samples(t *testing.T) {
},
expectedQueriedStart: 1,
expectedQueriedEnd: 10,
expectedTimeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "foo", Value: "bar"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: 1},
{Value: 2, Timestamp: 2},
{Value: 3, Timestamp: 3},
},
Histograms: []prompb.Histogram{
prom_remote.HistogramToHistogramProto(4, test.GenerateTestHistogram(4)),
},
},
},
},
"query with hints": {
query: &prompb.Query{
StartTimestampMs: 1,
EndTimestampMs: 10,
Hints: &prompb.ReadHints{
StartMs: 2,
EndMs: 9,
EndMs: 3,
},
},
expectedQueriedStart: 2,
expectedQueriedEnd: 9,
expectedQueriedEnd: 3,
expectedTimeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "foo", Value: "bar"},
},
Samples: []prompb.Sample{
{Value: 2, Timestamp: 2},
{Value: 3, Timestamp: 3},
},
},
},
},
}

Expand Down Expand Up @@ -179,21 +206,7 @@ func TestRemoteReadHandler_Samples(t *testing.T) {
expected := prompb.ReadResponse{
Results: []*prompb.QueryResult{
{
Timeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "foo", Value: "bar"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: 1},
{Value: 2, Timestamp: 2},
{Value: 3, Timestamp: 3},
},
Histograms: []prompb.Histogram{
prom_remote.HistogramToHistogramProto(4, test.GenerateTestHistogram(4)),
},
},
},
Timeseries: queryData.expectedTimeseries,
},
},
}
Expand Down Expand Up @@ -682,6 +695,8 @@ func TestQueryFromRemoteReadQuery(t *testing.T) {
query *prompb.Query
expectedStart model.Time
expectedEnd model.Time
expectedMinT model.Time
expectedMaxT model.Time
expectedMatchers []*labels.Matcher
expectedHints *storage.SelectHints
}{
Expand All @@ -695,6 +710,8 @@ func TestQueryFromRemoteReadQuery(t *testing.T) {
},
expectedStart: 1000,
expectedEnd: 2000,
expectedMinT: 1000,
expectedMaxT: 2000,
expectedMatchers: []*labels.Matcher{{Type: labels.MatchEqual, Name: labels.MetricName, Value: "metric"}},
expectedHints: &storage.SelectHints{
Start: 1000,
Expand All @@ -715,6 +732,8 @@ func TestQueryFromRemoteReadQuery(t *testing.T) {
},
expectedStart: 1000,
expectedEnd: 2000,
expectedMinT: 500,
expectedMaxT: 1500,
expectedMatchers: []*labels.Matcher{{Type: labels.MatchEqual, Name: labels.MetricName, Value: "metric"}},
expectedHints: &storage.SelectHints{
Start: 500,
Expand All @@ -732,6 +751,8 @@ func TestQueryFromRemoteReadQuery(t *testing.T) {
},
expectedStart: 1000,
expectedEnd: 2000,
expectedMinT: 1000,
expectedMaxT: 2000,
expectedMatchers: []*labels.Matcher{{Type: labels.MatchEqual, Name: labels.MetricName, Value: "metric"}},
expectedHints: &storage.SelectHints{
// Fallback to start/end time range given the read hints are zero values.
Expand All @@ -743,10 +764,12 @@ func TestQueryFromRemoteReadQuery(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
actualStart, actualEnd, actualMatchers, actualHints, err := queryFromRemoteReadQuery(testData.query)
actualStart, actualEnd, actualMinT, actualMaxT, actualMatchers, actualHints, err := queryFromRemoteReadQuery(testData.query)
require.NoError(t, err)
require.Equal(t, testData.expectedStart, actualStart)
require.Equal(t, testData.expectedEnd, actualEnd)
require.Equal(t, testData.expectedMinT, actualMinT)
require.Equal(t, testData.expectedMaxT, actualMaxT)
require.Equal(t, testData.expectedMatchers, actualMatchers)
require.Equal(t, testData.expectedHints, actualHints)
})
Expand Down
Loading