Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Parse programmatic min/max time as [start_retention, end_retention) #2487

Merged
merged 4 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions scripts/docker-integration-tests/prometheus/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,29 @@ function test_query_restrict_tags {
'[[ $(curl -s 0.0.0.0:7201/api/v1/query?query=\\{restricted_metrics_type=\"hidden\"\\} | jq -r ".data.result | length") -eq 0 ]]'
}

function test_series {
# Test series search with start/end specified
ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_succeeded_samples_total&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 1 ]]'

# Test series search with no start/end specified
ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_succeeded_samples_total" | jq -r ".data | length") -eq 1 ]]'

# Test series search with min/max start time using the Prometheus Go
# min/max formatted timestamps, which is sent as part of a Prometheus
# remote query.
# minTime = time.Unix(math.MinInt64/1000+62135596801, 0).UTC()
# maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999).UTC()
# minTimeFormatted = minTime.Format(time.RFC3339Nano)
# maxTimeFormatted = maxTime.Format(time.RFC3339Nano)
# Which:
# minTimeFormatted="-292273086-05-16T16:47:06Z"
# maxTimeFormatted="292277025-08-18T07:12:54.999999999Z"
ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_succeeded_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]'
}

echo "Running prometheus tests"
test_prometheus_remote_read
test_prometheus_remote_write_multi_namespaces
Expand All @@ -316,6 +339,7 @@ test_query_limits_applied
test_query_restrict_metrics_type
test_query_restrict_tags
test_prometheus_remote_write_map_tags
test_series

echo "Running function correctness tests"
test_correctness
Expand Down
40 changes: 28 additions & 12 deletions src/query/api/v1/handler/prom/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,56 @@ import (
"go.uber.org/zap/zapcore"
)

type mockQuerier struct{}
type mockQuerier struct {
mockOptions
}

type mockSeriesSet struct {
mockOptions
promstorage.SeriesSet
}

func (m mockSeriesSet) Next() bool { return false }
func (m mockSeriesSet) At() promstorage.Series { return nil }
func (m mockSeriesSet) Err() error { return nil }
func (m *mockSeriesSet) Next() bool { return false }
func (m *mockSeriesSet) At() promstorage.Series { return nil }
func (m *mockSeriesSet) Err() error { return nil }

