Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prom read instant handler refactoring #2928

Merged
merged 14 commits into from
Nov 24, 2020
9 changes: 6 additions & 3 deletions src/query/api/v1/handler/prom/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ const (

type errorType string

type queryData struct {
// QueryData struct to be used when responding from HTTP handler.
type QueryData struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, it doesn't look like queryData needs to be public, at least in the scope of this PR (ditto for Respond and RespondError). Am I missing something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we can now override default handlers, we might need these for such use-cases.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should take a bit more care when exposing things. For example, there seems to be a duplicate of this struct in prometheus/response.go, which doesn't surface types from prometheus repository. Maybe we should pick one of those to be public, and not both?

type data struct {
// ResultType is the type of Result (matrix, vector, etc.).
ResultType string
// Result contains the query result (concrete type depends on ResultType).
Result result
}
type result interface {
matches(other result) (MatchInformation, error)
}
// MatrixResult contains a list matrixRow.
type MatrixResult struct {
Result []matrixRow `json:"result"`
}
// VectorResult contains a list of vectorItem.
type VectorResult struct {
Result []vectorItem `json:"result"`
}
// ScalarResult is the scalar Value for the response.
type ScalarResult struct {
Result Value `json:"result"`
}
// StringResult is the string Value for the response.
type StringResult struct {
Result Value `json:"result"`
}

Similar question with Respond and RespondError – are these the methods we want to have exposed? Aren't there better candidates?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

response is not made public in this PR so no changes there.
As for Respond and RespondError, I don't think exposing them would make any harm. They do not belong to any class and could be reused by custom handlers without a need to copy the same code (as we are currently doing in our private codebase).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Need to add comment to exported types // QueryData is ....

ResultType promql.ValueType `json:"resultType"`
Result promql.Value `json:"result"`
}
Expand All @@ -66,7 +67,8 @@ type response struct {
Warnings []string `json:"warnings,omitempty"`
}

func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) {
// Responds with HTTP OK status code and writes response JSON to response body.
func Respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Need to add comment to exported types // Respond ....

statusMessage := statusSuccess
var warningStrings []string
for _, warning := range warnings {
Expand All @@ -88,7 +90,8 @@ func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warni
w.Write(b)
}

func respondError(w http.ResponseWriter, err error) {
// Responds with error status code and writes error JSON to response body.
func RespondError(w http.ResponseWriter, err error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Need to add comment to exported types // RespondError ....

json := jsoniter.ConfigCompatibleWithStandardLibrary
b, marshalErr := json.Marshal(&response{
Status: statusError,
Expand Down
58 changes: 44 additions & 14 deletions src/query/api/v1/handler/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/storage/prometheus"

"github.com/prometheus/prometheus/promql"
Expand All @@ -38,29 +39,58 @@ func init() {
promql.SetDefaultEvaluationInterval(time.Minute)
}

// Options defines options for PromQL handler.
type Options struct {
PromQLEngine *promql.Engine
// opts defines options for PromQL handler.
type opts struct {
promQLEngine *promql.Engine
instant bool
newQueryFn NewQueryFn
}

// NewReadHandler creates a handler to handle PromQL requests.
func NewReadHandler(opts Options, hOpts options.HandlerOptions) http.Handler {
queryable := prometheus.NewPrometheusQueryable(
prometheus.PrometheusOptions{
Storage: hOpts.Storage(),
InstrumentOptions: hOpts.InstrumentOpts(),
})
return newReadHandler(opts, hOpts, queryable)
// Option is a Prometheus handler option.
type Option func(*opts) error

// WithEngine sets the PromQL engine.
func WithEngine(promQLEngine *promql.Engine) Option {
return withEngine(promQLEngine, false)
}

// WithInstantEngine sets the PromQL instant engine.
func WithInstantEngine(promQLEngine *promql.Engine) Option {
return withEngine(promQLEngine, true)
}

func withEngine(promQLEngine *promql.Engine, instant bool) Option {
return func(o *opts) error {
if promQLEngine == nil {
return errors.New("invalid engine")
}
o.instant = instant
o.promQLEngine = promQLEngine
o.newQueryFn = newRangeQueryFn
if instant {
o.newQueryFn = newInstantQueryFn
}
return nil
}
}

// NewReadInstantHandler creates a handler to handle PromQL requests.
func NewReadInstantHandler(opts Options, hOpts options.HandlerOptions) http.Handler {
func newDefaultOptions(hOpts options.HandlerOptions) opts {
return opts{
promQLEngine: hOpts.PrometheusEngine(),
instant: false,
newQueryFn: newRangeQueryFn,
}
Copy link
Collaborator

@robskillington robskillington Nov 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to do parameterized options for this change, we can simplify this a bit (also note the WithFoo exported symbols need comments):

type opts struct {
  promQLEngine *promql.Engine
  instant      bool
  newQueryFn   NewQueryFn
}

func newDefaultOptions(hOpts options.HandlerOptions) opts {
  return opts{
    promQLEngine: hOpts.PrometheusEngine(),
    instant:      false,
    newQueryFn:   newRangeQueryFn,
  }
}

// Option is a Prometheus handler option.
type Option func(*opts)

// WithEngine sets the PromQL engine.
func WithEngine(promQLEngine *promql.Engine) Option {
  return withEngine(promQLEngine, false)
}

// WithInstantEngine sets the PromQL instant engine.
func WithInstantEngine(promQLEngine *promql.Engine) Option {
  return withEngine(promQLEngine, true)
}

func withEngine(promQLEngine *promql.Engine, instant bool) Option {
  return func(o *opts) error {
    if promQLEngine == nil {
      return errors.New("invalid engine")
    }
    o.instant = instant
    o.promQLEngine = promQLEngine
    o.newQueryFn = newRangeQueryFn
    if instant {
      o.newQueryFn = newInstantQueryFn
    }
    return nil
  }
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks much simpler to me 👍🏻

}

// NewReadHandler creates a handler to handle PromQL requests.
func NewReadHandler(hOpts options.HandlerOptions, options ...Option) (http.Handler, error) {
queryable := prometheus.NewPrometheusQueryable(
prometheus.PrometheusOptions{
Storage: hOpts.Storage(),
InstrumentOptions: hOpts.InstrumentOpts(),
})
return newReadInstantHandler(opts, hOpts, queryable)

return newReadHandler(hOpts, queryable, options...)
}

// ApplyRangeWarnings applies warnings encountered during execution.
Expand Down
109 changes: 76 additions & 33 deletions src/query/api/v1/handler/prom/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/native"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage/prometheus"
xerrors "github.com/m3db/m3/src/x/errors"

Expand All @@ -40,94 +41,136 @@ import (
"go.uber.org/zap"
)

// NewQueryFn creates a new promql Query.
type NewQueryFn func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error)

var (
newRangeQueryFn = func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error) {
return engine.NewRangeQuery(
queryable,
params.Query,
params.Start,
params.End,
params.Step)
}

newInstantQueryFn = func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error) {
return engine.NewInstantQuery(
queryable,
params.Query,
params.Now)
}
vpranckaitis marked this conversation as resolved.
Show resolved Hide resolved
)

type readHandler struct {
engine *promql.Engine
queryable promstorage.Queryable
hOpts options.HandlerOptions
scope tally.Scope
logger *zap.Logger
engine *promql.Engine
queryable promstorage.Queryable
hOpts options.HandlerOptions
scope tally.Scope
logger *zap.Logger
opts opts
}

func newReadHandler(
opts Options,
hOpts options.HandlerOptions,
queryable promstorage.Queryable,
) http.Handler {
options ...Option,
) (http.Handler, error) {
scope := hOpts.InstrumentOpts().MetricsScope().Tagged(
map[string]string{"handler": "prometheus-read"},
)
return &readHandler{
engine: opts.PromQLEngine,
queryable: queryable,
hOpts: hOpts,
scope: scope,
logger: hOpts.InstrumentOpts().Logger(),
opts := newDefaultOptions(hOpts)
for _, optionFn := range options {
if err := optionFn(&opts); err != nil {
return nil, err
}
}

return &readHandler{
engine: opts.promQLEngine,
queryable: queryable,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless I'm missing something, both engine and queryable seems to be used to construct an instance of promql.Query. If we're switching to injecting a function/closure for it, we might as well get rid of these two fields by using them in the newQueryFn closure. E.g.:

type newQueryFn func(models.RequestParams) (promql.Query, error)

type readHandler struct {
    newQueryFn newQueryFn
    hOpts      options.HandlerOptions
    scope      tally.Scope
    logger     *zap.Logger
    /* ... */
}

func newRangeQueryFn(engine *promql.Engine, queryable  promstorage.Queryable) newQueryFn {
    return func(params models.RequestParams) (promql.Query, error) {
        return engine.NewRangeQuery(queryable, /* params.* */)
    }
}

hOpts: hOpts,
opts: opts,
scope: scope,
logger: hOpts.InstrumentOpts().Logger(),
}, nil
}

func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

fetchOptions, err := h.hOpts.FetchOptionsBuilder().NewFetchOptions(r)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

request, err := native.ParseRequest(ctx, r, false, h.hOpts)
request, err := native.ParseRequest(ctx, r, h.opts.instant, h.hOpts)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

params := request.Params

// NB (@shreyas): We put the FetchOptions in context so it can be
// retrieved in the queryable object as there is no other way to pass
// that through.
var resultMetadata block.ResultMetadata
ctx = context.WithValue(ctx, prometheus.FetchOptionsContextKey, fetchOptions)
ctx = context.WithValue(ctx, prometheus.BlockResultMetadataKey, &resultMetadata)

if request.Params.Timeout > 0 {
if params.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, request.Params.Timeout)
ctx, cancel = context.WithTimeout(ctx, params.Timeout)
defer cancel()
}

qry, err := h.engine.NewRangeQuery(
h.queryable,
request.Params.Query,
request.Params.Start,
request.Params.End,
request.Params.Step)
qry, err := h.opts.newQueryFn(h.engine, h.queryable, params)
if err != nil {
h.logger.Error("error creating range query",
zap.Error(err), zap.String("query", request.Params.Query))
respondError(w, xerrors.NewInvalidParamsError(err))
h.logger.Error("error creating query",
zap.Error(err), zap.String("query", params.Query),
zap.Bool("instant", h.opts.instant))
RespondError(w, xerrors.NewInvalidParamsError(err))
return
}
defer qry.Close()

res := qry.Exec(ctx)
if res.Err != nil {
h.logger.Error("error executing range query",
zap.Error(res.Err), zap.String("query", request.Params.Query))
respondError(w, res.Err)
h.logger.Error("error executing query",
zap.Error(res.Err), zap.String("query", params.Query),
zap.Bool("instant", h.opts.instant))
RespondError(w, res.Err)
return
}

for _, warn := range resultMetadata.Warnings {
res.Warnings = append(res.Warnings, errors.New(warn.Message))
}

query := request.Params.Query
query := params.Query
err = ApplyRangeWarnings(query, &resultMetadata)
if err != nil {
h.logger.Warn("error applying range warnings",
zap.Error(err), zap.String("query", query))
zap.Error(err), zap.String("query", query),
zap.Bool("instant", h.opts.instant))
}

handleroptions.AddResponseHeaders(w, resultMetadata, fetchOptions)
respond(w, &queryData{
Respond(w, &QueryData{
Result: res.Value,
ResultType: res.Value.Type(),
}, res.Warnings)
Expand Down
Loading