Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Increase perf for temporal functions #2049

Merged
merged 19 commits into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .fossa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ analyze:
path: src/cmd/services/m3query/main
options:
allow-unresolved: true
- name: github.com/m3db/m3/src/cmd/services/m3comparator/main
type: go
target: github.com/m3db/m3/src/cmd/services/m3comparator/main
path: src/cmd/services/m3comparator/main
- name: github.com/m3db/m3/src/cmd/tools/carbon_load/main
type: go
target: github.com/m3db/m3/src/cmd/tools/carbon_load/main
Expand Down
4 changes: 2 additions & 2 deletions scripts/comparator/docker-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ function setup_docker {
echo "Run m3query, m3comparator, and prometheus containers"
docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes m3comparator
docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes prometheus
docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes m3query

docker-compose -f ${COMPOSE_FILE} up -d --build --renew-anon-volumes m3query
CI=$1
if [[ "$CI" != "true" ]]
then
Expand Down
1 change: 1 addition & 0 deletions scripts/comparator/m3query.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ metrics:

tagOptions:
idScheme: quoted

2 changes: 1 addition & 1 deletion src/aggregator/aggregation/quantile/cm/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type SamplePool interface {
// Init initializes the pool.
Init()

// Get returns a sample from the pool.
// Get gets a sample from the pool.
Get() *Sample

// Put returns a sample to the pool.
Expand Down
17 changes: 17 additions & 0 deletions src/cmd/services/m3comparator/main/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"math"
"math/rand"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -154,6 +155,22 @@ func (q *querier) FetchCompressed(
}
}

break
} else if "gen" == string(matcher.Name) {
cStr := string(matcher.Value)
count, err := strconv.Atoi(cStr)
if err != nil {
return m3.SeriesFetchResult{}, noop, err
}

actualGens = make([]seriesGen, count)
for i := 0; i < count; i++ {
actualGens[i] = seriesGen{
res: time.Second * 15,
name: fmt.Sprintf("foo_%d", i),
}
}

robskillington marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ func parseInstantaneousParams(
if fetchOpts.Step == 0 {
fetchOpts.Step = time.Second
}

r.Form.Set(startParam, nowTimeValue)
r.Form.Set(endParam, nowTimeValue)

params, err := parseParams(r, engineOpts, timeoutOpts,
fetchOpts, instrumentOpts)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/prometheus/native/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestParseBlockType(t *testing.T) {
instrument.NewOptions()))

r = httptest.NewRequest(http.MethodGet, "/foo?block-type=1", nil)
assert.Equal(t, models.TypeMultiBlock, parseBlockType(r,
assert.Equal(t, models.TypeSingleBlock, parseBlockType(r,
instrument.NewOptions()))

r = httptest.NewRequest(http.MethodGet, "/foo?block-type=2", nil)
Expand Down
20 changes: 10 additions & 10 deletions src/query/api/v1/handler/prometheus/native/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ import (
"github.com/stretchr/testify/require"
)

func TestPromReadHandler_Read(t *testing.T) {
testPromReadHandler_Read(t, block.NewResultMetadata(), "")
testPromReadHandler_Read(t, buildWarningMeta("foo", "bar"), "foo_bar")
testPromReadHandler_Read(t, block.ResultMetadata{Exhaustive: false},
func TestPromReadHandlerRead(t *testing.T) {
testPromReadHandlerRead(t, block.NewResultMetadata(), "")
testPromReadHandlerRead(t, buildWarningMeta("foo", "bar"), "foo_bar")
testPromReadHandlerRead(t, block.ResultMetadata{Exhaustive: false},
handler.LimitHeaderSeriesLimitApplied)
}

func testPromReadHandler_Read(
func testPromReadHandlerRead(
t *testing.T,
resultMeta block.ResultMetadata,
ex string,
Expand Down Expand Up @@ -100,14 +100,14 @@ type M3QLResp []struct {
StepSizeMs int `json:"step_size_ms"`
}

func TestPromReadHandlerRead(t *testing.T) {
testPromReadHandlerRead(t, block.NewResultMetadata(), "")
testPromReadHandlerRead(t, buildWarningMeta("foo", "bar"), "foo_bar")
testPromReadHandlerRead(t, block.ResultMetadata{Exhaustive: false},
func TestM3PromReadHandlerRead(t *testing.T) {
testM3PromReadHandlerRead(t, block.NewResultMetadata(), "")
testM3PromReadHandlerRead(t, buildWarningMeta("foo", "bar"), "foo_bar")
testM3PromReadHandlerRead(t, block.ResultMetadata{Exhaustive: false},
handler.LimitHeaderSeriesLimitApplied)
}

func testPromReadHandlerRead(
func testM3PromReadHandlerRead(
t *testing.T,
resultMeta block.ResultMetadata,
ex string,
Expand Down
77 changes: 62 additions & 15 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package remote

import (
"bytes"
"context"
"net/http"
"sync"
Expand Down Expand Up @@ -178,8 +179,8 @@ func (h *PromReadHandler) parseRequest(
}

type readResult struct {
result []*prompb.QueryResult
meta block.ResultMetadata
result []*prompb.QueryResult
}

func (h *PromReadHandler) read(
Expand All @@ -190,18 +191,18 @@ func (h *PromReadHandler) read(
fetchOpts *storage.FetchOptions,
) (readResult, error) {
var (
queryCount = len(r.Queries)
promResults = make([]*prompb.QueryResult, queryCount)
cancelFuncs = make([]context.CancelFunc, queryCount)
queryOpts = &executor.QueryOptions{
queryCount = len(r.Queries)
cancelFuncs = make([]context.CancelFunc, queryCount)
queryResults = make([]*prompb.QueryResult, queryCount)
meta = block.NewResultMetadata()
queryOpts = &executor.QueryOptions{
QueryContextOptions: models.QueryContextOptions{
LimitMaxTimeseries: fetchOpts.Limit,
}}

wg sync.WaitGroup
mu sync.Mutex
multiErr xerrors.MultiError
meta = block.NewResultMetadata()
)

wg.Add(queryCount)
Expand All @@ -221,23 +222,20 @@ func (h *PromReadHandler) read(

// Detect clients closing connections
handler.CloseWatcher(ctx, cancel, w, h.instrumentOpts)
result, err := h.engine.Execute(ctx, query, queryOpts, fetchOpts)
result, err := h.engine.ExecuteProm(ctx, query, queryOpts, fetchOpts)
if err != nil {
mu.Lock()
multiErr = multiErr.Add(err)
mu.Unlock()
return
}

result.PromResult.Timeseries = filterResults(
result.PromResult.GetTimeseries(), fetchOpts)
mu.Lock()
queryResults[i] = result.PromResult
meta = meta.CombineMetadata(result.Metadata)
mu.Unlock()
result.SeriesList = prometheus.FilterSeriesByOptions(
result.SeriesList,
fetchOpts,
)
promRes := storage.FetchResultToPromResult(result, h.keepEmpty)
promResults[i] = promRes
}()
}

Expand All @@ -247,8 +245,57 @@ func (h *PromReadHandler) read(
}

if err := multiErr.FinalError(); err != nil {
return readResult{nil, meta}, err
return readResult{result: nil, meta: meta}, err
}

return readResult{result: queryResults, meta: meta}, nil
}

// filterResults removes series tags based on options.
func filterResults(
series []*prompb.TimeSeries,
opts *storage.FetchOptions,
) []*prompb.TimeSeries {
if opts == nil {
return series
}

keys := opts.RestrictQueryOptions.GetRestrictByTag().GetFilterByNames()
if len(keys) == 0 {
return series
}

for i, s := range series {
series[i].Labels = filterLabels(s.Labels, keys)
}

return series
}

func filterLabels(
labels []prompb.Label,
filtering [][]byte,
) []prompb.Label {
if len(filtering) == 0 {
return labels
}

filtered := labels[:0]
for _, l := range labels {
skip := false
for _, f := range filtering {
if bytes.Equal(l.GetName(), f) {
skip = true
break
}
}

if skip {
continue
}

filtered = append(filtered, l)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could make this a little more performant by:

  1. start for loop compare each until we hit a label we need to skip
  2. if we find label we need to skip, copy all into "filtered" up to this element, then keep copying as you go
  3. if get to end and none were found that need to be skipped, then just take a reference to the original slice

This would avoid memcpying the results back into the original for any results that don't have the label.

But maybe an overoptimization... so only do it if you feel like it's worthwhile. I also realize this only happens if the filter is specified at all.. so maybe not worth doing for now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll keep that pattern in mind, but this path is likely cold enough for this to not really matter that much for the extra complexity? If we see this showing up in traces, happy to revisit though

}

return readResult{promResults, meta}, nil
return filtered
}
102 changes: 78 additions & 24 deletions src/query/api/v1/handler/prometheus/remote/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/test"
"github.com/m3db/m3/src/query/test/m3"
"github.com/m3db/m3/src/query/ts"
xclock "github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/instrument"

Expand Down Expand Up @@ -244,24 +243,14 @@ func TestMultipleRead(t *testing.T) {
now := time.Now()
promNow := storage.TimeToPromTimestamp(now)

vals := ts.NewMockValues(ctrl)
vals.EXPECT().Len().Return(1).AnyTimes()
dp := ts.Datapoints{{Timestamp: now, Value: 1}}
vals.EXPECT().Datapoints().Return(dp).AnyTimes()

tags := models.NewTags(1, models.NewTagOptions()).
AddTag(models.Tag{Name: []byte("a"), Value: []byte("b")})

valsTwo := ts.NewMockValues(ctrl)
valsTwo.EXPECT().Len().Return(1).AnyTimes()
dpTwo := ts.Datapoints{{Timestamp: now, Value: 2}}
valsTwo.EXPECT().Datapoints().Return(dpTwo).AnyTimes()
tagsTwo := models.NewTags(1, models.NewTagOptions()).
AddTag(models.Tag{Name: []byte("c"), Value: []byte("d")})

r := &storage.FetchResult{
SeriesList: ts.SeriesList{
ts.NewSeries([]byte("a"), vals, tags),
r := storage.PromResult{
PromResult: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
&prompb.TimeSeries{
Samples: []prompb.Sample{{Value: 1, Timestamp: promNow}},
Labels: []prompb.Label{{Name: []byte("a"), Value: []byte("b")}},
},
},
},
Metadata: block.ResultMetadata{
Exhaustive: true,
Expand All @@ -270,9 +259,14 @@ func TestMultipleRead(t *testing.T) {
},
}

rTwo := &storage.FetchResult{
SeriesList: ts.SeriesList{
ts.NewSeries([]byte("c"), valsTwo, tagsTwo),
rTwo := storage.PromResult{
PromResult: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
&prompb.TimeSeries{
Samples: []prompb.Sample{{Value: 2, Timestamp: promNow}},
Labels: []prompb.Label{{Name: []byte("c"), Value: []byte("d")}},
},
},
},
Metadata: block.ResultMetadata{
Exhaustive: false,
Expand All @@ -295,9 +289,11 @@ func TestMultipleRead(t *testing.T) {

engine := executor.NewMockEngine(ctrl)
engine.EXPECT().
Execute(gomock.Any(), q, gomock.Any(), gomock.Any()).Return(r, nil)
ExecuteProm(gomock.Any(), q, gomock.Any(), gomock.Any()).
Return(r, nil)
engine.EXPECT().
Execute(gomock.Any(), qTwo, gomock.Any(), gomock.Any()).Return(rTwo, nil)
ExecuteProm(gomock.Any(), qTwo, gomock.Any(), gomock.Any()).
Return(rTwo, nil)

h := NewPromReadHandler(engine, nil, nil, true, instrument.NewOptions()).(*PromReadHandler)
res, err := h.read(context.TODO(), nil, req, 0, storage.NewFetchOptions())
Expand Down Expand Up @@ -325,3 +321,61 @@ func TestMultipleRead(t *testing.T) {
require.Equal(t, 1, len(meta.Warnings))
assert.Equal(t, "foo_bar", meta.Warnings[0].Header())
}

func TestReadWithOptions(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

now := time.Now()
promNow := storage.TimeToPromTimestamp(now)

r := storage.PromResult{
PromResult: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
&prompb.TimeSeries{
Samples: []prompb.Sample{{Value: 1, Timestamp: promNow}},
Labels: []prompb.Label{
{Name: []byte("a"), Value: []byte("b")},
{Name: []byte("remove"), Value: []byte("c")},
},
},
},
},
Metadata: block.NewResultMetadata(),
}

req := &prompb.ReadRequest{
Queries: []*prompb.Query{{StartTimestampMs: 10}},
}

q, err := storage.PromReadQueryToM3(req.Queries[0])
require.NoError(t, err)

engine := executor.NewMockEngine(ctrl)
engine.EXPECT().
ExecuteProm(gomock.Any(), q, gomock.Any(), gomock.Any()).
Return(r, nil)

opts := storage.NewFetchOptions()
opts.RestrictQueryOptions = &storage.RestrictQueryOptions{
RestrictByTag: &storage.RestrictByTag{
Strip: [][]byte{[]byte("remove")},
},
}

h := NewPromReadHandler(engine, nil, nil, true,
instrument.NewOptions()).(*PromReadHandler)
res, err := h.read(context.TODO(), nil, req, 0, opts)
require.NoError(t, err)
expected := &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
&prompb.TimeSeries{
Labels: []prompb.Label{{Name: []byte("a"), Value: []byte("b")}},
Samples: []prompb.Sample{{Timestamp: promNow, Value: 1}},
},
},
}

result := res.result
assert.Equal(t, expected.Timeseries[0], result[0].Timeseries[0])
}
Loading