Commit 721ac92
By default downsample all metrics for aggregated cluster namespaces (#…
robskillington authored Sep 12, 2018
1 parent 0061da1 commit 721ac92
Showing 16 changed files with 325 additions and 67 deletions.
2 changes: 1 addition & 1 deletion docs/integrations/prometheus.md
@@ -32,7 +32,7 @@ clusters:
# We created a namespace called "default" and had set it to retention "48h".
- namespace: default
retention: 48h
storageMetricsType: unaggregated
type: unaggregated
client:
config:
service:
2 changes: 1 addition & 1 deletion scripts/integration-tests/prometheus/m3coordinator.yml
@@ -15,7 +15,7 @@ metrics:
clusters:
- namespaces:
- namespace: prometheus_metrics
storageMetricsType: unaggregated
type: unaggregated
retention: 48h
client:
config:
3 changes: 2 additions & 1 deletion src/cmd/services/m3coordinator/downsample/downsampler.go
@@ -63,7 +63,8 @@ func NewDownsampler(

func (d *downsampler) NewMetricsAppender() MetricsAppender {
return newMetricsAppender(metricsAppenderOptions{
agg: d.agg.aggregator,
agg: d.agg.aggregator,
defaultStagedMetadatas: d.agg.defaultStagedMetadatas,
clockOpts: d.agg.clockOpts,
tagEncoder: d.agg.pools.tagEncoderPool.Get(),
matcher: d.agg.matcher,
52 changes: 44 additions & 8 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
@@ -47,12 +47,30 @@ import (
"github.com/stretchr/testify/require"
)

func TestDownsamplerAggregation(t *testing.T) {
var (
testAggregationType = aggregation.Sum
testAggregationStoragePolicies = []policy.StoragePolicy{
policy.MustParseStoragePolicy("2s:1d"),
}
)

func TestDownsamplerAggregationWithAutoMappingRules(t *testing.T) {
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
autoMappingRules: []MappingRule{
{
Aggregations: []aggregation.Type{testAggregationType},
Policies: testAggregationStoragePolicies,
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesStore(t *testing.T) {
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{})
downsampler := testDownsampler.downsampler
rulesStore := testDownsampler.rulesStore
logger := testDownsampler.instrumentOpts.Logger().
WithFields(xlog.NewField("test", t.Name()))

// Create rules
_, err := rulesStore.CreateNamespace("default", store.NewUpdateOptions())
@@ -62,13 +80,16 @@ func TestDownsamplerAggregation(t *testing.T) {
ID: "mappingrule",
Name: "mappingrule",
Filter: "app:test*",
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
StoragePolicies: []policy.StoragePolicy{policy.MustParseStoragePolicy("2s:1d")},
AggregationID: aggregation.MustCompressTypes(testAggregationType),
StoragePolicies: testAggregationStoragePolicies,
}
_, err = rulesStore.CreateMappingRule("default", rule,
store.NewUpdateOptions())
require.NoError(t, err)

logger := testDownsampler.instrumentOpts.Logger().
WithFields(xlog.NewField("test", t.Name()))

// Wait for mapping rule to appear
logger.Infof("waiting for mapping rules to propagate")
matcher := testDownsampler.matcher
@@ -86,6 +107,19 @@ func TestDownsamplerAggregation(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func testDownsamplerAggregation(
t *testing.T,
testDownsampler testDownsampler,
) {
downsampler := testDownsampler.downsampler

logger := testDownsampler.instrumentOpts.Logger().
WithFields(xlog.NewField("test", t.Name()))

testCounterMetrics := []struct {
tags map[string]string
samples []int64
@@ -180,8 +214,9 @@ type testDownsampler struct {
}

type testDownsamplerOptions struct {
clockOpts clock.Options
instrumentOpts instrument.Options
autoMappingRules []MappingRule
clockOpts clock.Options
instrumentOpts instrument.Options
}

func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampler {
@@ -227,6 +262,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
instance, err := NewDownsampler(DownsamplerOptions{
Storage: storage,
RulesKVStore: rulesKVStore,
AutoMappingRules: opts.autoMappingRules,
ClockOptions: clockOpts,
InstrumentOptions: instrumentOpts,
TagEncoderOptions: tagEncoderOptions,
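The refactored test above exercises the new auto mapping rules path end to end. As a rough, hedged sketch of how a caller populates that option (the MappingRule values mirror the test constants above; the Storage, RulesKVStore, clock/instrument and tag codec options that a real NewDownsampler call needs are omitted):

package main

import (
	"fmt"

	"github.com/m3db/m3/src/cmd/services/m3coordinator/downsample"
	"github.com/m3db/m3metrics/aggregation"
	"github.com/m3db/m3metrics/policy"
)

func main() {
	// Same shape as the test's auto mapping rule: sum at 2s resolution,
	// retained for one day.
	rules := []downsample.MappingRule{
		{
			Aggregations: []aggregation.Type{aggregation.Sum},
			Policies: policy.StoragePolicies{
				policy.MustParseStoragePolicy("2s:1d"),
			},
		},
	}

	// Only the new field is shown here; a real DownsamplerOptions also needs
	// Storage, RulesKVStore and the encoder/decoder options wired up above.
	opts := downsample.DownsamplerOptions{
		AutoMappingRules: rules,
	}
	fmt.Printf("configured %d auto mapping rule(s)\n", len(opts.AutoMappingRules))
}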
11 changes: 11 additions & 0 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
@@ -28,6 +28,7 @@ import (
"github.com/m3db/m3/src/dbnode/serialize"
"github.com/m3db/m3aggregator/aggregator"
"github.com/m3db/m3metrics/matcher"
"github.com/m3db/m3metrics/metadata"
"github.com/m3db/m3x/clock"
)

@@ -40,6 +41,7 @@ type metricsAppender struct {

type metricsAppenderOptions struct {
agg aggregator.Aggregator
defaultStagedMetadatas []metadata.StagedMetadatas
clockOpts clock.Options
tagEncoder serialize.TagEncoder
matcher matcher.Matcher
@@ -78,6 +80,15 @@ func (a *metricsAppender) SamplesAppender() (SamplesAppender, error) {
matchResult := a.matcher.ForwardMatch(id, fromNanos, toNanos)
id.Close()

// Always aggregate any default staged metadatas
for _, stagedMetadatas := range a.defaultStagedMetadatas {
a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
unownedID: unownedID,
stagedMetadatas: stagedMetadatas,
})
}

stagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
if !stagedMetadatas.IsDefault() && len(stagedMetadatas) != 0 {
// Only sample if going to actually aggregate
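The net effect of the appender change: every incoming sample is always routed to the default staged metadatas derived from the auto mapping rules, and rule-matched metadatas are applied on top only when they would actually aggregate. A minimal, self-contained sketch of that selection order, using a hypothetical stand-in type rather than the real aggregator and metadata types:

package main

import "fmt"

// stagedMetadatas is a hypothetical stand-in for metadata.StagedMetadatas,
// used purely to illustrate the ordering in SamplesAppender.
type stagedMetadatas string

// appendersFor sketches the selection logic: defaults (from auto mapping
// rules) are always applied; a rule-matched result is added only when it is
// non-empty and not the default metadatas.
func appendersFor(
	defaults []stagedMetadatas,
	matched stagedMetadatas,
	matchedIsDefault bool,
) []stagedMetadatas {
	out := append([]stagedMetadatas{}, defaults...)
	if !matchedIsDefault && matched != "" {
		out = append(out, matched)
	}
	return out
}

func main() {
	defaults := []stagedMetadatas{"last|10s:40d"}

	// No dynamic mapping rule matched: only the default downsampling applies.
	fmt.Println(appendersFor(defaults, "", true))

	// A dynamic mapping rule matched as well: both are applied.
	fmt.Println(appendersFor(defaults, "sum|2s:1d", false))
}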
54 changes: 47 additions & 7 deletions src/cmd/services/m3coordinator/downsample/options.go
@@ -42,7 +42,9 @@ import (
"github.com/m3db/m3metrics/filters"
"github.com/m3db/m3metrics/matcher"
"github.com/m3db/m3metrics/matcher/cache"
"github.com/m3db/m3metrics/metadata"
"github.com/m3db/m3metrics/metric/id"
"github.com/m3db/m3metrics/policy"
"github.com/m3db/m3metrics/rules"
"github.com/m3db/m3x/clock"
"github.com/m3db/m3x/instrument"
@@ -76,6 +78,7 @@ type DownsamplerOptions struct {
Storage storage.Storage
StorageFlushConcurrency int
RulesKVStore kv.Store
AutoMappingRules []MappingRule
NameTag string
ClockOptions clock.Options
InstrumentOptions instrument.Options
@@ -86,6 +89,33 @@ type DownsamplerOptions struct {
OpenTimeout time.Duration
}

// MappingRule is a mapping rule to apply to metrics.
type MappingRule struct {
Aggregations []aggregation.Type
Policies policy.StoragePolicies
}

// StagedMetadatas returns the corresponding staged metadatas for this mapping rule.
func (r MappingRule) StagedMetadatas() (metadata.StagedMetadatas, error) {
aggID, err := aggregation.CompressTypes(r.Aggregations...)
if err != nil {
return nil, err
}

return metadata.StagedMetadatas{
metadata.StagedMetadata{
Metadata: metadata.Metadata{
Pipelines: metadata.PipelineMetadatas{
metadata.PipelineMetadata{
AggregationID: aggID,
StoragePolicies: r.Policies,
},
},
},
},
}, nil
}

// Validate validates the dynamic downsampling options.
func (o DownsamplerOptions) validate() error {
if o.Storage == nil {
@@ -116,10 +146,11 @@ func (o DownsamplerOptions) validate() error {
}

type agg struct {
aggregator aggregator.Aggregator
clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
aggregator aggregator.Aggregator
defaultStagedMetadatas []metadata.StagedMetadatas
clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
}

func (o DownsamplerOptions) newAggregator() (agg, error) {
@@ -134,13 +165,21 @@ func (o DownsamplerOptions) newAggregator() (agg, error) {
clockOpts = o.ClockOptions
instrumentOpts = o.InstrumentOptions
openTimeout = defaultOpenTimeout
defaultStagedMetadatas []metadata.StagedMetadatas
)
if o.StorageFlushConcurrency > 0 {
storageFlushConcurrency = o.StorageFlushConcurrency
}
if o.OpenTimeout > 0 {
openTimeout = o.OpenTimeout
}
for _, rule := range o.AutoMappingRules {
metadatas, err := rule.StagedMetadatas()
if err != nil {
return agg{}, err
}
defaultStagedMetadatas = append(defaultStagedMetadatas, metadatas)
}

pools := o.newAggregatorPools()
ruleSetOpts := o.newAggregatorRulesOptions(pools)
@@ -227,9 +266,10 @@ func (o DownsamplerOptions) newAggregator() (agg, error) {
}

return agg{
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
aggregator: aggregatorInstance,
defaultStagedMetadatas: defaultStagedMetadatas,
matcher: matcher,
pools: pools,
}, nil
}

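For reference, a hedged sketch of using the new MappingRule type directly and expanding it with StagedMetadatas(); the storage policy string is an arbitrary example, not a value from this commit:

package main

import (
	"fmt"
	"log"

	"github.com/m3db/m3/src/cmd/services/m3coordinator/downsample"
	"github.com/m3db/m3metrics/aggregation"
	"github.com/m3db/m3metrics/policy"
)

func main() {
	// Keep only the last value, at 10s resolution retained for 40 days.
	rule := downsample.MappingRule{
		Aggregations: []aggregation.Type{aggregation.Last},
		Policies: policy.StoragePolicies{
			policy.MustParseStoragePolicy("10s:40d"),
		},
	}

	// StagedMetadatas expands the rule into the staged metadatas the
	// downsampler applies to every incoming metric by default.
	metadatas, err := rule.StagedMetadatas()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("staged metadatas: %+v\n", metadatas)
}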
2 changes: 1 addition & 1 deletion src/query/config/m3coordinator-cluster-template.yml
@@ -18,7 +18,7 @@ clusters:
# - namespaces:
# - namespace: default
# retention: 48h
# storageMetricsType: unaggregated
# type: unaggregated
# client:
# config:
# service:
2 changes: 1 addition & 1 deletion src/query/config/m3coordinator-local-etcd.yml
@@ -15,7 +15,7 @@ metrics:
clusters:
- namespaces:
- namespace: default
storageMetricsType: unaggregated
type: unaggregated
retention: 48h
client:
config:
2 changes: 1 addition & 1 deletion src/query/config/m3query-local-etcd.yml
@@ -15,7 +15,7 @@ metrics:
clusters:
- namespaces:
- namespace: default
storageMetricsType: unaggregated
type: unaggregated
retention: 48h
client:
config:
57 changes: 49 additions & 8 deletions src/query/server/server.go
@@ -48,12 +48,15 @@ import (
"github.com/m3db/m3/src/query/util/logging"
clusterclient "github.com/m3db/m3cluster/client"
etcdclient "github.com/m3db/m3cluster/client/etcd"
"github.com/m3db/m3metrics/aggregation"
"github.com/m3db/m3metrics/policy"
"github.com/m3db/m3x/clock"
xconfig "github.com/m3db/m3x/config"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/pool"
xsync "github.com/m3db/m3x/sync"
xtime "github.com/m3db/m3x/time"

"github.com/pkg/errors"
"github.com/uber-go/tally"
@@ -299,8 +302,12 @@ func newM3DBStorage(
if n := namespaces.NumAggregatedClusterNamespaces(); n > 0 {
logger.Info("configuring downsampler to use with aggregated cluster namespaces",
zap.Int("numAggregatedClusterNamespaces", n))
autoMappingRules, err := newDownsamplerAutoMappingRules(namespaces)
if err != nil {
return nil, nil, nil, nil, err
}
downsampler, err = newDownsampler(clusterManagementClient,
fanoutStorage, instrumentOptions)
fanoutStorage, autoMappingRules, instrumentOptions)
if err != nil {
return nil, nil, nil, nil, err
}
@@ -328,32 +335,35 @@ func newM3DBStorage(
func newDownsampler(
clusterManagementClient clusterclient.Client,
storage storage.Storage,
autoMappingRules []downsample.MappingRule,
instrumentOpts instrument.Options,
) (downsample.Downsampler, error) {
if clusterManagementClient == nil {
return nil, errors.New("no configured cluster management config, must set this " +
"config for downsampler")
return nil, fmt.Errorf("no configured cluster management config, " +
"must set this config for downsampler")
}

kvStore, err := clusterManagementClient.KV()
if err != nil {
return nil, errors.Wrap(err, "unable to create KV store from the cluster management config client")
return nil, errors.Wrap(err, "unable to create KV store from the "+
"cluster management config client")
}

tagEncoderOptions := serialize.NewTagEncoderOptions()
tagDecoderOptions := serialize.NewTagDecoderOptions()
tagEncoderPoolOptions := pool.NewObjectPoolOptions().
SetInstrumentOptions(instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().
SubScope("tag-encoder-pool")))
SetMetricsScope(instrumentOpts.MetricsScope().
SubScope("tag-encoder-pool")))
tagDecoderPoolOptions := pool.NewObjectPoolOptions().
SetInstrumentOptions(instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().
SubScope("tag-decoder-pool")))
SetMetricsScope(instrumentOpts.MetricsScope().
SubScope("tag-decoder-pool")))

downsampler, err := downsample.NewDownsampler(downsample.DownsamplerOptions{
Storage: storage,
RulesKVStore: kvStore,
AutoMappingRules: autoMappingRules,
ClockOptions: clock.NewOptions(),
InstrumentOptions: instrumentOpts,
TagEncoderOptions: tagEncoderOptions,
@@ -368,6 +378,37 @@ func newDownsampler(
return downsampler, nil
}

func newDownsamplerAutoMappingRules(
namespaces []local.ClusterNamespace,
) ([]downsample.MappingRule, error) {
var autoMappingRules []downsample.MappingRule
for _, namespace := range namespaces {
opts := namespace.Options()
attrs := opts.Attributes()
if attrs.MetricsType == storage.AggregatedMetricsType {
downsampleOpts, err := opts.DownsampleOptions()
if err != nil {
errFmt := "unable to resolve downsample options for namespace: %v"
return nil, fmt.Errorf(errFmt, namespace.NamespaceID().String())
}
if downsampleOpts.All {
storagePolicy := policy.NewStoragePolicy(attrs.Resolution,
xtime.Second, attrs.Retention)
autoMappingRules = append(autoMappingRules, downsample.MappingRule{
// NB(r): By default we simply keep the last value written,
// since the coordinator only uses downsampling with the Prometheus
// remote write endpoint.
// Richer static configuration mapping rules can be added
// in the future, but they are currently not required.
Aggregations: []aggregation.Type{aggregation.Last},
Policies: policy.StoragePolicies{storagePolicy},
})
}
}
}
return autoMappingRules, nil
}

func initClusters(cfg config.Configuration, dbClientCh <-chan client.Client, logger *zap.Logger) (local.Clusters, error) {
var (
clusters local.Clusters
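newDownsamplerAutoMappingRules above only emits a rule for namespaces marked as aggregated whose downsample options have All set. A rough sketch of the rule derived for one such namespace, using made-up resolution and retention values instead of a real local.ClusterNamespace:

package main

import (
	"fmt"
	"time"

	"github.com/m3db/m3/src/cmd/services/m3coordinator/downsample"
	"github.com/m3db/m3metrics/aggregation"
	"github.com/m3db/m3metrics/policy"
	xtime "github.com/m3db/m3x/time"
)

func main() {
	// Hypothetical namespace attributes; in the server these come from
	// namespace.Options().Attributes().
	resolution := 10 * time.Second
	retention := 40 * 24 * time.Hour

	// Mirrors the helper: one storage policy per aggregated namespace,
	// keeping only the last written value per resolution window.
	rule := downsample.MappingRule{
		Aggregations: []aggregation.Type{aggregation.Last},
		Policies: policy.StoragePolicies{
			policy.NewStoragePolicy(resolution, xtime.Second, retention),
		},
	}
	fmt.Printf("auto mapping rule: %+v\n", rule)
}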