By default downsample all metrics for aggregated cluster namespaces #890

Merged: 7 commits, Sep 12, 2018
Changes from 4 commits
2 changes: 1 addition & 1 deletion docs/integrations/prometheus.md
@@ -32,7 +32,7 @@ clusters:
# We created a namespace called "default" and had set it to retention "48h".
- namespace: default
retention: 48h
storageMetricsType: unaggregated
type: unaggregated
Collaborator:

Is this going to be backwards compatible? The main concern is whether skipping this field will be treated the same as specifying unaggregated.

@robskillington (Collaborator, Author) on Sep 11, 2018:

Hm, I'll add support for both so they can be used in a backwards-compatible way. Ugh, good catch.
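For illustration, one shape the backwards-compatible handling could take: accept both YAML keys, prefer the new `type`, fall back to the deprecated `storageMetricsType`, and default to unaggregated when neither is set. The struct and method below are a hypothetical sketch, not the actual m3 configuration types.

```go
package config

// clusterNamespaceConfig is an illustrative stand-in for the real namespace
// configuration struct; only the two competing YAML keys are shown.
type clusterNamespaceConfig struct {
	// Type is the new, preferred key.
	Type string `yaml:"type"`
	// StorageMetricsType is the deprecated key, kept so existing configs
	// continue to parse unchanged.
	StorageMetricsType string `yaml:"storageMetricsType"`
}

// metricsType resolves the effective metrics type: prefer the new key, fall
// back to the deprecated one, and default to "unaggregated" when neither is
// set, so omitting the field keeps the current behavior.
func (c clusterNamespaceConfig) metricsType() string {
	if c.Type != "" {
		return c.Type
	}
	if c.StorageMetricsType != "" {
		return c.StorageMetricsType
	}
	return "unaggregated"
}
```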

client:
config:
service:
2 changes: 1 addition & 1 deletion scripts/integration-tests/prometheus/m3coordinator.yml
@@ -15,7 +15,7 @@ metrics:
clusters:
- namespaces:
- namespace: prometheus_metrics
storageMetricsType: unaggregated
type: unaggregated
retention: 48h
client:
config:
3 changes: 2 additions & 1 deletion src/cmd/services/m3coordinator/downsample/downsampler.go
@@ -63,7 +63,8 @@ func NewDownsampler(

func (d *downsampler) NewMetricsAppender() MetricsAppender {
return newMetricsAppender(metricsAppenderOptions{
agg: d.agg.aggregator,
agg: d.agg.aggregator,
Collaborator:

Weird that this didn't align; did gofmt break on this?

Collaborator (Author):

Honestly, no idea. gofmt was the tool that aligned it like this. Let's just leave it for now, since that's how it wants to align it?

Collaborator:

yep fair enough
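For context, this looks like standard gofmt behavior rather than a bug: within a run of adjacent keyed fields in a composite literal, gofmt pads the values to a common column sized by the longest key, so adding the long defaultStagedMetadatas key pushes the existing agg line out. A toy, self-contained example with hypothetical types:

```go
package main

import "fmt"

// options is a hypothetical stand-in for metricsAppenderOptions.
type options struct {
	agg                    string
	defaultStagedMetadatas []string
}

func main() {
	// gofmt pads the values of adjacent keyed fields in a composite literal
	// to a common column sized by the longest key, which is why the short
	// "agg:" line re-aligned once defaultStagedMetadatas was added.
	o := options{
		agg:                    "aggregator",
		defaultStagedMetadatas: []string{"default-metadatas"},
	}
	fmt.Println(o)
}
```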

defaultStagedMetadatas: d.agg.defaultStagedMetadatas,
clockOpts: d.agg.clockOpts,
tagEncoder: d.agg.pools.tagEncoderPool.Get(),
matcher: d.agg.matcher,
52 changes: 44 additions & 8 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
@@ -47,12 +47,30 @@ import (
"github.com/stretchr/testify/require"
)

func TestDownsamplerAggregation(t *testing.T) {
var (
testAggregationType = aggregation.Sum
testAggregationStoragePolicies = []policy.StoragePolicy{
policy.MustParseStoragePolicy("2s:1d"),
}
)

func TestDownsamplerAggregationWithAutoMappingRules(t *testing.T) {
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
autoMappingRules: []MappingRule{
{
Aggregations: []aggregation.Type{testAggregationType},
Policies: testAggregationStoragePolicies,
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesStore(t *testing.T) {
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{})
downsampler := testDownsampler.downsampler
rulesStore := testDownsampler.rulesStore
logger := testDownsampler.instrumentOpts.Logger().
WithFields(xlog.NewField("test", t.Name()))

// Create rules
_, err := rulesStore.CreateNamespace("default", store.NewUpdateOptions())
@@ -62,13 +80,16 @@ func TestDownsamplerAggregation(t *testing.T) {
ID: "mappingrule",
Name: "mappingrule",
Filter: "app:test*",
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
StoragePolicies: []policy.StoragePolicy{policy.MustParseStoragePolicy("2s:1d")},
AggregationID: aggregation.MustCompressTypes(testAggregationType),
StoragePolicies: testAggregationStoragePolicies,
}
_, err = rulesStore.CreateMappingRule("default", rule,
store.NewUpdateOptions())
require.NoError(t, err)

logger := testDownsampler.instrumentOpts.Logger().
WithFields(xlog.NewField("test", t.Name()))

// Wait for mapping rule to appear
logger.Infof("waiting for mapping rules to propagate")
matcher := testDownsampler.matcher
@@ -86,6 +107,19 @@ func TestDownsamplerAggregation(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func testDownsamplerAggregation(
t *testing.T,
testDownsampler testDownsampler,
) {
downsampler := testDownsampler.downsampler

logger := testDownsampler.instrumentOpts.Logger().
WithFields(xlog.NewField("test", t.Name()))

testCounterMetrics := []struct {
tags map[string]string
samples []int64
@@ -180,8 +214,9 @@ type testDownsampler struct {
}

type testDownsamplerOptions struct {
clockOpts clock.Options
instrumentOpts instrument.Options
autoMappingRules []MappingRule
clockOpts clock.Options
instrumentOpts instrument.Options
}

func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampler {
Expand Down Expand Up @@ -227,6 +262,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
instance, err := NewDownsampler(DownsamplerOptions{
Storage: storage,
RulesKVStore: rulesKVStore,
AutoMappingRules: opts.autoMappingRules,
ClockOptions: clockOpts,
InstrumentOptions: instrumentOpts,
TagEncoderOptions: tagEncoderOptions,
11 changes: 11 additions & 0 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
@@ -28,6 +28,7 @@ import (
"github.com/m3db/m3/src/dbnode/serialize"
"github.com/m3db/m3aggregator/aggregator"
"github.com/m3db/m3metrics/matcher"
"github.com/m3db/m3metrics/metadata"
"github.com/m3db/m3x/clock"
)

@@ -40,6 +41,7 @@ type metricsAppender struct {

type metricsAppenderOptions struct {
agg aggregator.Aggregator
defaultStagedMetadatas []metadata.StagedMetadatas
clockOpts clock.Options
tagEncoder serialize.TagEncoder
matcher matcher.Matcher
@@ -78,6 +80,15 @@ func (a *metricsAppender) SamplesAppender() (SamplesAppender, error) {
matchResult := a.matcher.ForwardMatch(id, fromNanos, toNanos)
id.Close()

// Always aggregate any default staged metadatas
for _, stagedMetadatas := range a.defaultStagedMetadatas {
a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
unownedID: unownedID,
stagedMetadatas: stagedMetadatas,
})
}

stagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
if !stagedMetadatas.IsDefault() && len(stagedMetadatas) != 0 {
// Only sample if going to actually aggregate
54 changes: 47 additions & 7 deletions src/cmd/services/m3coordinator/downsample/options.go
@@ -42,7 +42,9 @@ import (
"github.com/m3db/m3metrics/filters"
"github.com/m3db/m3metrics/matcher"
"github.com/m3db/m3metrics/matcher/cache"
"github.com/m3db/m3metrics/metadata"
"github.com/m3db/m3metrics/metric/id"
"github.com/m3db/m3metrics/policy"
"github.com/m3db/m3metrics/rules"
"github.com/m3db/m3x/clock"
"github.com/m3db/m3x/instrument"
@@ -76,6 +78,7 @@ type DownsamplerOptions struct {
Storage storage.Storage
StorageFlushConcurrency int
RulesKVStore kv.Store
AutoMappingRules []MappingRule
NameTag string
ClockOptions clock.Options
InstrumentOptions instrument.Options
@@ -86,6 +89,33 @@ type DownsamplerOptions struct {
OpenTimeout time.Duration
}

// MappingRule is a mapping rule to apply to metrics.
type MappingRule struct {
Aggregations []aggregation.Type
Policies policy.StoragePolicies
}

// StagedMetadatas returns the corresponding staged metadatas for this mapping rule.
func (r MappingRule) StagedMetadatas() (metadata.StagedMetadatas, error) {
aggID, err := aggregation.CompressTypes(r.Aggregations...)
if err != nil {
return nil, err
}

return metadata.StagedMetadatas{
metadata.StagedMetadata{
Metadata: metadata.Metadata{
Pipelines: metadata.PipelineMetadatas{
metadata.PipelineMetadata{
AggregationID: aggID,
StoragePolicies: r.Policies,
},
},
},
},
}, nil
}

// Validate validates the dynamic downsampling options.
func (o DownsamplerOptions) validate() error {
if o.Storage == nil {
@@ -116,10 +146,11 @@ func (o DownsamplerOptions) validate() error {
}

type agg struct {
aggregator aggregator.Aggregator
clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
aggregator aggregator.Aggregator
defaultStagedMetadatas []metadata.StagedMetadatas
clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
}

func (o DownsamplerOptions) newAggregator() (agg, error) {
@@ -134,13 +165,21 @@ func (o DownsamplerOptions) newAggregator() (agg, error) {
clockOpts = o.ClockOptions
instrumentOpts = o.InstrumentOptions
openTimeout = defaultOpenTimeout
defaultStagedMetadatas []metadata.StagedMetadatas
)
if o.StorageFlushConcurrency > 0 {
storageFlushConcurrency = o.StorageFlushConcurrency
}
if o.OpenTimeout > 0 {
openTimeout = o.OpenTimeout
}
for _, rule := range o.AutoMappingRules {
metadatas, err := rule.StagedMetadatas()
if err != nil {
return agg{}, err
}
defaultStagedMetadatas = append(defaultStagedMetadatas, metadatas)
}

pools := o.newAggregatorPools()
ruleSetOpts := o.newAggregatorRulesOptions(pools)
@@ -227,9 +266,10 @@ func (o DownsamplerOptions) newAggregator() (agg, error) {
}

return agg{
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
aggregator: aggregatorInstance,
defaultStagedMetadatas: defaultStagedMetadatas,
matcher: matcher,
pools: pools,
}, nil
}

2 changes: 1 addition & 1 deletion src/query/config/m3coordinator-cluster-template.yml
@@ -18,7 +18,7 @@ clusters:
# - namespaces:
# - namespace: default
# retention: 48h
# storageMetricsType: unaggregated
# type: unaggregated
# client:
# config:
# service:
2 changes: 1 addition & 1 deletion src/query/config/m3coordinator-local-etcd.yml
@@ -15,7 +15,7 @@ metrics:
clusters:
- namespaces:
- namespace: default
storageMetricsType: unaggregated
type: unaggregated
retention: 48h
client:
config:
2 changes: 1 addition & 1 deletion src/query/config/m3query-local-etcd.yml
@@ -15,7 +15,7 @@ metrics:
clusters:
- namespaces:
- namespace: default
storageMetricsType: unaggregated
type: unaggregated
retention: 48h
client:
config:
62 changes: 52 additions & 10 deletions src/query/server/server.go
@@ -48,12 +48,15 @@ import (
"github.com/m3db/m3/src/query/util/logging"
clusterclient "github.com/m3db/m3cluster/client"
etcdclient "github.com/m3db/m3cluster/client/etcd"
"github.com/m3db/m3metrics/aggregation"
"github.com/m3db/m3metrics/policy"
"github.com/m3db/m3x/clock"
xconfig "github.com/m3db/m3x/config"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/pool"
xsync "github.com/m3db/m3x/sync"
xtime "github.com/m3db/m3x/time"

"go.uber.org/zap"
"google.golang.org/grpc"
@@ -230,8 +233,15 @@ func Run(runOpts RunOptions) {
if n := namespaces.NumAggregatedClusterNamespaces(); n > 0 {
logger.Info("configuring downsampler to use with aggregated cluster namespaces",
zap.Int("numAggregatedClusterNamespaces", n))
downsampler = newDownsampler(logger, clusterManagementClient,
fanoutStorage, instrumentOptions)
autoMappingRules, err := newDownsamplerAutoMappingRules(namespaces)
if err != nil {
logger.Fatal(err.Error())
}
downsampler, err = newDownsampler(clusterManagementClient,
fanoutStorage, autoMappingRules, instrumentOptions)
if err != nil {
logger.Fatal(err.Error())
}
}

engine := executor.NewEngine(fanoutStorage)
@@ -275,20 +285,21 @@
}

func newDownsampler(
logger *zap.Logger,
clusterManagementClient clusterclient.Client,
storage storage.Storage,
autoMappingRules []downsample.MappingRule,
instrumentOpts instrument.Options,
) downsample.Downsampler {
logger *zap.Logger,
) (downsample.Downsampler, error) {
if clusterManagementClient == nil {
logger.Fatal("no configured cluster management config, must set this " +
"config for downsampler")
return nil, fmt.Errorf("no configured cluster management config, " +
"must set this config for downsampler")
}

kvStore, err := clusterManagementClient.KV()
if err != nil {
logger.Fatal("unable to create KV store from the cluster management "+
"config client", zap.Any("error", err))
return nil, fmt.Errorf("unable to create KV store from the "+
"cluster management config client: %v", err)
}

tagEncoderOptions := serialize.NewTagEncoderOptions()
@@ -313,10 +324,41 @@ func newDownsampler(
TagDecoderPoolOptions: tagDecoderPoolOptions,
})
if err != nil {
logger.Fatal("unable to create downsampler", zap.Any("error", err))
return nil, fmt.Errorf("unable to create downsampler: %v", err)
}

return downsampler
return downsampler, nil
}

func newDownsamplerAutoMappingRules(
namespaces []local.ClusterNamespace,
) ([]downsample.MappingRule, error) {
var autoMappingRules []downsample.MappingRule
for _, namespace := range namespaces {
opts := namespace.Options()
attrs := opts.Attributes()
if attrs.MetricsType == storage.AggregatedMetricsType {
downsampleOpts, err := opts.DownsampleOptions()
if err != nil {
errFmt := "unable to resolve downsample options for namespace: %v"
return nil, fmt.Errorf(errFmt, namespace.NamespaceID().String())
}
if downsampleOpts.All {
storagePolicy := policy.NewStoragePolicy(attrs.Resolution,
xtime.Second, attrs.Retention)
autoMappingRules = append(autoMappingRules, downsample.MappingRule{
// NB(r): By default we will just keep the last values,
// since the coordinator only uses downsampling with the Prometheus
// remote write endpoint.
// Richer static configuration mapping rules can be added
// in the future, but they are currently not required.
Aggregations: []aggregation.Type{aggregation.Last},
Policies: policy.StoragePolicies{storagePolicy},
})
}
}
}
return autoMappingRules, nil
}

func newStorages(
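To make the new default concrete, below is a hedged, standalone sketch of the auto mapping rule newDownsamplerAutoMappingRules would emit for a single aggregated namespace that has downsampling of all metrics enabled. The 1m resolution and 40d retention are hypothetical values, and the import paths are assumed from the repository layout:

```go
package main

import (
	"fmt"
	"time"

	"github.com/m3db/m3/src/cmd/services/m3coordinator/downsample"
	"github.com/m3db/m3metrics/aggregation"
	"github.com/m3db/m3metrics/policy"
	xtime "github.com/m3db/m3x/time"
)

func main() {
	// Hypothetical aggregated namespace: 1m resolution, 40d retention,
	// with downsampling of all metrics enabled.
	resolution := time.Minute
	retention := 40 * 24 * time.Hour

	// The auto mapping rule keeps the last written value per resolution
	// window and stores it under a storage policy derived from the
	// namespace's resolution and retention.
	rule := downsample.MappingRule{
		Aggregations: []aggregation.Type{aggregation.Last},
		Policies: policy.StoragePolicies{
			policy.NewStoragePolicy(resolution, xtime.Second, retention),
		},
	}

	metadatas, err := rule.StagedMetadatas()
	if err != nil {
		panic(err)
	}
	fmt.Println(metadatas)
}
```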
2 changes: 1 addition & 1 deletion src/query/server/server_test.go
@@ -61,7 +61,7 @@
clusters:
- namespaces:
- namespace: prometheus_metrics
storageMetricsType: unaggregated
type: unaggregated
retention: 48h
`
