Skip to content

Commit

Permalink
Removed read instant handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
soundvibe committed Nov 19, 2020
1 parent 1031323 commit 4dc973a
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 183 deletions.
6 changes: 3 additions & 3 deletions src/query/api/v1/handler/prom/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (

type errorType string

type queryData struct {
type QueryData struct {
ResultType promql.ValueType `json:"resultType"`
Result promql.Value `json:"result"`
}
Expand All @@ -66,7 +66,7 @@ type response struct {
Warnings []string `json:"warnings,omitempty"`
}

func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) {
func Respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) {
statusMessage := statusSuccess
var warningStrings []string
for _, warning := range warnings {
Expand All @@ -88,7 +88,7 @@ func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warni
w.Write(b)
}

func respondError(w http.ResponseWriter, err error) {
func RespondError(w http.ResponseWriter, err error) {
json := jsoniter.ConfigCompatibleWithStandardLibrary
b, marshalErr := json.Marshal(&response{
Status: statusError,
Expand Down
19 changes: 9 additions & 10 deletions src/query/api/v1/handler/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ func init() {
// Options defines options for PromQL handler.
type Options struct {
PromQLEngine *promql.Engine
instant bool
}

func (o Options) WithInstant(instant bool) Options {
return Options{
PromQLEngine: o.PromQLEngine,
instant: instant,
}
}

// NewReadHandler creates a handler to handle PromQL requests.
Expand All @@ -59,17 +67,8 @@ func NewReadHandlerWithHooks(
Storage: hOpts.Storage(),
InstrumentOptions: hOpts.InstrumentOpts(),
})
return newReadHandler(opts, hOpts, hooks, queryable)
}

// NewReadInstantHandler creates a handler to handle PromQL requests.
func NewReadInstantHandler(opts Options, hOpts options.HandlerOptions) http.Handler {
queryable := prometheus.NewPrometheusQueryable(
prometheus.PrometheusOptions{
Storage: hOpts.Storage(),
InstrumentOptions: hOpts.InstrumentOpts(),
})
return newReadInstantHandler(opts, hOpts, queryable)
return newReadHandler(opts, hOpts, hooks, queryable)
}

// ApplyRangeWarnings applies warnings encountered during execution.
Expand Down
100 changes: 70 additions & 30 deletions src/query/api/v1/handler/prom/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,47 @@ type ReadHandlerHooks interface {
) (models.RequestParams, error)
}

type NewQueryFn func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error)

var (
newRangeQueryFn = func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error) {
return engine.NewRangeQuery(
queryable,
params.Query,
params.Start,
params.End,
params.Step)
}

newInstantQueryFn = func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error) {
return engine.NewInstantQuery(
queryable,
params.Query,
params.Now)
}
)

type readHandler struct {
engine *promql.Engine
hooks ReadHandlerHooks
queryable promstorage.Queryable
hOpts options.HandlerOptions
scope tally.Scope
logger *zap.Logger
engine *promql.Engine
hooks ReadHandlerHooks
queryable promstorage.Queryable
newQueryFn NewQueryFn
hOpts options.HandlerOptions
scope tally.Scope
logger *zap.Logger
opts Options
}

func newReadHandler(
Expand All @@ -68,13 +102,21 @@ func newReadHandler(
scope := hOpts.InstrumentOpts().MetricsScope().Tagged(
map[string]string{"handler": "prometheus-read"},
)

newQueryFn := newRangeQueryFn
if opts.instant {
newQueryFn = newInstantQueryFn
}

return &readHandler{
engine: opts.PromQLEngine,
hooks: hooks,
queryable: queryable,
hOpts: hOpts,
scope: scope,
logger: hOpts.InstrumentOpts().Logger(),
engine: opts.PromQLEngine,
hooks: hooks,
queryable: queryable,
newQueryFn: newQueryFn,
hOpts: hOpts,
opts: opts,
scope: scope,
logger: hOpts.InstrumentOpts().Logger(),
}
}

Expand All @@ -83,19 +125,19 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

fetchOptions, err := h.hOpts.FetchOptionsBuilder().NewFetchOptions(r)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

request, err := native.ParseRequest(ctx, r, false, h.hOpts)
request, err := native.ParseRequest(ctx, r, h.opts.instant, h.hOpts)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

params, err := h.hooks.OnParsedRequest(ctx, r, request.Params)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

Expand All @@ -112,25 +154,22 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer cancel()
}

qry, err := h.engine.NewRangeQuery(
h.queryable,
params.Query,
params.Start,
params.End,
params.Step)
qry, err := h.newQueryFn(h.engine, h.queryable, params)
if err != nil {
h.logger.Error("error creating range query",
zap.Error(err), zap.String("query", params.Query))
respondError(w, err)
h.logger.Error("error creating query",
zap.Error(err), zap.String("query", params.Query),
zap.Bool("instant", h.opts.instant))
RespondError(w, err)
return
}
defer qry.Close()

res := qry.Exec(ctx)
if res.Err != nil {
h.logger.Error("error executing range query",
zap.Error(res.Err), zap.String("query", params.Query))
respondError(w, res.Err)
h.logger.Error("error executing query",
zap.Error(res.Err), zap.String("query", params.Query),
zap.Bool("instant", h.opts.instant))
RespondError(w, res.Err)
return
}

Expand All @@ -142,11 +181,12 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err = ApplyRangeWarnings(query, &resultMetadata)
if err != nil {
h.logger.Warn("error applying range warnings",
zap.Error(err), zap.String("query", query))
zap.Error(err), zap.String("query", query),
zap.Bool("instant", h.opts.instant))
}

handleroptions.AddWarningHeaders(w, resultMetadata)
respond(w, &queryData{
Respond(w, &QueryData{
Result: res.Value,
ResultType: res.Value.Type(),
}, res.Warnings)
Expand Down
138 changes: 0 additions & 138 deletions src/query/api/v1/handler/prom/read_instant.go

This file was deleted.

4 changes: 3 additions & 1 deletion src/query/api/v1/handler/prom/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/m3db/m3/src/query/executor"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"

"github.com/prometheus/prometheus/pkg/labels"
promstorage "github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -78,7 +79,8 @@ func setupTest(_ *testing.T) testHandlers {
SetTimeoutOpts(timeoutOpts)
queryable := &mockQueryable{}
readHandler := newReadHandler(opts, hOpts, &noopReadHandlerHooks{}, queryable)
readInstantHandler := newReadInstantHandler(opts, hOpts, queryable)
readInstantHandler := newReadHandler(opts.WithInstant(true), hOpts,
&noopReadHandlerHooks{}, queryable)
return testHandlers{
queryable: queryable,
readHandler: readHandler,
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (h *Handler) RegisterRoutes() error {
PromQLEngine: h.options.PrometheusEngine(),
}
promqlQueryHandler := wrapped(prom.NewReadHandler(opts, nativeSourceOpts))
promqlInstantQueryHandler := wrapped(prom.NewReadInstantHandler(opts, nativeSourceOpts))
promqlInstantQueryHandler := wrapped(prom.NewReadHandler(opts.WithInstant(true), nativeSourceOpts))
nativePromReadHandler := wrapped(native.NewPromReadHandler(nativeSourceOpts))
nativePromReadInstantHandler := wrapped(native.NewPromReadInstantHandler(nativeSourceOpts))

Expand Down

0 comments on commit 4dc973a

Please sign in to comment.