diff --git a/docs/integrations/prometheus.md b/docs/integrations/prometheus.md index ab6a6948f0..468927d146 100644 --- a/docs/integrations/prometheus.md +++ b/docs/integrations/prometheus.md @@ -32,7 +32,7 @@ clusters: # We created a namespace called "default" and had set it to retention "48h". - namespace: default retention: 48h - storageMetricsType: unaggregated + type: unaggregated client: config: service: diff --git a/scripts/integration-tests/prometheus/m3coordinator.yml b/scripts/integration-tests/prometheus/m3coordinator.yml index 9e45ac4086..4eaaa5024c 100644 --- a/scripts/integration-tests/prometheus/m3coordinator.yml +++ b/scripts/integration-tests/prometheus/m3coordinator.yml @@ -15,7 +15,7 @@ metrics: clusters: - namespaces: - namespace: prometheus_metrics - storageMetricsType: unaggregated + type: unaggregated retention: 48h client: config: diff --git a/src/cmd/services/m3coordinator/downsample/downsampler.go b/src/cmd/services/m3coordinator/downsample/downsampler.go index 2a98b9e2bb..483096c223 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler.go @@ -63,7 +63,8 @@ func NewDownsampler( func (d *downsampler) NewMetricsAppender() MetricsAppender { return newMetricsAppender(metricsAppenderOptions{ - agg: d.agg.aggregator, + agg: d.agg.aggregator, + defaultStagedMetadatas: d.agg.defaultStagedMetadatas, clockOpts: d.agg.clockOpts, tagEncoder: d.agg.pools.tagEncoderPool.Get(), matcher: d.agg.matcher, diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 3705481397..bf2d64f24d 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -47,12 +47,30 @@ import ( "github.com/stretchr/testify/require" ) -func TestDownsamplerAggregation(t *testing.T) { +var ( + testAggregationType = aggregation.Sum + testAggregationStoragePolicies = []policy.StoragePolicy{ + policy.MustParseStoragePolicy("2s:1d"), + } +) + +func TestDownsamplerAggregationWithAutoMappingRules(t *testing.T) { + testDownsampler := newTestDownsampler(t, testDownsamplerOptions{ + autoMappingRules: []MappingRule{ + { + Aggregations: []aggregation.Type{testAggregationType}, + Policies: testAggregationStoragePolicies, + }, + }, + }) + + // Test expected output + testDownsamplerAggregation(t, testDownsampler) +} + +func TestDownsamplerAggregationWithRulesStore(t *testing.T) { testDownsampler := newTestDownsampler(t, testDownsamplerOptions{}) - downsampler := testDownsampler.downsampler rulesStore := testDownsampler.rulesStore - logger := testDownsampler.instrumentOpts.Logger(). - WithFields(xlog.NewField("test", t.Name())) // Create rules _, err := rulesStore.CreateNamespace("default", store.NewUpdateOptions()) @@ -62,13 +80,16 @@ func TestDownsamplerAggregation(t *testing.T) { ID: "mappingrule", Name: "mappingrule", Filter: "app:test*", - AggregationID: aggregation.MustCompressTypes(aggregation.Sum), - StoragePolicies: []policy.StoragePolicy{policy.MustParseStoragePolicy("2s:1d")}, + AggregationID: aggregation.MustCompressTypes(testAggregationType), + StoragePolicies: testAggregationStoragePolicies, } _, err = rulesStore.CreateMappingRule("default", rule, store.NewUpdateOptions()) require.NoError(t, err) + logger := testDownsampler.instrumentOpts.Logger(). 
+		WithFields(xlog.NewField("test", t.Name()))
+
 	// Wait for mapping rule to appear
 	logger.Infof("waiting for mapping rules to propagate")
 	matcher := testDownsampler.matcher
@@ -86,6 +107,19 @@ func TestDownsamplerAggregation(t *testing.T) {
 		time.Sleep(100 * time.Millisecond)
 	}
 
+	// Test expected output
+	testDownsamplerAggregation(t, testDownsampler)
+}
+
+func testDownsamplerAggregation(
+	t *testing.T,
+	testDownsampler testDownsampler,
+) {
+	downsampler := testDownsampler.downsampler
+
+	logger := testDownsampler.instrumentOpts.Logger().
+		WithFields(xlog.NewField("test", t.Name()))
+
 	testCounterMetrics := []struct {
 		tags    map[string]string
 		samples []int64
@@ -180,8 +214,9 @@ type testDownsampler struct {
 }
 
 type testDownsamplerOptions struct {
-	clockOpts      clock.Options
-	instrumentOpts instrument.Options
+	autoMappingRules []MappingRule
+	clockOpts        clock.Options
+	instrumentOpts   instrument.Options
 }
 
 func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampler {
@@ -227,6 +262,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
 	instance, err := NewDownsampler(DownsamplerOptions{
 		Storage:           storage,
 		RulesKVStore:      rulesKVStore,
+		AutoMappingRules:  opts.autoMappingRules,
 		ClockOptions:      clockOpts,
 		InstrumentOptions: instrumentOpts,
 		TagEncoderOptions: tagEncoderOptions,
diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go
index 46b748d24e..8657c7cf6f 100644
--- a/src/cmd/services/m3coordinator/downsample/metrics_appender.go
+++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go
@@ -28,6 +28,7 @@ import (
 	"github.com/m3db/m3/src/dbnode/serialize"
 	"github.com/m3db/m3aggregator/aggregator"
 	"github.com/m3db/m3metrics/matcher"
+	"github.com/m3db/m3metrics/metadata"
 	"github.com/m3db/m3x/clock"
 )
 
@@ -40,6 +41,7 @@ type metricsAppender struct {
 
 type metricsAppenderOptions struct {
 	agg                    aggregator.Aggregator
+	defaultStagedMetadatas []metadata.StagedMetadatas
 	clockOpts              clock.Options
 	tagEncoder             serialize.TagEncoder
 	matcher                matcher.Matcher
@@ -78,6 +80,15 @@ func (a *metricsAppender) SamplesAppender() (SamplesAppender, error) {
 	matchResult := a.matcher.ForwardMatch(id, fromNanos, toNanos)
 	id.Close()
 
+	// Always aggregate any default staged metadatas
+	for _, stagedMetadatas := range a.defaultStagedMetadatas {
+		a.multiSamplesAppender.addSamplesAppender(samplesAppender{
+			agg:             a.agg,
+			unownedID:       unownedID,
+			stagedMetadatas: stagedMetadatas,
+		})
+	}
+
 	stagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
 	if !stagedMetadatas.IsDefault() && len(stagedMetadatas) != 0 {
 		// Only sample if going to actually aggregate
diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go
index 0dcbabe999..6092cfa1fa 100644
--- a/src/cmd/services/m3coordinator/downsample/options.go
+++ b/src/cmd/services/m3coordinator/downsample/options.go
@@ -42,7 +42,9 @@ import (
 	"github.com/m3db/m3metrics/filters"
 	"github.com/m3db/m3metrics/matcher"
 	"github.com/m3db/m3metrics/matcher/cache"
+	"github.com/m3db/m3metrics/metadata"
 	"github.com/m3db/m3metrics/metric/id"
+	"github.com/m3db/m3metrics/policy"
 	"github.com/m3db/m3metrics/rules"
 	"github.com/m3db/m3x/clock"
 	"github.com/m3db/m3x/instrument"
@@ -76,6 +78,7 @@ type DownsamplerOptions struct {
 	Storage                 storage.Storage
 	StorageFlushConcurrency int
 	RulesKVStore            kv.Store
+	AutoMappingRules        []MappingRule
 	NameTag                 string
 	ClockOptions            clock.Options
 	InstrumentOptions       instrument.Options
@@ -86,6 +89,33 @@ type DownsamplerOptions struct {
 	OpenTimeout time.Duration
 }
 
+// MappingRule is a mapping rule to apply to metrics.
+type MappingRule struct {
+	Aggregations []aggregation.Type
+	Policies     policy.StoragePolicies
+}
+
+// StagedMetadatas returns the corresponding staged metadatas for this mapping rule.
+func (r MappingRule) StagedMetadatas() (metadata.StagedMetadatas, error) {
+	aggID, err := aggregation.CompressTypes(r.Aggregations...)
+	if err != nil {
+		return nil, err
+	}
+
+	return metadata.StagedMetadatas{
+		metadata.StagedMetadata{
+			Metadata: metadata.Metadata{
+				Pipelines: metadata.PipelineMetadatas{
+					metadata.PipelineMetadata{
+						AggregationID:   aggID,
+						StoragePolicies: r.Policies,
+					},
+				},
+			},
+		},
+	}, nil
+}
+
 // Validate validates the dynamic downsampling options.
 func (o DownsamplerOptions) validate() error {
 	if o.Storage == nil {
@@ -116,10 +146,11 @@ func (o DownsamplerOptions) validate() error {
 }
 
 type agg struct {
-	aggregator aggregator.Aggregator
-	clockOpts  clock.Options
-	matcher    matcher.Matcher
-	pools      aggPools
+	aggregator             aggregator.Aggregator
+	defaultStagedMetadatas []metadata.StagedMetadatas
+	clockOpts              clock.Options
+	matcher                matcher.Matcher
+	pools                  aggPools
 }
 
 func (o DownsamplerOptions) newAggregator() (agg, error) {
@@ -134,6 +165,7 @@ func (o DownsamplerOptions) newAggregator() (agg, error) {
 		clockOpts               = o.ClockOptions
 		instrumentOpts          = o.InstrumentOptions
 		openTimeout             = defaultOpenTimeout
+		defaultStagedMetadatas  []metadata.StagedMetadatas
 	)
 	if o.StorageFlushConcurrency > 0 {
 		storageFlushConcurrency = o.StorageFlushConcurrency
@@ -141,6 +173,13 @@ func (o DownsamplerOptions) newAggregator() (agg, error) {
 	if o.OpenTimeout > 0 {
 		openTimeout = o.OpenTimeout
 	}
+	for _, rule := range o.AutoMappingRules {
+		metadatas, err := rule.StagedMetadatas()
+		if err != nil {
+			return agg{}, err
+		}
+		defaultStagedMetadatas = append(defaultStagedMetadatas, metadatas)
+	}
 
 	pools := o.newAggregatorPools()
 	ruleSetOpts := o.newAggregatorRulesOptions(pools)
@@ -227,9 +266,10 @@ func (o DownsamplerOptions) newAggregator() (agg, error) {
 	}
 
 	return agg{
-		aggregator: aggregatorInstance,
-		matcher:    matcher,
-		pools:      pools,
+		aggregator:             aggregatorInstance,
+		defaultStagedMetadatas: defaultStagedMetadatas,
+		matcher:                matcher,
+		pools:                  pools,
 	}, nil
 }
 
diff --git a/src/query/config/m3coordinator-cluster-template.yml b/src/query/config/m3coordinator-cluster-template.yml
index 13a3ebd748..94da40643f 100644
--- a/src/query/config/m3coordinator-cluster-template.yml
+++ b/src/query/config/m3coordinator-cluster-template.yml
@@ -18,7 +18,7 @@ clusters:
#  - namespaces:
#      - namespace: default
#        retention: 48h
-#        storageMetricsType: unaggregated
+#        type: unaggregated
#        client:
#          config:
#            service:
diff --git a/src/query/config/m3coordinator-local-etcd.yml b/src/query/config/m3coordinator-local-etcd.yml
index b30eb0ad40..4b65c4a423 100644
--- a/src/query/config/m3coordinator-local-etcd.yml
+++ b/src/query/config/m3coordinator-local-etcd.yml
@@ -15,7 +15,7 @@ metrics:
 clusters:
   - namespaces:
       - namespace: default
-        storageMetricsType: unaggregated
+        type: unaggregated
         retention: 48h
         client:
           config:
diff --git a/src/query/config/m3query-local-etcd.yml b/src/query/config/m3query-local-etcd.yml
index b30eb0ad40..4b65c4a423 100644
--- a/src/query/config/m3query-local-etcd.yml
+++ b/src/query/config/m3query-local-etcd.yml
@@ -15,7 +15,7 @@ metrics:
 clusters:
   - namespaces:
       - namespace: default
-        storageMetricsType: unaggregated
+        type: unaggregated
         retention: 48h
         client:
           config:
diff --git a/src/query/server/server.go b/src/query/server/server.go
index 245cb7f397..dc4dc00e2e 100644
--- a/src/query/server/server.go
+++ b/src/query/server/server.go
@@ -48,12 +48,15 @@ import (
 	"github.com/m3db/m3/src/query/util/logging"
 	clusterclient "github.com/m3db/m3cluster/client"
 	etcdclient "github.com/m3db/m3cluster/client/etcd"
+	"github.com/m3db/m3metrics/aggregation"
+	"github.com/m3db/m3metrics/policy"
 	"github.com/m3db/m3x/clock"
 	xconfig "github.com/m3db/m3x/config"
 	"github.com/m3db/m3x/ident"
 	"github.com/m3db/m3x/instrument"
 	"github.com/m3db/m3x/pool"
 	xsync "github.com/m3db/m3x/sync"
+	xtime "github.com/m3db/m3x/time"
 
 	"github.com/pkg/errors"
 	"github.com/uber-go/tally"
@@ -299,8 +302,12 @@ func newM3DBStorage(
 	if n := namespaces.NumAggregatedClusterNamespaces(); n > 0 {
 		logger.Info("configuring downsampler to use with aggregated cluster namespaces",
 			zap.Int("numAggregatedClusterNamespaces", n))
+		autoMappingRules, err := newDownsamplerAutoMappingRules(namespaces)
+		if err != nil {
+			return nil, nil, nil, nil, err
+		}
 		downsampler, err = newDownsampler(clusterManagementClient,
-			fanoutStorage, instrumentOptions)
+			fanoutStorage, autoMappingRules, instrumentOptions)
 		if err != nil {
 			return nil, nil, nil, nil, err
 		}
@@ -328,32 +335,35 @@ func newM3DBStorage(
 func newDownsampler(
 	clusterManagementClient clusterclient.Client,
 	storage storage.Storage,
+	autoMappingRules []downsample.MappingRule,
 	instrumentOpts instrument.Options,
 ) (downsample.Downsampler, error) {
 	if clusterManagementClient == nil {
-		return nil, errors.New("no configured cluster management config, must set this " +
-			"config for downsampler")
+		return nil, errors.New("no configured cluster management config, " +
+			"must set this config for downsampler")
 	}
 
 	kvStore, err := clusterManagementClient.KV()
 	if err != nil {
-		return nil, errors.Wrap(err, "unable to create KV store from the cluster management config client")
+		return nil, errors.Wrap(err, "unable to create KV store from the "+
+			"cluster management config client")
 	}
 
 	tagEncoderOptions := serialize.NewTagEncoderOptions()
 	tagDecoderOptions := serialize.NewTagDecoderOptions()
 	tagEncoderPoolOptions := pool.NewObjectPoolOptions().
 		SetInstrumentOptions(instrumentOpts.
-		SetMetricsScope(instrumentOpts.MetricsScope().
-		SubScope("tag-encoder-pool")))
+			SetMetricsScope(instrumentOpts.MetricsScope().
+				SubScope("tag-encoder-pool")))
 	tagDecoderPoolOptions := pool.NewObjectPoolOptions().
 		SetInstrumentOptions(instrumentOpts.
-		SetMetricsScope(instrumentOpts.MetricsScope().
-		SubScope("tag-decoder-pool")))
+			SetMetricsScope(instrumentOpts.MetricsScope().
+ SubScope("tag-decoder-pool"))) downsampler, err := downsample.NewDownsampler(downsample.DownsamplerOptions{ Storage: storage, RulesKVStore: kvStore, + AutoMappingRules: autoMappingRules, ClockOptions: clock.NewOptions(), InstrumentOptions: instrumentOpts, TagEncoderOptions: tagEncoderOptions, @@ -368,6 +378,37 @@ func newDownsampler( return downsampler, nil } +func newDownsamplerAutoMappingRules( + namespaces []local.ClusterNamespace, +) ([]downsample.MappingRule, error) { + var autoMappingRules []downsample.MappingRule + for _, namespace := range namespaces { + opts := namespace.Options() + attrs := opts.Attributes() + if attrs.MetricsType == storage.AggregatedMetricsType { + downsampleOpts, err := opts.DownsampleOptions() + if err != nil { + errFmt := "unable to resolve downsample options for namespace: %v" + return nil, fmt.Errorf(errFmt, namespace.NamespaceID().String()) + } + if downsampleOpts.All { + storagePolicy := policy.NewStoragePolicy(attrs.Resolution, + xtime.Second, attrs.Retention) + autoMappingRules = append(autoMappingRules, downsample.MappingRule{ + // NB(r): By default we will apply just keep all last values + // since coordinator only uses downsampling with Prometheus + // remote write endpoint. + // More rich static configuration mapping rules can be added + // in the future but they are currently not required. + Aggregations: []aggregation.Type{aggregation.Last}, + Policies: policy.StoragePolicies{storagePolicy}, + }) + } + } + } + return autoMappingRules, nil +} + func initClusters(cfg config.Configuration, dbClientCh <-chan client.Client, logger *zap.Logger) (local.Clusters, error) { var ( clusters local.Clusters diff --git a/src/query/server/server_test.go b/src/query/server/server_test.go index 8f2e03bf90..929dc0d4ec 100644 --- a/src/query/server/server_test.go +++ b/src/query/server/server_test.go @@ -64,7 +64,7 @@ metrics: clusters: - namespaces: - namespace: prometheus_metrics - storageMetricsType: unaggregated + type: unaggregated retention: 48h ` diff --git a/src/query/storage/interface.go b/src/query/storage/interface.go index ae85035865..725d9a12c0 100644 --- a/src/query/storage/interface.go +++ b/src/query/storage/interface.go @@ -139,6 +139,9 @@ const ( UnaggregatedMetricsType MetricsType = iota // AggregatedMetricsType is an aggregated metrics type. AggregatedMetricsType + + // DefaultMetricsType is the default metrics type value. + DefaultMetricsType = UnaggregatedMetricsType ) var ( diff --git a/src/query/storage/local/cluster.go b/src/query/storage/local/cluster.go index e9f7d67346..6fef2e213e 100644 --- a/src/query/storage/local/cluster.go +++ b/src/query/storage/local/cluster.go @@ -38,6 +38,10 @@ var ( errSessionNotSet = errors.New("session not set") errRetentionNotSet = errors.New("retention not set") errResolutionNotSet = errors.New("resolution not set") + + defaultClusterNamespaceDownsampleOptions = ClusterNamespaceDownsampleOptions{ + All: true, + } ) // Clusters is a flattened collection of local storage clusters and namespaces. @@ -66,10 +70,44 @@ type RetentionResolution struct { // ClusterNamespace is a local storage cluster namespace. type ClusterNamespace interface { NamespaceID() ident.ID - Attributes() storage.Attributes + Options() ClusterNamespaceOptions Session() client.Session } +// ClusterNamespaceOptions is a set of options +type ClusterNamespaceOptions struct { + // Note: Don't allow direct access, as we want to provide defaults + // and/or error if call to access a field is not relevant/correct. 
+	attributes storage.Attributes
+	downsample *ClusterNamespaceDownsampleOptions
+}
+
+// Attributes returns the storage attributes of the cluster namespace.
+func (o ClusterNamespaceOptions) Attributes() storage.Attributes {
+	return o.attributes
+}
+
+// DownsampleOptions returns the downsample options for a cluster namespace,
+// which are only valid if the namespace is an aggregated cluster namespace.
+func (o ClusterNamespaceOptions) DownsampleOptions() (
+	ClusterNamespaceDownsampleOptions,
+	error,
+) {
+	if o.attributes.MetricsType != storage.AggregatedMetricsType {
+		return ClusterNamespaceDownsampleOptions{}, errNotAggregatedClusterNamespace
+	}
+	if o.downsample == nil {
+		return defaultClusterNamespaceDownsampleOptions, nil
+	}
+	return *o.downsample, nil
+}
+
+// ClusterNamespaceDownsampleOptions is the set of downsample options
+// for a cluster namespace.
+type ClusterNamespaceDownsampleOptions struct {
+	All bool
+}
+
 // ClusterNamespaces is a slice of ClusterNamespace instances.
 type ClusterNamespaces []ClusterNamespace
 
@@ -78,7 +116,7 @@ type ClusterNamespaces []ClusterNamespace
 func (n ClusterNamespaces) NumAggregatedClusterNamespaces() int {
 	count := 0
 	for _, namespace := range n {
-		if namespace.Attributes().MetricsType == storage.AggregatedMetricsType {
+		if namespace.Options().Attributes().MetricsType == storage.AggregatedMetricsType {
 			count++
 		}
 	}
@@ -115,6 +153,7 @@ type AggregatedClusterNamespaceDefinition struct {
 	Session    client.Session
 	Retention  time.Duration
 	Resolution time.Duration
+	Downsample *ClusterNamespaceDownsampleOptions
 }
 
 // Validate validates the cluster namespace definition.
@@ -166,8 +205,8 @@ func NewClusters(
 	namespaces = append(namespaces, namespace)
 
 	key := RetentionResolution{
-		Retention:  namespace.Attributes().Retention,
-		Resolution: namespace.Attributes().Resolution,
+		Retention:  namespace.Options().Attributes().Retention,
+		Resolution: namespace.Options().Attributes().Resolution,
 	}
 
 	_, exists := aggregatedNamespaces[key]
@@ -241,7 +280,7 @@ func (c *clusters) Close() error {
 
 type clusterNamespace struct {
 	namespaceID ident.ID
-	attributes  storage.Attributes
+	options     ClusterNamespaceOptions
 	session     client.Session
 }
 
@@ -253,9 +292,11 @@ func newUnaggregatedClusterNamespace(
 	}
 	return &clusterNamespace{
 		namespaceID: def.NamespaceID,
-		attributes: storage.Attributes{
-			MetricsType: storage.UnaggregatedMetricsType,
-			Retention:   def.Retention,
+		options: ClusterNamespaceOptions{
+			attributes: storage.Attributes{
+				MetricsType: storage.UnaggregatedMetricsType,
+				Retention:   def.Retention,
+			},
 		},
 		session: def.Session,
 	}, nil
@@ -269,10 +310,13 @@ func newAggregatedClusterNamespace(
 	}
 	return &clusterNamespace{
 		namespaceID: def.NamespaceID,
-		attributes: storage.Attributes{
-			MetricsType: storage.AggregatedMetricsType,
-			Retention:   def.Retention,
-			Resolution:  def.Resolution,
+		options: ClusterNamespaceOptions{
+			attributes: storage.Attributes{
+				MetricsType: storage.AggregatedMetricsType,
+				Retention:   def.Retention,
+				Resolution:  def.Resolution,
+			},
+			downsample: def.Downsample,
 		},
 		session: def.Session,
 	}, nil
@@ -282,8 +326,8 @@ func (n *clusterNamespace) NamespaceID() ident.ID {
 	return n.namespaceID
 }
 
-func (n *clusterNamespace) Attributes() storage.Attributes {
-	return n.attributes
+func (n *clusterNamespace) Options() ClusterNamespaceOptions {
+	return n.options
 }
 
 func (n *clusterNamespace) Session() client.Session {
diff --git a/src/query/storage/local/cluster_test.go b/src/query/storage/local/cluster_test.go
index 8850db8b8e..559ec421a7 100644
--- a/src/query/storage/local/cluster_test.go
+++ b/src/query/storage/local/cluster_test.go
@@ -72,9 +72,9 @@ func TestNewClustersFromConfig(t *testing.T) {
 			NewClientFromConfig: newClient1,
 			Namespaces: []ClusterStaticNamespaceConfiguration{
 				ClusterStaticNamespaceConfiguration{
-					Namespace:          "unaggregated",
-					StorageMetricsType: storage.UnaggregatedMetricsType,
-					Retention:          7 * 24 * time.Hour,
+					Namespace: "unaggregated",
+					Type:      storage.UnaggregatedMetricsType,
+					Retention: 7 * 24 * time.Hour,
 				},
 			},
 		},
@@ -82,16 +82,16 @@ func TestNewClustersFromConfig(t *testing.T) {
 			NewClientFromConfig: newClient2,
 			Namespaces: []ClusterStaticNamespaceConfiguration{
 				ClusterStaticNamespaceConfiguration{
-					Namespace:          "aggregated0",
-					StorageMetricsType: storage.AggregatedMetricsType,
-					Retention:          30 * 24 * time.Hour,
-					Resolution:         time.Minute,
+					Namespace:  "aggregated0",
+					Type:       storage.AggregatedMetricsType,
+					Retention:  30 * 24 * time.Hour,
+					Resolution: time.Minute,
 				},
 				ClusterStaticNamespaceConfiguration{
-					Namespace:          "aggregated1",
-					StorageMetricsType: storage.AggregatedMetricsType,
-					Retention:          365 * 24 * time.Hour,
-					Resolution:         10 * time.Minute,
+					Namespace:  "aggregated1",
+					Type:       storage.AggregatedMetricsType,
+					Retention:  365 * 24 * time.Hour,
+					Resolution: 10 * time.Minute,
 				},
 			},
 		},
@@ -106,7 +106,7 @@ func TestNewClustersFromConfig(t *testing.T) {
 	assert.Equal(t, storage.Attributes{
 		MetricsType: storage.UnaggregatedMetricsType,
 		Retention:   7 * 24 * time.Hour,
-	}, unaggregatedNs.Attributes())
+	}, unaggregatedNs.Options().Attributes())
 	assert.True(t, mockSession1 == unaggregatedNs.Session())
 
 	aggregated1Month1Minute, ok := clusters.AggregatedClusterNamespace(RetentionResolution{
@@ -119,7 +119,7 @@ func TestNewClustersFromConfig(t *testing.T) {
 		MetricsType: storage.AggregatedMetricsType,
 		Retention:   30 * 24 * time.Hour,
 		Resolution:  time.Minute,
-	}, aggregated1Month1Minute.Attributes())
+	}, aggregated1Month1Minute.Options().Attributes())
 	assert.True(t, mockSession2 == aggregated1Month1Minute.Session())
 
 	aggregated1Year10Minute, ok := clusters.AggregatedClusterNamespace(RetentionResolution{
@@ -132,7 +132,7 @@ func TestNewClustersFromConfig(t *testing.T) {
 		MetricsType: storage.AggregatedMetricsType,
 		Retention:   365 * 24 * time.Hour,
 		Resolution:  10 * time.Minute,
-	}, aggregated1Year10Minute.Attributes())
+	}, aggregated1Year10Minute.Options().Attributes())
 	assert.True(t, mockSession2 == aggregated1Year10Minute.Session())
 
 	// Ensure cannot resolve unexpected clusters
diff --git a/src/query/storage/local/config.go b/src/query/storage/local/config.go
index e6afdf509c..3059fe0456 100644
--- a/src/query/storage/local/config.go
+++ b/src/query/storage/local/config.go
@@ -21,6 +21,7 @@
 package local
 
 import (
+	goerrors "errors"
 	"fmt"
 	"sync"
 	"time"
@@ -32,6 +33,9 @@ import (
 )
 
 var (
+	errNotAggregatedClusterNamespace = goerrors.New("not an aggregated cluster namespace")
+	errBothNamespaceTypeNewAndDeprecatedFieldsSet = goerrors.New("cannot specify both deprecated and non-deprecated fields for namespace type")
+
 	defaultNewClientConfigurationParams = client.ConfigurationParameters{}
 )
@@ -66,10 +70,77 @@ func (c ClusterStaticConfiguration) newClient(
 
 // ClusterStaticNamespaceConfiguration describes the namespaces in a
 // static cluster.
 type ClusterStaticNamespaceConfiguration struct {
-	Namespace          string              `yaml:"namespace"`
+	// Namespace is the namespace in the cluster to use.
+	Namespace string `yaml:"namespace"`
+
+	// Type is the type of values stored by the namespace; currently
+	// supported values are "unaggregated" and "aggregated".
+	Type storage.MetricsType `yaml:"type"`
+
+	// Retention is the duration for which values are stored in the namespace.
+	Retention time.Duration `yaml:"retention" validate:"nonzero"`
+
+	// Resolution is the frequency at which values are stored in the namespace.
+	Resolution time.Duration `yaml:"resolution" validate:"min=0"`
+
+	// Downsample is the configuration for downsampling options to use with
+	// the namespace.
+	Downsample *DownsampleClusterStaticNamespaceConfiguration `yaml:"downsample"`
+
+	// StorageMetricsType is the namespace type.
+	//
+	// Deprecated: use the "Type" field instead; it is invalid to
+	// specify both.
 	StorageMetricsType storage.MetricsType `yaml:"storageMetricsType"`
-	Retention          time.Duration       `yaml:"retention" validate:"nonzero"`
-	Resolution         time.Duration       `yaml:"resolution" validate:"min=0"`
+}
+
+func (c ClusterStaticNamespaceConfiguration) metricsType() (storage.MetricsType, error) {
+	result := storage.DefaultMetricsType
+	if c.Type != storage.DefaultMetricsType && c.StorageMetricsType != storage.DefaultMetricsType {
+		// Don't allow both fields to be set to non-default values
+		return result, errBothNamespaceTypeNewAndDeprecatedFieldsSet
+	}
+
+	if c.Type != storage.DefaultMetricsType {
+		// New field value set
+		return c.Type, nil
+	}
+
+	if c.StorageMetricsType != storage.DefaultMetricsType {
+		// Deprecated field value set
+		return c.StorageMetricsType, nil
+	}
+
+	// Both are default
+	return result, nil
+}
+
+func (c ClusterStaticNamespaceConfiguration) downsampleOptions() (
+	ClusterNamespaceDownsampleOptions,
+	error,
+) {
+	nsType, err := c.metricsType()
+	if err != nil {
+		return ClusterNamespaceDownsampleOptions{}, err
+	}
+	if nsType != storage.AggregatedMetricsType {
+		return ClusterNamespaceDownsampleOptions{}, errNotAggregatedClusterNamespace
+	}
+	if c.Downsample == nil {
+		return defaultClusterNamespaceDownsampleOptions, nil
+	}
+
+	return c.Downsample.downsampleOptions(), nil
+}
+
+// DownsampleClusterStaticNamespaceConfiguration specifies the downsampling
+// options to use with an aggregated cluster namespace.
+type DownsampleClusterStaticNamespaceConfiguration struct {
+	All bool `yaml:"all"`
+}
+
+func (c DownsampleClusterStaticNamespaceConfiguration) downsampleOptions() ClusterNamespaceDownsampleOptions {
+	return ClusterNamespaceDownsampleOptions(c)
 }
 
 type unaggregatedClusterNamespaceConfiguration struct {
@@ -118,7 +189,12 @@ func (c ClustersStaticConfiguration) NewClusters(
 		}
 
 		for _, n := range clusterCfg.Namespaces {
-			switch n.StorageMetricsType {
+			nsType, err := n.metricsType()
+			if err != nil {
+				return nil, err
+			}
+
+			switch nsType {
 			case storage.UnaggregatedMetricsType:
 				numUnaggregatedClusterNamespaces++
 				if numUnaggregatedClusterNamespaces > 1 {
@@ -136,8 +212,7 @@ func (c ClustersStaticConfiguration) NewClusters(
 					append(aggregatedClusterNamespacesCfg.namespaces, n)
 
 			default:
-				return nil, fmt.Errorf("unknown storage metrics type: %v",
-					n.StorageMetricsType)
+				return nil, fmt.Errorf("unknown storage metrics type: %v", nsType)
 			}
 		}
 
@@ -202,11 +277,18 @@ func (c ClustersStaticConfiguration) NewClusters(
 		}
 
 		for _, n := range cfg.namespaces {
+			downsampleOpts, err := n.downsampleOptions()
+			if err != nil {
+				return nil, fmt.Errorf("error parsing downsample options for cluster #%d namespace %s: %v",
+					i, n.Namespace, err)
+			}
+
 			def := AggregatedClusterNamespaceDefinition{
 				NamespaceID: ident.StringID(n.Namespace),
 				Session:     cfg.result.session,
 				Retention:   n.Retention,
 				Resolution:  n.Resolution,
+				Downsample:  &downsampleOpts,
 			}
 			aggregatedClusterNamespaces = append(aggregatedClusterNamespaces, def)
 		}
diff --git a/src/query/storage/local/storage.go b/src/query/storage/local/storage.go
index b35202a63f..d4635860b9 100644
--- a/src/query/storage/local/storage.go
+++ b/src/query/storage/local/storage.go
@@ -83,7 +83,7 @@ func (s *localStorage) Fetch(ctx context.Context, query *storage.FetchQuery, opt
 	for _, namespace := range namespaces {
 		namespace := namespace // Capture var
 
-		clusterStart := now.Add(-1 * namespace.Attributes().Retention)
+		clusterStart := now.Add(-1 * namespace.Options().Attributes().Retention)
 
 		// Only include if cluster can completely fulfill the range
 		if clusterStart.After(query.Start) {
@@ -95,7 +95,7 @@ func (s *localStorage) Fetch(ctx context.Context, query *storage.FetchQuery, opt
 		wg.Add(1)
 		go func() {
 			r, err := s.fetch(namespace, m3query, opts)
-			result.add(namespace.Attributes(), r, err)
+			result.add(namespace.Options().Attributes(), r, err)
 			wg.Done()
 		}()
 	}
@@ -154,7 +154,7 @@ func (s *localStorage) FetchTags(ctx context.Context, query *storage.FetchQuery,
 	for _, namespace := range namespaces {
 		namespace := namespace // Capture var
 
-		clusterStart := now.Add(-1 * namespace.Attributes().Retention)
+		clusterStart := now.Add(-1 * namespace.Options().Attributes().Retention)
 
 		// Only include if cluster can completely fulfill the range
 		if clusterStart.After(query.Start) {
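
For reference, a minimal sketch (not part of the change itself) of how the `MappingRule` type introduced in `options.go` composes with the auto mapping rules derived by `newDownsamplerAutoMappingRules`. The `1m:720h` storage policy is a hypothetical example value: an aggregated namespace configured with a 1m resolution and 720h retention would yield the equivalent rule, since the new `downsample` option defaults to `all: true`.

```go
package main

import (
	"fmt"

	"github.com/m3db/m3/src/cmd/services/m3coordinator/downsample"
	"github.com/m3db/m3metrics/aggregation"
	"github.com/m3db/m3metrics/policy"
)

func main() {
	// Mirror the rule newDownsamplerAutoMappingRules builds for an
	// aggregated namespace: keep the last value written within each
	// resolution window, stored for the namespace's retention.
	// The "1m:720h" policy here is an assumed example value.
	rule := downsample.MappingRule{
		Aggregations: []aggregation.Type{aggregation.Last},
		Policies: policy.StoragePolicies{
			policy.MustParseStoragePolicy("1m:720h"),
		},
	}

	// StagedMetadatas compresses the aggregation types into an
	// aggregation ID and wraps the storage policies in a single
	// pipeline metadata, ready to hand to the aggregator.
	metadatas, err := rule.StagedMetadatas()
	if err != nil {
		panic(err)
	}

	fmt.Printf("staged metadatas: %+v\n", metadatas)
}
```

Setting `downsample: {all: false}` on an aggregated namespace skips this auto mapping rule entirely, in which case only rules configured in the rules KV store would write downsampled values to that namespace.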