func (mockQuerier) Select(
func (q *mockQuerier) Select(
sortSeries bool,
hints *promstorage.SelectHints,
labelMatchers ...*labels.Matcher,
) (promstorage.SeriesSet, promstorage.Warnings, error) {
return mockSeriesSet{}, nil, nil
if q.mockOptions.selectFn != nil {
return q.mockOptions.selectFn(sortSeries, hints, labelMatchers...)
}
return &mockSeriesSet{mockOptions: q.mockOptions}, nil, nil
}

func (mockQuerier) LabelValues(name string) ([]string, promstorage.Warnings, error) {
func (*mockQuerier) LabelValues(name string) ([]string, promstorage.Warnings, error) {
return nil, nil, errors.New("not implemented")
}

func (mockQuerier) LabelNames() ([]string, promstorage.Warnings, error) {
func (*mockQuerier) LabelNames() ([]string, promstorage.Warnings, error) {
return nil, nil, errors.New("not implemented")
}

func (mockQuerier) Close() error {
func (*mockQuerier) Close() error {
return nil
}

type mockQueryable struct{}
type mockOptions struct {
selectFn func(
sortSeries bool,
hints *promstorage.SelectHints,
labelMatchers ...*labels.Matcher,
) (promstorage.SeriesSet, promstorage.Warnings, error)
}

type mockQueryable struct {
mockOptions
}

func (mockQueryable) Querier(_ context.Context, _, _ int64) (promstorage.Querier, error) {
return mockQuerier{}, nil
func (q *mockQueryable) Querier(_ context.Context, _, _ int64) (promstorage.Querier, error) {
return &mockQuerier{mockOptions: q.mockOptions}, nil
}

func newMockPromQLEngine() *promql.Engine {
Expand Down
58 changes: 6 additions & 52 deletions src/query/api/v1/handler/prom/read_instant.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@ package prom
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"time"

"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/storage/prometheus"
"github.com/m3db/m3/src/query/util"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
promstorage "github.com/prometheus/prometheus/storage"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -66,16 +64,10 @@ func newReadInstantHandler(
}

func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var ts time.Time
if t := r.FormValue("time"); t != "" {
var err error
ts, err = parseTime(t)
if err != nil {
respondError(w, err, http.StatusBadRequest)
return
}
} else {
ts = time.Now()
ts, err := util.ParseTimeStringWithDefault(r.FormValue("time"), time.Now())
if err != nil {
respondError(w, err, http.StatusBadRequest)
return
}

fetchOptions, fetchErr := h.hOpts.FetchOptionsBuilder().NewFetchOptions(r)
Expand All @@ -93,7 +85,7 @@ func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx = context.WithValue(ctx, prometheus.BlockResultMetadataKey, &resultMetadata)

if t := r.FormValue("timeout"); t != "" {
timeout, err := parseDuration(t)
timeout, err := util.ParseDurationString(t)
if err != nil {
err = fmt.Errorf("invalid parameter 'timeout': %v", err)
respondError(w, err, http.StatusBadRequest)
Expand Down Expand Up @@ -130,41 +122,3 @@ func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ResultType: res.Value.Type(),
}, res.Warnings)
}

func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
ns = math.Round(ns*1000) / 1000
return time.Unix(int64(s), int64(ns*float64(time.Second))), nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return t, nil
}

// Stdlib's time parser can only handle 4 digit years. As a workaround until
// that is fixed we want to at least support our own boundary times.
// Context: https://github.com/prometheus/client_golang/issues/614
// Upstream issue: https://github.com/golang/go/issues/20555
switch s {
case minTimeFormatted:
return time.Time{}, nil
case maxTimeFormatted:
return time.Now(), nil
}

return time.Time{}, fmt.Errorf("cannot parse %q to a valid timestamp", s)
}

func parseDuration(s string) (time.Duration, error) {
if d, err := strconv.ParseFloat(s, 64); err == nil {
ts := d * float64(time.Second)
if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) {
return 0, fmt.Errorf("cannot parse %q to a valid duration. It overflows int64", s)
}
return time.Duration(ts), nil
}
if d, err := model.ParseDuration(s); err == nil {
return time.Duration(d), nil
}
return 0, fmt.Errorf("cannot parse %q to a valid duration", s)
}
112 changes: 111 additions & 1 deletion src/query/api/v1/handler/prom/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package prom

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
Expand All @@ -34,6 +35,8 @@ import (
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/executor"
"github.com/m3db/m3/src/x/instrument"
"github.com/prometheus/prometheus/pkg/labels"
promstorage "github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
)

Expand All @@ -42,6 +45,7 @@ const promQuery = `http_requests_total{job="prometheus",group="canary"}`
var testPromQLEngine = newMockPromQLEngine()

type testHandlers struct {
queryable *mockQueryable
readHandler http.Handler
readInstantHandler http.Handler
}
Expand All @@ -66,10 +70,11 @@ func setupTest(t *testing.T) testHandlers {
SetFetchOptionsBuilder(fetchOptsBuilder).
SetEngine(engine).
SetTimeoutOpts(timeoutOpts)
queryable := mockQueryable{}
queryable := &mockQueryable{}
readHandler := newReadHandler(opts, hOpts, queryable)
readInstantHandler := newReadInstantHandler(opts, hOpts, queryable)
return testHandlers{
queryable: queryable,
readHandler: readHandler,
readInstantHandler: readInstantHandler,
}
Expand Down Expand Up @@ -149,3 +154,108 @@ func TestPromReadInstantHandlerInvalidQuery(t *testing.T) {
require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &resp))
require.Equal(t, statusError, resp.Status)
}

func TestPromReadInstantHandlerParseMinTime(t *testing.T) {
setup := setupTest(t)

var (
query *promstorage.SelectHints
selects int
)
setup.queryable.selectFn = func(
sortSeries bool,
hints *promstorage.SelectHints,
labelMatchers ...*labels.Matcher,
) (promstorage.SeriesSet, promstorage.Warnings, error) {
selects++
query = hints
return &mockSeriesSet{}, nil, nil
}

req, _ := http.NewRequest("GET", native.PromReadInstantURL, nil)
params := defaultParams()
params.Set("time", minTimeFormatted)
req.URL.RawQuery = params.Encode()

var resp response
recorder := httptest.NewRecorder()

setup.readInstantHandler.ServeHTTP(recorder, req)

require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &resp))
require.Equal(t, statusSuccess, resp.Status)

require.Equal(t, 1, selects)

fudge := 5 * time.Minute // Need to account for lookback
expected := time.Unix(0, 0)
actual := millisTime(query.Start)
require.True(t, abs(expected.Sub(actual)) <= fudge,
fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v",
expected, actual, fudge, expected.Sub(actual)))

