Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[query] Fanout options to select namespaces #1328

Merged
merged 11 commits into from
Jan 31, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,36 @@ import (
)

const (
// SearchURL is the url for searching graphite metrics.
SearchURL = handler.RoutePrefixV1 + "/graphite/metrics/find"
// FindURL is the url for finding graphite metrics.
FindURL = handler.RoutePrefixV1 + "/graphite/metrics/find"
)

var (
// SearchHTTPMethods is the HTTP methods used with this resource.
SearchHTTPMethods = []string{http.MethodGet, http.MethodPost}
// FindHTTPMethods is the HTTP methods used with this resource.
FindHTTPMethods = []string{http.MethodGet, http.MethodPost}
)

type grahiteSearchHandler struct {
type grahiteFindHandler struct {
storage storage.Storage
}

// NewSearchHandler returns a new instance of handler.
func NewSearchHandler(
// NewFindHandler returns a new instance of handler.
func NewFindHandler(
storage storage.Storage,
) http.Handler {
return &grahiteSearchHandler{
return &grahiteFindHandler{
storage: storage,
}
}

func (h *grahiteSearchHandler) ServeHTTP(
func (h *grahiteFindHandler) ServeHTTP(
w http.ResponseWriter,
r *http.Request,
) {
ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)
logger := logging.WithContext(ctx)
w.Header().Set("Content-Type", "application/json")
query, rErr := parseSearchParamsToQuery(r)
query, rErr := parseFindParamsToQuery(r)
if rErr != nil {
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
Expand Down Expand Up @@ -108,8 +108,8 @@ func (h *grahiteSearchHandler) ServeHTTP(
}

// TODO: Support multiple result types
if err = searchResultsJSON(w, prefix, seenMap); err != nil {
logger.Error("unable to print search results", zap.Error(err))
if err = findResultsJSON(w, prefix, seenMap); err != nil {
logger.Error("unable to print find results", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/m3db/m3/src/x/net/http"
)

func parseSearchParamsToQuery(r *http.Request) (
func parseFindParamsToQuery(r *http.Request) (
*storage.FetchQuery,
*xhttp.ParseError,
) {
Expand Down Expand Up @@ -86,7 +86,7 @@ func parseSearchParamsToQuery(r *http.Request) (
}, nil
}

func searchResultsJSON(
func findResultsJSON(
w io.Writer,
prefix string,
tags map[string]bool,
Expand Down
11 changes: 9 additions & 2 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.promReadMetrics.fetchSuccess.Inc(1)
}

func (h *PromReadHandler) parseRequest(r *http.Request) (*prompb.ReadRequest, *xhttp.ParseError) {
func (h *PromReadHandler) parseRequest(
r *http.Request,
) (*prompb.ReadRequest, *xhttp.ParseError) {
reqBuf, err := prometheus.ParsePromCompressedRequest(r)
if err != nil {
return nil, err
Expand All @@ -142,7 +144,12 @@ func (h *PromReadHandler) parseRequest(r *http.Request) (*prompb.ReadRequest, *x
return &req, nil
}

func (h *PromReadHandler) read(reqCtx context.Context, w http.ResponseWriter, r *prompb.ReadRequest, timeout time.Duration) ([]*prompb.QueryResult, error) {
func (h *PromReadHandler) read(
reqCtx context.Context,
w http.ResponseWriter,
r *prompb.ReadRequest,
timeout time.Duration,
) ([]*prompb.QueryResult, error) {
// TODO: Handle multi query use case
if len(r.Queries) != 1 {
return nil, fmt.Errorf("prometheus read endpoint currently only supports one query at a time")
Expand Down
6 changes: 3 additions & 3 deletions src/query/api/v1/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ func (h *Handler) RegisterRoutes() error {
logged(graphite.NewRenderHandler(h.storage)).ServeHTTP,
).Methods(graphite.ReadHTTPMethods...)

h.router.HandleFunc(graphite.SearchURL,
logged(graphite.NewSearchHandler(h.storage)).ServeHTTP,
).Methods(graphite.SearchHTTPMethods...)
h.router.HandleFunc(graphite.FindURL,
logged(graphite.NewFindHandler(h.storage)).ServeHTTP,
).Methods(graphite.FindHTTPMethods...)

if h.clusterClient != nil {
placementOpts := placement.HandlerOptions{
Expand Down
19 changes: 16 additions & 3 deletions src/query/executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,16 @@ func newEngineMetrics(scope tally.Scope) *engineMetrics {
}

// Execute runs the query and closes the results channel once done
func (e *Engine) Execute(ctx context.Context, query *storage.FetchQuery, opts *EngineOptions, results chan *storage.QueryResult) {
func (e *Engine) Execute(
ctx context.Context,
query *storage.FetchQuery,
opts *EngineOptions,
results chan *storage.QueryResult,
) {
defer close(results)
result, err := e.store.Fetch(ctx, query, &storage.FetchOptions{})
fetchOpts := storage.NewFetchOptions()
fetchOpts.Limit = 0
result, err := e.store.Fetch(ctx, query, fetchOpts)
if err != nil {
results <- &storage.QueryResult{Err: err}
return
Expand All @@ -115,7 +122,13 @@ func (e *Engine) Execute(ctx context.Context, query *storage.FetchQuery, opts *E

// ExecuteExpr runs the query DAG and closes the results channel once done
// nolint: unparam
func (e *Engine) ExecuteExpr(ctx context.Context, parser parser.Parser, opts *EngineOptions, params models.RequestParams, results chan Query) {
func (e *Engine) ExecuteExpr(
ctx context.Context,
parser parser.Parser,
opts *EngineOptions,
params models.RequestParams,
results chan Query,
) {
defer close(results)

req := newRequest(e, params)
Expand Down
10 changes: 8 additions & 2 deletions src/query/graphite/storage/m3_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,14 @@ func (s *m3WrappedStore) FetchByQuery(
m3ctx, cancel := context.WithTimeout(ctx.RequestContext(), opts.Timeout)
defer cancel()

m3result, err := s.m3.Fetch(m3ctx, m3query,
storage.NewFetchOptions())
fetchOptions := storage.NewFetchOptions()
fetchOptions.FanoutOptions = &storage.FanoutOptions{
FanoutUnaggregated: storage.FanoutForceDisable,
FanoutAggregated: storage.FanoutForceEnable,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, should we specify Default here, perhaps?

FanoutAggregatedOptimized: storage.FanoutForceEnable,
Copy link
Collaborator

@robskillington robskillington Jan 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want this to be FanoutForceDisable for the optimized flag so that all aggregated namespaces look like "partial" namespaces?

}

m3result, err := s.m3.Fetch(m3ctx, m3query, fetchOptions)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions src/query/storage/fanout/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func TestFanoutReadEmpty(t *testing.T) {

func TestFanoutReadError(t *testing.T) {
store := setupFanoutRead(t, true)
_, err := store.Fetch(context.TODO(), &storage.FetchQuery{}, &storage.FetchOptions{})
opts := storage.NewFetchOptions()
_, err := store.Fetch(context.TODO(), &storage.FetchQuery{}, opts)
assert.Error(t, err)
}

Expand All @@ -166,7 +167,8 @@ func TestFanoutSearchEmpty(t *testing.T) {

func TestFanoutSearchError(t *testing.T) {
store := setupFanoutRead(t, true)
_, err := store.FetchTags(context.TODO(), &storage.FetchQuery{}, &storage.FetchOptions{})
opts := storage.NewFetchOptions()
_, err := store.FetchTags(context.TODO(), &storage.FetchQuery{}, opts)
assert.Error(t, err)
}

Expand Down
130 changes: 100 additions & 30 deletions src/query/storage/m3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,12 @@ func (s *m3storage) fetchCompressed(
// cluster that can completely fulfill this range and then prefer the
// highest resolution (most fine grained) results.
// This needs to be optimized, however this is a start.
fanout, namespaces, err := s.resolveClusterNamespacesForQuery(query.Start, query.End)
fanout, namespaces, err := s.resolveClusterNamespacesForQuery(
query.Start,
query.End,
options.FanoutOptions,
)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -507,27 +512,46 @@ func (s *m3storage) writeSingle(
func (s *m3storage) resolveClusterNamespacesForQuery(
start time.Time,
end time.Time,
opts *storage.FanoutOptions,
) (queryFanoutType, ClusterNamespaces, error) {
now := s.nowFn()

unaggregated := s.clusters.UnaggregatedClusterNamespace()
unaggregatedRetention := unaggregated.Options().Attributes().Retention
unaggregatedStart := now.Add(-1 * unaggregatedRetention)
if unaggregatedStart.Before(start) || unaggregatedStart.Equal(start) {
// Highest resolution is unaggregated, return if it can fulfill it
return namespaceCoversAllQueryRange, ClusterNamespaces{unaggregated}, nil
type unaggregatedNamespaceDetails struct {
unaggregated ClusterNamespace
retention time.Duration
enabled bool
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move this type to top of this file?


var (
unaggregatedNamespace unaggregatedNamespaceDetails
)

if opts.FanoutUnaggregated != storage.FanoutForceDisable {
unaggregated := s.clusters.UnaggregatedClusterNamespace()
unaggregatedRetention := unaggregated.Options().Attributes().Retention
unaggregatedStart := now.Add(-1 * unaggregatedRetention)
if unaggregatedStart.Before(start) || unaggregatedStart.Equal(start) {
// Highest resolution is unaggregated, return if it can fulfill it
return namespaceCoversAllQueryRange, ClusterNamespaces{unaggregated}, nil
}

unaggregatedNamespace.unaggregated = unaggregated
unaggregatedNamespace.retention = unaggregatedRetention
unaggregatedNamespace.enabled = true
}

// First determine if any aggregated clusters span the whole query range, if
// so that's the most optimal strategy, choose the most granular resolution
// that can and fan out to any partial aggregated namespaces that may holder
// even more granular resolutions
var r reusedAggregatedNamespaceSlices
r = s.aggregatedNamespaces(r, func(namespace ClusterNamespace) bool {
// Include only if can fulfill the entire time range of the query
clusterStart := now.Add(-1 * namespace.Options().Attributes().Retention)
return clusterStart.Before(start) || clusterStart.Equal(start)
})
r = s.aggregatedNamespaces(r,
func(namespace ClusterNamespace) bool {
// Include only if can fulfill the entire time range of the query
clusterStart := now.Add(-1 * namespace.Options().Attributes().Retention)
return clusterStart.Before(start) || clusterStart.Equal(start)
},
opts)

if len(r.completeAggregated) > 0 {
// Return the most granular completed aggregated namespace and
Expand Down Expand Up @@ -556,45 +580,69 @@ func (s *m3storage) resolveClusterNamespacesForQuery(
// as much data as possible, along with any partially aggregated namespaces
// that have either same retention and lower resolution or longer retention
// than the complete aggregated namespace
r = s.aggregatedNamespaces(r, nil)
r = s.aggregatedNamespaces(r, nil, opts)

if len(r.completeAggregated) == 0 {
// Absolutely no complete aggregated namespaces, need to fanout to all
// partial aggregated namespaces as well as the unaggregated cluster
// as we have no idea who has the longest retention
result := append(r.partialAggregated, unaggregated)
// as we have no idea who has the longest retention.
result := r.partialAggregated
if unaggregatedNamespace.enabled {
result = append(result, unaggregatedNamespace.unaggregated)
}

// Need to change namespaceCoversAllQueryRange if all these namespaces
// cover the entire query range, since if that is the case we just
// want to choose the namespace with the best resolution.
allFulfillsRange := true
for _, n := range result {
clusterStart := now.Add(-1 * n.Options().Attributes().Retention)
fulfillsRange := clusterStart.Before(start) || clusterStart.Equal(start)
if !fulfillsRange {
allFulfillsRange = false
break
}
}

if allFulfillsRange {
return namespaceCoversAllQueryRange, result, nil
}

return namespaceCoversPartialQueryRange, result, nil
}

// Return the longest retention aggregated namespace and
// any potentially more granular or longer retention partial
// aggregated namespaces
// aggregated namespaces.
sort.Stable(sort.Reverse(ClusterNamespacesByRetentionAsc(r.completeAggregated)))

// Take longest retention complete aggregated namespace or the unaggregated
// cluster if that is longer than the longest aggregated namespace
// cluster if that is longer than the longest aggregated namespace.
result := r.completeAggregated[:1]
completedAttrs := result[0].Options().Attributes()
if completedAttrs.Retention <= unaggregatedRetention {
// If the longest aggregated cluster for some reason has lower retention
// than the unaggregated cluster then we prefer the unaggregated cluster
// as it has a complete data set and is always the most granular
result[0] = unaggregated
completedAttrs = unaggregated.Options().Attributes()
if unaggregatedNamespace.enabled {
if completedAttrs.Retention <= unaggregatedNamespace.retention {
// If the longest aggregated cluster for some reason has lower retention
// than the unaggregated cluster then we prefer the unaggregated cluster
// as it has a complete data set and is always the most granular.
unaggregated := unaggregatedNamespace.unaggregated
result[0] = unaggregated
completedAttrs = unaggregated.Options().Attributes()
}
}

// Take any partially aggregated namespaces with longer retention or
// same retention with more granular resolution that may contain
// a matching metric
// a matching metric.
for _, n := range r.partialAggregated {
if n.Options().Attributes().Retention > completedAttrs.Retention {
// Higher retention
// Higher retention.
result = append(result, n)
continue
}
if n.Options().Attributes().Retention == completedAttrs.Retention &&
n.Options().Attributes().Resolution < completedAttrs.Resolution {
// Same retention but more granular resolution
// Same retention but more granular resolution.
result = append(result, n)
continue
}
Expand All @@ -611,22 +659,32 @@ type reusedAggregatedNamespaceSlices struct {
func (s *m3storage) aggregatedNamespaces(
slices reusedAggregatedNamespaceSlices,
filter func(ClusterNamespace) bool,
opts *storage.FanoutOptions,
) reusedAggregatedNamespaceSlices {
all := s.clusters.ClusterNamespaces()

// Reset reused slices as necessary
if slices.completeAggregated == nil {
slices.completeAggregated = make([]ClusterNamespace, 0, len(all))
}

slices.completeAggregated = slices.completeAggregated[:0]
if slices.partialAggregated == nil {
slices.partialAggregated = make([]ClusterNamespace, 0, len(all))
}

slices.partialAggregated = slices.partialAggregated[:0]
if opts.FanoutAggregated == storage.FanoutForceDisable {
// Force disable fanning out to any aggregated namespaces.
return slices
}

// Otherwise the default and force enable is to fanout and treat
// the aggregated namespaces differently (depending on whether they
// have all the data).
for _, namespace := range all {
opts := namespace.Options()
if opts.Attributes().MetricsType != storage.AggregatedMetricsType {
nsOpts := namespace.Options()
if nsOpts.Attributes().MetricsType != storage.AggregatedMetricsType {
// Not an aggregated cluster
continue
}
Expand All @@ -635,8 +693,20 @@ func (s *m3storage) aggregatedNamespaces(
continue
}

downsampleOpts, err := opts.DownsampleOptions()
if err != nil || !downsampleOpts.All {
downsampleOpts, err := nsOpts.DownsampleOptions()
if err != nil {
continue
}

if opts.FanoutAggregatedOptimized == storage.FanoutForceDisable {
// If we disable the optimization of sometimes knowing we can eclipse
// the partially aggregated namespaces, then we treat all namespaces
// as potentially not having the complete set of data.
slices.partialAggregated = append(slices.partialAggregated, namespace)
continue
}

if !downsampleOpts.All {
// Cluster does not contain all data, include as part of fan out
// but separate from
slices.partialAggregated = append(slices.partialAggregated, namespace)
Expand Down
Loading