Skip to content

Commit

Permalink
Honor start/end in the remote read request hints (#8431)
Browse files Browse the repository at this point in the history
* Honor start/end in the remote read request hints

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Update CHANGELOG

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Add remote read hints test cases to TestQuerierRemoteRead integration test

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Add remote read hints test cases to TestQuerierStreamingRemoteRead integration test

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Reverted TestQuerierRemoteRead to main branch

Signed-off-by: Marco Pracucci <marco@pracucci.com>

---------

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored Jun 21, 2024
1 parent 144a26a commit 6bcba2c
Show file tree
Hide file tree
Showing 11 changed files with 296 additions and 168 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* [CHANGE] Ruler: promote tenant federation from experimental to stable. #8400
* [CHANGE] Ruler: promote `-ruler.recording-rules-evaluation-enabled` and `-ruler.alerting-rules-evaluation-enabled` from experimental to stable. #8400
* [CHANGE] General: promote `-tenant-federation.max-tenants` from experimental to stable. #8400
* [CHANGE] Querier: honor the start/end time range specified in the read hints when executing a remote read request. #8431
* [FEATURE] Continuous-test: now runnable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
Expand Down
2 changes: 1 addition & 1 deletion integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func checkQueries(

for _, req := range remoteReadRequests {
t.Run(fmt.Sprintf("%s: remote read: %s", endpoint, req.metricName), func(t *testing.T) {
httpRes, result, _, err := c.RemoteRead(req.metricName, req.startTime, req.endTime)
httpRes, result, _, err := c.RemoteRead(remoteReadQueryByMetricName(req.metricName, req.startTime, req.endTime))
require.NoError(t, err)
require.Equal(t, http.StatusOK, httpRes.StatusCode)
require.NotNil(t, result)
Expand Down
27 changes: 4 additions & 23 deletions integration/e2emimir/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
promapi "github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/prompb" // OTLP protos are not compatible with gogo
"github.com/prometheus/prometheus/storage/remote"
Expand Down Expand Up @@ -256,18 +255,9 @@ func (c *Client) QueryRangeRaw(query string, start, end time.Time, step time.Dur
// RemoteRead uses samples streaming. See RemoteReadChunks as well for chunks streaming.
// RemoteRead returns the HTTP response with consumed body, the remote read protobuf response and an error.
// In case the response is not a protobuf, the plaintext body content is returned instead of the protobuf message.
func (c *Client) RemoteRead(metricName string, start, end time.Time) (_ *http.Response, _ *prompb.QueryResult, plaintextResponse []byte, _ error) {
func (c *Client) RemoteRead(query *prompb.Query) (_ *http.Response, _ *prompb.QueryResult, plaintextResponse []byte, _ error) {
req := &prompb.ReadRequest{
Queries: []*prompb.Query{{
Matchers: []*prompb.LabelMatcher{{Type: prompb.LabelMatcher_EQ, Name: labels.MetricName, Value: metricName}},
StartTimestampMs: start.UnixMilli(),
EndTimestampMs: end.UnixMilli(),
Hints: &prompb.ReadHints{
StepMs: 1,
StartMs: start.UnixMilli(),
EndMs: end.UnixMilli(),
},
}},
Queries: []*prompb.Query{query},
}
resp, err := c.doRemoteReadReq(req)
if err != nil {
Expand Down Expand Up @@ -300,18 +290,9 @@ func (c *Client) RemoteRead(metricName string, start, end time.Time) (_ *http.Re
// RemoteReadChunks uses chunks streaming. See RemoteRead as well for samples streaming.
// RemoteReadChunks returns the HTTP response with consumed body, the remote read protobuf response and an error.
// In case the response is not a protobuf, the plaintext body content is returned instead of the protobuf message.
func (c *Client) RemoteReadChunks(metricName string, start, end time.Time) (_ *http.Response, _ []prompb.ChunkedReadResponse, plaintextResponse []byte, _ error) {
func (c *Client) RemoteReadChunks(query *prompb.Query) (_ *http.Response, _ []prompb.ChunkedReadResponse, plaintextResponse []byte, _ error) {
req := &prompb.ReadRequest{
Queries: []*prompb.Query{{
Matchers: []*prompb.LabelMatcher{{Type: prompb.LabelMatcher_EQ, Name: labels.MetricName, Value: metricName}},
StartTimestampMs: start.UnixMilli(),
EndTimestampMs: end.UnixMilli(),
Hints: &prompb.ReadHints{
StepMs: 1,
StartMs: start.UnixMilli(),
EndMs: end.UnixMilli(),
},
}},
Queries: []*prompb.Query{query},
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
}

Expand Down
250 changes: 147 additions & 103 deletions integration/querier_remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package integration

import (
"math/rand"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -77,7 +76,7 @@ func runTestPushSeriesForQuerierRemoteRead(t *testing.T, c *e2emimir.Client, que

client, err := e2emimir.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
httpResp, resp, _, err := client.RemoteRead(seriesName, startMs, endMs)
httpResp, resp, _, err := client.RemoteRead(remoteReadQueryByMetricName(seriesName, startMs, endMs))
require.Equal(t, http.StatusOK, httpResp.StatusCode)
require.NoError(t, err)

Expand All @@ -97,126 +96,170 @@ func runTestPushSeriesForQuerierRemoteRead(t *testing.T, c *e2emimir.Client, que
}

func TestQuerierStreamingRemoteRead(t *testing.T) {
testCases := map[string]struct {
expectedValType chunkenc.ValueType
floats func(startMs, endMs int64) []prompb.Sample
histograms func(startMs, endMs int64) []prompb.Histogram
var (
now = time.Now().Truncate(time.Minute) // Make assertions easier to debug.
pushedStartMs = now.Add(-time.Minute).UnixMilli()
pushedEndMs = now.Add(time.Minute).UnixMilli()
pushedStepMs = int64(100) // Simulate a high frequency scraping (10Hz).
)

const (
floatMetricName = "series_float"
histogramMetricName = "series_histogram"
floatHistogramMetricName = "series_float_histogram"
)

tests := map[string]struct {
valType chunkenc.ValueType
metricName string
query *prompb.Query
expectedStartMs int64
expectedEndMs int64
}{
"float samples": {
expectedValType: chunkenc.ValFloat,
floats: func(startMs, endMs int64) []prompb.Sample {
var samples []prompb.Sample
for i := startMs; i < endMs; i++ {
samples = append(samples, prompb.Sample{
Value: rand.Float64(),
Timestamp: i,
})
}
return samples
"float samples, remote read request without hints": {
valType: chunkenc.ValFloat,
metricName: floatMetricName,
query: &prompb.Query{
Matchers: remoteReadQueryMatchersByMetricName(floatMetricName),
StartTimestampMs: pushedStartMs,
EndTimestampMs: pushedEndMs,
},
expectedStartMs: pushedStartMs,
expectedEndMs: pushedEndMs,
},
"histograms": {
expectedValType: chunkenc.ValHistogram,
histograms: func(startMs, endMs int64) []prompb.Histogram {
var hists []prompb.Histogram
for i := startMs; i < endMs; i++ {
h := test.GenerateTestHistogram(int(i))
hists = append(hists, remote.HistogramToHistogramProto(i, h))
}
return hists
"float samples, with hints time range equal to query time range": {
valType: chunkenc.ValFloat,
metricName: floatMetricName,
query: &prompb.Query{
Matchers: remoteReadQueryMatchersByMetricName(floatMetricName),
StartTimestampMs: pushedStartMs,
EndTimestampMs: pushedEndMs,
Hints: &prompb.ReadHints{
StartMs: pushedStartMs,
EndMs: pushedEndMs,
},
},
expectedStartMs: pushedStartMs,
expectedEndMs: pushedEndMs,
},
"float histograms": {
expectedValType: chunkenc.ValFloatHistogram,
histograms: func(startMs, endMs int64) []prompb.Histogram {
var hists []prompb.Histogram
for i := startMs; i < endMs; i++ {
h := test.GenerateTestFloatHistogram(int(i))
hists = append(hists, remote.FloatHistogramToHistogramProto(i, h))
}
return hists
"float samples, with hints time range different than query time range": {
valType: chunkenc.ValFloat,
metricName: floatMetricName,
query: &prompb.Query{
Matchers: remoteReadQueryMatchersByMetricName(floatMetricName),
StartTimestampMs: pushedStartMs,
EndTimestampMs: pushedEndMs,
Hints: &prompb.ReadHints{
StartMs: now.UnixMilli(),
EndMs: now.Add(10 * time.Second).UnixMilli(),
},
},
// Mimir doesn't cut returned chunks to the requested time range for performance reasons. This means
// that we get the entire chunks in output. TSDB targets to cut chunks once every 120 samples, but
// it's based on estimation and math is not super accurate. That's why we get these not-perfectly-aligned
// chunk ranges. However, they're consistent, so tests are stable.
expectedStartMs: now.Add(-1500 * time.Millisecond).UnixMilli(),
expectedEndMs: now.Add(10100 * time.Millisecond).UnixMilli(),
},
"histograms": {
valType: chunkenc.ValHistogram,
metricName: histogramMetricName,
query: remoteReadQueryByMetricName(histogramMetricName, time.UnixMilli(pushedStartMs), time.UnixMilli(pushedEndMs)),
expectedStartMs: pushedStartMs,
expectedEndMs: pushedEndMs,
},
"float histograms": {
valType: chunkenc.ValFloatHistogram,
metricName: floatHistogramMetricName,
query: remoteReadQueryByMetricName(floatHistogramMetricName, time.UnixMilli(pushedStartMs), time.UnixMilli(pushedEndMs)),
expectedStartMs: pushedStartMs,
expectedEndMs: pushedEndMs,
},
}

for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{
"-distributor.ingestion-rate-limit": "1048576",
"-distributor.ingestion-burst-size": "1048576",
"-distributor.remote-timeout": "10s",
})
flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{
"-distributor.ingestion-rate-limit": "1048576",
"-distributor.ingestion-burst-size": "1048576",
"-distributor.remote-timeout": "10s",

// Start dependencies.
minio := e2edb.NewMinio(9000, blocksBucketName)
// This test writes samples sparse in time. We don't want compaction to trigger while testing.
"-blocks-storage.tsdb.block-ranges-period": "2h",
})

consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(minio, consul))
// Start dependencies.
minio := e2edb.NewMinio(9000, blocksBucketName)
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(minio, consul))

// Start Mimir components for the write path.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
// Start Mimir components for the write path.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester))

// Wait until the distributor has updated the ring.
// The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))
// Wait until the distributor has updated the ring.
// The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))

querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(querier))
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(querier))

// Wait until the querier has updated the ring.
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
// Wait until the querier has updated the ring.
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Push a series to Mimir.
now := time.Now()
c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
c.SetTimeout(10 * time.Second)
require.NoError(t, err)

c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
c.SetTimeout(10 * time.Second)
require.NoError(t, err)
// Generate all the samples and histograms.
var floats []prompb.Sample
var histograms []prompb.Histogram
var floatHistograms []prompb.Histogram

// Generate the series
startMs := now.Add(-time.Minute)
endMs := now.Add(time.Minute)
for ts := pushedStartMs; ts < pushedEndMs; ts += pushedStepMs {
floats = append(floats, prompb.Sample{Value: float64(ts), Timestamp: ts})
histograms = append(histograms, remote.HistogramToHistogramProto(ts, test.GenerateTestHistogram(int(ts))))
floatHistograms = append(floatHistograms, remote.FloatHistogramToHistogramProto(ts, test.GenerateTestFloatHistogram(int(ts))))
}

var samples []prompb.Sample
if tc.floats != nil {
samples = tc.floats(startMs.UnixMilli(), endMs.UnixMilli())
}
var histograms []prompb.Histogram
if tc.histograms != nil {
histograms = tc.histograms(startMs.UnixMilli(), endMs.UnixMilli())
}
// Generate the series.
seriesToPush := []prompb.TimeSeries{
{
Labels: []prompb.Label{{Name: labels.MetricName, Value: floatMetricName}},
Samples: floats,
}, {
Labels: []prompb.Label{{Name: labels.MetricName, Value: histogramMetricName}},
Histograms: histograms,
}, {
Labels: []prompb.Label{{Name: labels.MetricName, Value: floatHistogramMetricName}},
Histograms: floatHistograms,
},
}

var series []prompb.TimeSeries
series = append(series, prompb.TimeSeries{
Labels: []prompb.Label{
{Name: labels.MetricName, Value: "series_1"},
},
Samples: samples,
Histograms: histograms,
})
// Push a series to Mimir.
res, err := c.Push(seriesToPush)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
expectedSamples := filterSamplesByTimestamp(floats, testData.expectedStartMs, testData.expectedEndMs)
expectedHistograms := filterHistogramsByTimestamp(histograms, testData.expectedStartMs, testData.expectedEndMs)
expectedFloatHistograms := filterHistogramsByTimestamp(floatHistograms, testData.expectedStartMs, testData.expectedEndMs)

client, err := e2emimir.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
httpResp, results, _, err := client.RemoteReadChunks("series_1", startMs, endMs)
httpResp, results, _, err := c.RemoteReadChunks(testData.query)
require.Equal(t, http.StatusOK, httpResp.StatusCode)
require.NoError(t, err)

// Validate the returned remote read data
sampleIdx := 0
for _, result := range results {
// We're only expected a single series `series_1`.
// We expect only 1 series.
require.Len(t, result.ChunkedSeries, 1)
require.Equal(t, "series_1", result.ChunkedSeries[0].Labels[0].GetValue())
require.Equal(t, testData.metricName, result.ChunkedSeries[0].Labels[0].GetValue())

for _, rawChk := range result.ChunkedSeries[0].Chunks {
var enc chunkenc.Encoding
Expand All @@ -237,23 +280,23 @@ func TestQuerierStreamingRemoteRead(t *testing.T) {
chkItr := chk.Iterator(nil)
chkIdx := 0
for valType := chkItr.Next(); valType != chunkenc.ValNone; valType = chkItr.Next() {
require.Equal(t, tc.expectedValType, valType)
require.Equal(t, testData.valType, valType)
switch valType {
case chunkenc.ValFloat:
ts, val := chkItr.At()
require.Equal(t, samples[sampleIdx].Timestamp, ts)
require.Equal(t, samples[sampleIdx].Value, val)
require.Equalf(t, expectedSamples[sampleIdx].Timestamp, ts, "index: %d", sampleIdx)
require.Equalf(t, expectedSamples[sampleIdx].Value, val, "index: %d", sampleIdx)
case chunkenc.ValHistogram:
ts, h := chkItr.AtHistogram(nil)
require.Equal(t, histograms[sampleIdx].Timestamp, ts)
require.Equalf(t, expectedHistograms[sampleIdx].Timestamp, ts, "index: %d", sampleIdx)

expected := remote.HistogramProtoToHistogram(histograms[sampleIdx])
expected := remote.HistogramProtoToHistogram(expectedHistograms[sampleIdx])
test.RequireHistogramEqual(t, expected, h)
case chunkenc.ValFloatHistogram:
ts, fh := chkItr.AtFloatHistogram(nil)
require.Equal(t, histograms[sampleIdx].Timestamp, ts)
require.Equalf(t, expectedFloatHistograms[sampleIdx].Timestamp, ts, "index: %d", sampleIdx)

expected := remote.FloatHistogramProtoToFloatHistogram(histograms[sampleIdx])
expected := remote.FloatHistogramProtoToFloatHistogram(expectedFloatHistograms[sampleIdx])
test.RequireFloatHistogramEqual(t, expected, fh)
default:
require.Fail(t, "unrecognized value type")
Expand All @@ -264,11 +307,12 @@ func TestQuerierStreamingRemoteRead(t *testing.T) {
}
}

if samples != nil {
require.Len(t, samples, sampleIdx)
} else if histograms != nil {
require.Len(t, histograms, sampleIdx)
if expectedSamples != nil {
require.Len(t, expectedSamples, sampleIdx)
} else if expectedHistograms != nil {
require.Len(t, expectedHistograms, sampleIdx)
}
})
}

}
Loading

0 comments on commit 6bcba2c

Please sign in to comment.