fudge = 5 * time.Minute // Need to account for lookback
expected = time.Unix(0, 0)
actual = millisTime(query.Start)
require.True(t, abs(expected.Sub(actual)) <= fudge,
fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v",
expected, actual, fudge, expected.Sub(actual)))
}

func TestPromReadInstantHandlerParseMaxTime(t *testing.T) {
setup := setupTest(t)

var (
query *promstorage.SelectHints
selects int
)
setup.queryable.selectFn = func(
sortSeries bool,
hints *promstorage.SelectHints,
labelMatchers ...*labels.Matcher,
) (promstorage.SeriesSet, promstorage.Warnings, error) {
selects++
query = hints
return &mockSeriesSet{}, nil, nil
}

req, _ := http.NewRequest("GET", native.PromReadInstantURL, nil)
params := defaultParams()
params.Set("time", maxTimeFormatted)
req.URL.RawQuery = params.Encode()

var resp response
recorder := httptest.NewRecorder()

setup.readInstantHandler.ServeHTTP(recorder, req)

require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &resp))
require.Equal(t, statusSuccess, resp.Status)

require.Equal(t, 1, selects)

fudge := 6 * time.Minute // Need to account for lookback + time.Now() skew
expected := time.Now()
actual := millisTime(query.Start)
require.True(t, abs(expected.Sub(actual)) <= fudge,
fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v",
expected, actual, fudge, expected.Sub(actual)))

fudge = 6 * time.Minute // Need to account for lookback + time.Now() skew
expected = time.Now()
actual = millisTime(query.Start)
require.True(t, abs(expected.Sub(actual)) <= fudge,
fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v",
expected, actual, fudge, expected.Sub(actual)))
}

func abs(v time.Duration) time.Duration {
if v < 0 {
return v * -1
}
return v
}

func millisTime(timestampMilliseconds int64) time.Time {
return time.Unix(0, timestampMilliseconds*int64(time.Millisecond))
}
24 changes: 8 additions & 16 deletions src/query/api/v1/handler/prometheus/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,14 @@ func ParseTagCompletionParamsToQueries(
r *http.Request,
) (TagCompletionQueries, *xhttp.ParseError) {
tagCompletionQueries := TagCompletionQueries{}
start, err := parseTimeWithDefault(r, "start", time.Time{})
start, err := util.ParseTimeStringWithDefault(r.FormValue("start"),
time.Unix(0, 0))
if err != nil {
return tagCompletionQueries, xhttp.NewParseError(err, http.StatusBadRequest)
}

end, err := parseTimeWithDefault(r, "end", time.Now())
end, err := util.ParseTimeStringWithDefault(r.FormValue("end"),
time.Now())
if err != nil {
return tagCompletionQueries, xhttp.NewParseError(err, http.StatusBadRequest)
}
Expand Down Expand Up @@ -218,18 +220,6 @@ func parseTagCompletionQueries(r *http.Request) ([]string, error) {
return queries, nil
}

func parseTimeWithDefault(
r *http.Request,
key string,
defaultTime time.Time,
) (time.Time, error) {
if t := r.FormValue(key); t != "" {
return util.ParseTimeString(t)
}

return defaultTime, nil
}

// ParseSeriesMatchQuery parses all params from the GET request.
func ParseSeriesMatchQuery(
r *http.Request,
Expand All @@ -241,12 +231,14 @@ func ParseSeriesMatchQuery(
return nil, xhttp.NewParseError(errors.ErrInvalidMatchers, http.StatusBadRequest)
}

start, err := parseTimeWithDefault(r, "start", time.Time{})
start, err := util.ParseTimeStringWithDefault(r.FormValue("start"),
time.Unix(0, 0))
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

end, err := parseTimeWithDefault(r, "end", time.Now())
end, err := util.ParseTimeStringWithDefault(r.FormValue("end"),
time.Now())
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
Expand Down
Loading