diff --git a/glide.lock b/glide.lock index e5e0757143..a9bcead297 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 3a53f2c08d71320ccc49b0078a702b0eee06f316cab7a68135c8cbc74d44983c -updated: 2018-07-25T13:02:19.326704-04:00 +hash: e705a45eee71bea9a22036a204f4c7d1b70de8ce049d41f5ae6239a5157d24b5 +updated: 2018-07-28T20:03:02.374102-04:00 imports: - name: github.com/apache/thrift version: c2fb1c4e8c931d22617bebb0bf388cb4d5e6fcff @@ -26,6 +26,8 @@ imports: - client - clientv3 - clientv3/concurrency + - clientv3/namespace + - clientv3/naming - compactor - discovery - embed @@ -49,6 +51,7 @@ imports: - etcdserver/etcdserverpb/gw - etcdserver/membership - etcdserver/stats + - integration - lease - lease/leasehttp - lease/leasepb @@ -73,11 +76,14 @@ imports: - pkg/runtime - pkg/schedule - pkg/srv + - pkg/testutil - pkg/tlsutil - pkg/transport - pkg/types - pkg/wait + - proxy/grpcproxy - proxy/grpcproxy/adapter + - proxy/grpcproxy/cache - raft - raft/raftpb - rafthttp @@ -88,13 +94,15 @@ imports: - wal - wal/walpb - name: github.com/coreos/go-semver - version: 568e959cd89871e61434c1143528d9162da89ef2 + version: 8ab6407b697782a06568d4b7f1db25550ec2e4c6 subpackages: - semver - name: github.com/coreos/go-systemd version: 48702e0da86bd25e76cfef347e2adeb434a0d0a6 subpackages: + - daemon - journal + - util - name: github.com/coreos/pkg version: 97fdf19511ea361ae1c100dd393cc47f8dcfa1e1 subpackages: @@ -116,7 +124,7 @@ imports: - name: github.com/fsnotify/fsnotify version: c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9 - name: github.com/ghodss/yaml - version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee + version: 0ca9ea5df5451ffdf184b4428c902747c2c11cd7 - name: github.com/glycerine/go-unsnap-stream version: 9f0cb55181dd3a0a4c168d3dbc72d4aca4853126 - name: github.com/go-kit/kit @@ -137,6 +145,10 @@ imports: - protoc-gen-gogo/descriptor - sortkeys - types +- name: github.com/golang/groupcache + version: 02826c3e79038b59d737d3b1c0a1d937f71a4433 + subpackages: + - lru - name: github.com/golang/mock version: c34cdb4725f4c3844d095133c6e40e448b86589b subpackages: @@ -198,21 +210,26 @@ imports: - name: github.com/m3db/bloom version: 47fe1193cdb900de7193d1f3d26ea9b2cbf6fb31 - name: github.com/m3db/m3aggregator - version: fd38c07d1a94b8598b6839a9c471e0a6feacfafb + version: 6f59918fe3791a3df0dc146ea492593925809c3e subpackages: - aggregation - aggregation/quantile/cm - aggregator - aggregator/handler - aggregator/handler/common + - aggregator/handler/filter + - aggregator/handler/router + - aggregator/handler/router/trafficcontrol - aggregator/handler/writer + - bitset + - client - generated/proto/flush - hash - rate - runtime - sharding - name: github.com/m3db/m3cluster - version: b4935c48a00f47c6f16f71d15d94e1c24785297d + version: 313c27d715da9c0993bdc640b758d93d1392438a subpackages: - client - client/etcd @@ -237,6 +254,14 @@ imports: - services/leader/campaign - services/leader/election - shard +- name: github.com/m3db/m3ctl + version: acc762bfdd42ecb192d34e48fa7ca1fd7ee088ac + subpackages: + - auth + - service + - service/r2 + - service/r2/store + - service/r2/store/kv - name: github.com/m3db/m3em version: ed532baee45a440f0b08b6893c816634c6978d4d subpackages: @@ -250,47 +275,54 @@ imports: - os/fs - x/grpc - name: github.com/m3db/m3metrics - version: f22d8684fa8b42ff30f1d68f6f2be5e465db9a9d + version: b400256bc6da4c10bad5a5f0a11e91bfa095e2e6 subpackages: - aggregation + - encoding + - encoding/msgpack + - encoding/protobuf - errors - filters - - generated/proto/schema + - generated/proto/aggregationpb + - generated/proto/metricpb + - generated/proto/pipelinepb + - generated/proto/policypb + - generated/proto/rulepb + - generated/proto/transformationpb - matcher - matcher/cache + - metadata - metric - metric/aggregated - metric/id - metric/id/m3 - metric/unaggregated + - pipeline + - pipeline/applied - policy - - protocol/msgpack - rules - - rules/models - - rules/models/changes + - rules/store/kv + - rules/validator + - rules/validator/namespace + - rules/validator/namespace/kv + - rules/validator/namespace/static + - rules/view + - rules/view/changes + - transformation + - x/bytes +- name: github.com/m3db/m3msg + version: 4680d9b45286826f87b134a4559b11d795786eaf + subpackages: + - generated/proto/msgpb + - generated/proto/topicpb + - producer + - producer/buffer + - producer/config + - producer/writer + - protocol/proto + - topic - name: github.com/m3db/m3ninx version: 7556fa8339674f1d9f559486d1feca18c17d1190 - subpackages: - - doc - - generated/proto/fswriter - - generated/proto/querypb - - idx - - index - - index/segment - - index/segment/fs - - index/segment/fs/encoding - - index/segment/fs/encoding/docs - - index/segment/mem - - index/util - - persist - - postings - - postings/pilosa - - postings/roaring - - search - - search/executor - - search/query - - search/searcher - - x - name: github.com/m3db/m3x version: 994ab3222cc45bdb1b439f96d37221651b24feca vcs: git @@ -342,6 +374,10 @@ imports: version: c12348ce28de40eed0136aa2b644d0ee0650e56c subpackages: - pbutil +- name: github.com/mauricelam/genny + version: 9d8700bcc567cd22ea2ef42ce5835a9c80296c4a + subpackages: + - generic - name: github.com/mitchellh/mapstructure version: bb74f1db0675b241733089d5a1faa5dd8b0ef57b - name: github.com/mschoch/smat @@ -381,11 +417,11 @@ imports: subpackages: - prometheus - name: github.com/prometheus/client_model - version: 6f3806018612930941127f2a7c6c453ba2c527d2 + version: fa8ad6fec33561be4280a8f0514318c79d7f6cb6 subpackages: - go - name: github.com/prometheus/common - version: 49fee292b27bfff7f354ee0f64e1bc4850462edf + version: 9e0844febd9e2856f839c9cb974fbd676d1755a8 subpackages: - expfmt - internal/bitbucket.org/ww/goautoneg @@ -416,7 +452,7 @@ imports: - index - labels - name: github.com/RoaringBitmap/roaring - version: 361768d03f0924093d3eed7623f3cf58392620f4 + version: 4c23670306840b0f8755db247d2e9b8369fc356e - name: github.com/satori/go.uuid version: f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 - name: github.com/sergi/go-diff @@ -454,7 +490,7 @@ imports: - name: github.com/uber-go/atomic version: 1ea20fb1cbb1cc08cbd0d913a96dead89aa18289 - name: github.com/uber-go/tally - version: 79f2a33b0e55b1255ffbbaf824dcafb09ff34dda + version: ff17f3c43c065c3c2991f571e740eee43ea3a14a subpackages: - m3 - m3/customtransports @@ -494,8 +530,9 @@ imports: - internal/exit - zapcore - name: golang.org/x/crypto - version: 9419663f5a44be8b34ca85f08abc5fe1be11f8a3 - + version: 1351f936d976c60a0a48d728281922cf63eafb8d + repo: https://github.com/golang/crypto + vcs: git subpackages: - bcrypt - blowfish @@ -521,8 +558,9 @@ imports: subpackages: - errgroup - name: golang.org/x/sys - version: e48874b42435b4347fc52bdee0424a52abc974d7 - + version: d4feaf1a7e61e1d9e79e6c4e76c6349e9cab0a03 + repo: https://github.com/golang/sys + vcs: git subpackages: - unix - name: golang.org/x/text @@ -532,6 +570,12 @@ imports: - transform - unicode/bidi - unicode/norm +- name: golang.org/x/time + version: a4bde12657593d5e90d0533a3e4fd95e635124cb + repo: https://github.com/golang/time + vcs: git + subpackages: + - rate - name: google.golang.org/appengine version: 2e4a801b39fc199db615bfca7d0b9f8cd9580599 subpackages: @@ -549,17 +593,12 @@ imports: - googleapis/api/annotations - googleapis/rpc/status - name: google.golang.org/grpc - version: 5b3c4e850e90a4cf6a20ebd46c8b32a0a3afcb9e + version: 401e0e00e4bb830a10496d64cd95e068c5bf50de subpackages: - balancer - - balancer/base - - balancer/roundrobin - - channelz - codes - connectivity - credentials - - encoding - - encoding/proto - grpclb/grpc_lb_v1/messages - grpclog - health/grpc_health_v1 @@ -569,8 +608,6 @@ imports: - naming - peer - resolver - - resolver/dns - - resolver/passthrough - stats - status - tap @@ -590,9 +627,17 @@ testImports: version: 600d898af40aa09a7a93ecb9265d87b0504b6f03 - name: github.com/fortytw2/leaktest version: 3677f62bb30dbf3b042c4c211245d072aa9ee075 +- name: github.com/go-playground/locales + version: 1e5f1161c6416a5ff48840eb8724a394e48cc534 + subpackages: + - currency +- name: github.com/go-playground/universal-translator + version: 71201497bace774495daed26a3874fd339e0b538 - name: github.com/leanovate/gopter version: f778776473e0ef7764e1434dd01a61cc1ec574b4 subpackages: - commands - gen - prop +- name: gopkg.in/go-playground/validator.v9 + version: a021b2ec9a8a8bb970f3f15bc42617cb520e8a64 diff --git a/glide.yaml b/glide.yaml index c696e3a382..34ffd1b77f 100644 --- a/glide.yaml +++ b/glide.yaml @@ -16,7 +16,7 @@ import: - ident - package: github.com/m3db/m3cluster - version: b4935c48a00f47c6f16f71d15d94e1c24785297d + version: 313c27d715da9c0993bdc640b758d93d1392438a subpackages: - client - services @@ -26,7 +26,10 @@ import: version: ed532baee45a440f0b08b6893c816634c6978d4d - package: github.com/m3db/m3aggregator - version: fd38c07d1a94b8598b6839a9c471e0a6feacfafb + version: 6f59918fe3791a3df0dc146ea492593925809c3e + +- package: github.com/m3db/m3ctl + version: acc762bfdd42ecb192d34e48fa7ca1fd7ee088ac - package: github.com/m3db/m3ninx version: 7556fa8339674f1d9f559486d1feca18c17d1190 @@ -152,7 +155,7 @@ import: version: ^2.2.6 - package: github.com/m3db/m3metrics - version: f22d8684fa8b42ff30f1d68f6f2be5e465db9a9d + version: b400256bc6da4c10bad5a5f0a11e91bfa095e2e6 subpackages: - policy diff --git a/src/cmd/services/m3coordinator/downsample/downsampler.go b/src/cmd/services/m3coordinator/downsample/downsampler.go new file mode 100644 index 0000000000..2a98b9e2bb --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/downsampler.go @@ -0,0 +1,80 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package downsample + +// Downsampler is a downsampler. +type Downsampler interface { + NewMetricsAppender() MetricsAppender +} + +// MetricsAppender is a metrics appender that can build a samples +// appender, only valid to use with a single caller at a time. +type MetricsAppender interface { + AddTag(name, value string) + SamplesAppender() (SamplesAppender, error) + Reset() + Finalize() +} + +// SamplesAppender is a downsampling samples appender, +// that can only be called by a single caller at a time. +type SamplesAppender interface { + AppendCounterSample(value int64) error + AppendGaugeSample(value float64) error +} + +type downsampler struct { + opts DownsamplerOptions + agg agg +} + +// NewDownsampler returns a new downsampler. +func NewDownsampler( + opts DownsamplerOptions, +) (Downsampler, error) { + agg, err := opts.newAggregator() + if err != nil { + return nil, err + } + + return &downsampler{ + opts: opts, + agg: agg, + }, nil +} + +func (d *downsampler) NewMetricsAppender() MetricsAppender { + return newMetricsAppender(metricsAppenderOptions{ + agg: d.agg.aggregator, + clockOpts: d.agg.clockOpts, + tagEncoder: d.agg.pools.tagEncoderPool.Get(), + matcher: d.agg.matcher, + encodedTagsIteratorPool: d.agg.pools.encodedTagsIteratorPool, + }) +} + +func newMetricsAppender(opts metricsAppenderOptions) *metricsAppender { + return &metricsAppender{ + metricsAppenderOptions: opts, + tags: newTags(), + multiSamplesAppender: newMultiSamplesAppender(), + } +} diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go new file mode 100644 index 0000000000..66bf3adde0 --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -0,0 +1,289 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package downsample + +import ( + "testing" + "time" + + "github.com/m3db/m3cluster/kv/mem" + "github.com/m3db/m3ctl/service/r2/store" + r2kv "github.com/m3db/m3ctl/service/r2/store/kv" + "github.com/m3db/m3db/src/dbnode/serialize" + "github.com/m3db/m3db/src/query/storage" + "github.com/m3db/m3db/src/query/storage/mock" + "github.com/m3db/m3metrics/aggregation" + "github.com/m3db/m3metrics/generated/proto/rulepb" + "github.com/m3db/m3metrics/matcher" + "github.com/m3db/m3metrics/metric/id" + "github.com/m3db/m3metrics/policy" + ruleskv "github.com/m3db/m3metrics/rules/store/kv" + "github.com/m3db/m3metrics/rules/view" + "github.com/m3db/m3x/clock" + "github.com/m3db/m3x/instrument" + xlog "github.com/m3db/m3x/log" + "github.com/m3db/m3x/pool" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDownsamplerAggregation(t *testing.T) { + testDownsampler := newTestDownsampler(t, testDownsamplerOptions{}) + downsampler := testDownsampler.downsampler + rulesStore := testDownsampler.rulesStore + logger := testDownsampler.instrumentOpts.Logger(). + WithFields(xlog.NewField("test", t.Name())) + + // Create rules + _, err := rulesStore.CreateNamespace("default", store.NewUpdateOptions()) + require.NoError(t, err) + + rule := view.MappingRule{ + ID: "mappingrule", + Name: "mappingrule", + Filter: "app:test*", + AggregationID: aggregation.MustCompressTypes(aggregation.Sum), + StoragePolicies: []policy.StoragePolicy{policy.MustParseStoragePolicy("2s:1d")}, + } + _, err = rulesStore.CreateMappingRule("default", rule, + store.NewUpdateOptions()) + require.NoError(t, err) + + // Wait for mapping rule to appear + logger.Infof("waiting for mapping rules to propagate") + matcher := testDownsampler.matcher + testMatchID := newTestID(t, map[string]string{ + "__name__": "foo", + "app": "test123", + }) + for { + now := time.Now().UnixNano() + res := matcher.ForwardMatch(testMatchID, now, now+1) + results := res.ForExistingIDAt(now) + if !results.IsDefault() { + break + } + time.Sleep(100 * time.Millisecond) + } + + testCounterMetrics := []struct { + tags map[string]string + samples []int64 + expected int64 + }{ + { + tags: map[string]string{"__name__": "counter0", "app": "testapp", "foo": "bar"}, + samples: []int64{1, 2, 3}, + expected: 6, + }, + } + + testGaugeMetrics := []struct { + tags map[string]string + samples []float64 + expected float64 + }{ + { + tags: map[string]string{"__name__": "gauge0", "app": "testapp", "qux": "qaz"}, + samples: []float64{4, 5, 6}, + expected: 15, + }, + } + + logger.Infof("write test metrics") + appender := downsampler.NewMetricsAppender() + defer appender.Finalize() + + for _, metric := range testCounterMetrics { + appender.Reset() + for name, value := range metric.tags { + appender.AddTag(name, value) + } + + samplesAppender, err := appender.SamplesAppender() + require.NoError(t, err) + + for _, sample := range metric.samples { + err := samplesAppender.AppendCounterSample(sample) + require.NoError(t, err) + } + } + for _, metric := range testGaugeMetrics { + appender.Reset() + for name, value := range metric.tags { + appender.AddTag(name, value) + } + + samplesAppender, err := appender.SamplesAppender() + require.NoError(t, err) + + for _, sample := range metric.samples { + err := samplesAppender.AppendGaugeSample(sample) + require.NoError(t, err) + } + } + + // Wait for writes + logger.Infof("wait for test metrics to appear") + for { + writes := testDownsampler.storage.Writes() + if len(writes) == len(testCounterMetrics)+len(testGaugeMetrics) { + break + } + time.Sleep(100 * time.Millisecond) + } + + // Verify writes + logger.Infof("verify test metrics") + writes := testDownsampler.storage.Writes() + for _, metric := range testCounterMetrics { + write := mustFindWrite(t, writes, metric.tags["__name__"]) + assert.Equal(t, metric.tags, map[string]string(write.Tags)) + require.Equal(t, 1, len(write.Datapoints)) + assert.Equal(t, float64(metric.expected), write.Datapoints[0].Value) + } + for _, metric := range testGaugeMetrics { + write := mustFindWrite(t, writes, metric.tags["__name__"]) + assert.Equal(t, metric.tags, map[string]string(write.Tags)) + require.Equal(t, 1, len(write.Datapoints)) + assert.Equal(t, float64(metric.expected), write.Datapoints[0].Value) + } +} + +type testDownsampler struct { + opts DownsamplerOptions + downsampler Downsampler + matcher matcher.Matcher + storage mock.Storage + rulesStore store.Store + instrumentOpts instrument.Options +} + +type testDownsamplerOptions struct { + clockOpts clock.Options + instrumentOpts instrument.Options +} + +func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampler { + storage := mock.NewMockStorage() + rulesKVStore := mem.NewStore() + + clockOpts := clock.NewOptions() + if opts.clockOpts != nil { + clockOpts = opts.clockOpts + } + + instrumentOpts := instrument.NewOptions() + if opts.instrumentOpts != nil { + instrumentOpts = opts.instrumentOpts + } + + matcherOpts := matcher.NewOptions() + + // Initialize the namespaces + _, err := rulesKVStore.Set(matcherOpts.NamespacesKey(), &rulepb.Namespaces{}) + require.NoError(t, err) + + rulesetKeyFmt := matcherOpts.RuleSetKeyFn()([]byte("%s")) + rulesStorageOpts := ruleskv.NewStoreOptions(matcherOpts.NamespacesKey(), + rulesetKeyFmt, nil) + rulesStorage := ruleskv.NewStore(rulesKVStore, rulesStorageOpts) + + storeOpts := r2kv.NewStoreOptions(). + SetRuleUpdatePropagationDelay(0) + rulesStore := r2kv.NewStore(rulesStorage, storeOpts) + + tagEncoderOptions := serialize.NewTagEncoderOptions() + tagDecoderOptions := serialize.NewTagDecoderOptions() + tagEncoderPoolOptions := pool.NewObjectPoolOptions(). + SetInstrumentOptions(instrumentOpts. + SetMetricsScope(instrumentOpts.MetricsScope(). + SubScope("tag-encoder-pool"))) + tagDecoderPoolOptions := pool.NewObjectPoolOptions(). + SetInstrumentOptions(instrumentOpts. + SetMetricsScope(instrumentOpts.MetricsScope(). + SubScope("tag-decoder-pool"))) + + instance, err := NewDownsampler(DownsamplerOptions{ + Storage: storage, + RulesKVStore: rulesKVStore, + ClockOptions: clockOpts, + InstrumentOptions: instrumentOpts, + TagEncoderOptions: tagEncoderOptions, + TagDecoderOptions: tagDecoderOptions, + TagEncoderPoolOptions: tagEncoderPoolOptions, + TagDecoderPoolOptions: tagDecoderPoolOptions, + }) + require.NoError(t, err) + + downcast, ok := instance.(*downsampler) + require.True(t, ok) + + return testDownsampler{ + opts: downcast.opts, + downsampler: instance, + matcher: downcast.agg.matcher, + storage: storage, + rulesStore: rulesStore, + instrumentOpts: instrumentOpts, + } +} + +func newTestID(t *testing.T, tags map[string]string) id.ID { + tagEncoderPool := serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(), + pool.NewObjectPoolOptions().SetSize(1)) + tagEncoderPool.Init() + + tagsIter := newTags() + for name, value := range tags { + tagsIter.append(name, value) + } + + tagEncoder := tagEncoderPool.Get() + err := tagEncoder.Encode(tagsIter) + require.NoError(t, err) + + data, ok := tagEncoder.Data() + require.True(t, ok) + + tagDecoderPool := serialize.NewTagDecoderPool(serialize.NewTagDecoderOptions(), + pool.NewObjectPoolOptions().SetSize(1)) + tagDecoderPool.Init() + + tagDecoder := tagDecoderPool.Get() + + iter := newEncodedTagsIterator(tagDecoder, nil) + iter.Reset(data.Bytes()) + return iter +} + +func mustFindWrite(t *testing.T, writes []*storage.WriteQuery, name string) *storage.WriteQuery { + var write *storage.WriteQuery + for _, w := range writes { + if w.Tags["__name__"] == name { + write = w + break + } + } + require.NotNil(t, write) + return write +} diff --git a/src/cmd/services/m3coordinator/downsample/flush_handler.go b/src/cmd/services/m3coordinator/downsample/flush_handler.go new file mode 100644 index 0000000000..90e7e17ff2 --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/flush_handler.go @@ -0,0 +1,171 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package downsample + +import ( + "context" + "sync" + "time" + + "github.com/m3db/m3aggregator/aggregator/handler" + "github.com/m3db/m3aggregator/aggregator/handler/writer" + "github.com/m3db/m3db/src/query/models" + "github.com/m3db/m3db/src/query/storage" + "github.com/m3db/m3db/src/query/ts" + "github.com/m3db/m3metrics/metric/aggregated" + "github.com/m3db/m3x/instrument" + xsync "github.com/m3db/m3x/sync" + + "github.com/uber-go/tally" +) + +const ( + aggregationSuffixTag = "agg" +) + +type downsamplerFlushHandler struct { + sync.RWMutex + storage storage.Storage + encodedTagsIteratorPool *encodedTagsIteratorPool + workerPool xsync.WorkerPool + instrumentOpts instrument.Options + metrics downsamplerFlushHandlerMetrics +} + +type downsamplerFlushHandlerMetrics struct { + flushSuccess tally.Counter + flushErrors tally.Counter +} + +func newDownsamplerFlushHandlerMetrics( + scope tally.Scope, +) downsamplerFlushHandlerMetrics { + return downsamplerFlushHandlerMetrics{ + flushSuccess: scope.Counter("flush-success"), + flushErrors: scope.Counter("flush-errors"), + } +} + +func newDownsamplerFlushHandler( + storage storage.Storage, + encodedTagsIteratorPool *encodedTagsIteratorPool, + workerPool xsync.WorkerPool, + instrumentOpts instrument.Options, +) handler.Handler { + scope := instrumentOpts.MetricsScope().SubScope("downsampler-flush-handler") + return &downsamplerFlushHandler{ + storage: storage, + encodedTagsIteratorPool: encodedTagsIteratorPool, + workerPool: workerPool, + instrumentOpts: instrumentOpts, + metrics: newDownsamplerFlushHandlerMetrics(scope), + } +} + +func (h *downsamplerFlushHandler) NewWriter( + scope tally.Scope, +) (writer.Writer, error) { + return &downsamplerFlushHandlerWriter{ + ctx: context.Background(), + handler: h, + }, nil +} + +func (h *downsamplerFlushHandler) Close() { +} + +type downsamplerFlushHandlerWriter struct { + wg sync.WaitGroup + ctx context.Context + handler *downsamplerFlushHandler +} + +func (w *downsamplerFlushHandlerWriter) Write( + mp aggregated.ChunkedMetricWithStoragePolicy, +) error { + w.wg.Add(1) + w.handler.workerPool.Go(func() { + defer w.wg.Done() + + logger := w.handler.instrumentOpts.Logger() + + iter := w.handler.encodedTagsIteratorPool.Get() + iter.Reset(mp.ChunkedID.Data) + + expected := iter.NumTags() + if len(mp.ChunkedID.Suffix) != 0 { + expected++ + } + + // Add extra tag since we may need to add an aggregation suffix tag + tags := make(models.Tags, expected+1) + for iter.Next() { + name, value := iter.Current() + tags[string(name)] = string(value) + } + if len(mp.ChunkedID.Suffix) != 0 { + tags[aggregationSuffixTag] = string(mp.ChunkedID.Suffix) + } + + err := iter.Err() + iter.Close() + if err != nil { + logger.Errorf("downsampler flush error preparing write: %v", err) + w.handler.metrics.flushErrors.Inc(1) + return + } + + err = w.handler.storage.Write(w.ctx, &storage.WriteQuery{ + Tags: tags, + Datapoints: ts.Datapoints{ts.Datapoint{ + Timestamp: time.Unix(0, mp.TimeNanos), + Value: mp.Value, + }}, + Unit: mp.StoragePolicy.Resolution().Precision, + Attributes: storage.Attributes{ + MetricsType: storage.AggregatedMetricsType, + Retention: mp.StoragePolicy.Retention().Duration(), + Resolution: mp.StoragePolicy.Resolution().Window, + }, + }) + if err != nil { + logger.Errorf("downsampler flush error failed write: %v", err) + w.handler.metrics.flushErrors.Inc(1) + return + } + + w.handler.metrics.flushSuccess.Inc(1) + }) + + return nil +} + +func (w *downsamplerFlushHandlerWriter) Flush() error { + // NB(r): This is a just simply waiting for inflight requests + // to complete since this flush handler isn't connection based. + w.wg.Wait() + return nil +} + +func (w *downsamplerFlushHandlerWriter) Close() error { + // NB(r): This is a no-op since this flush handler isn't connection based. + return nil +} diff --git a/src/cmd/services/m3coordinator/downsample/id_pool_types.go b/src/cmd/services/m3coordinator/downsample/id_pool_types.go new file mode 100644 index 0000000000..8e4b5ba468 --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/id_pool_types.go @@ -0,0 +1,342 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package downsample + +import ( + "bytes" + "errors" + "fmt" + + "github.com/m3db/m3db/src/dbnode/serialize" + "github.com/m3db/m3metrics/metric/id" + "github.com/m3db/m3x/checked" + "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/pool" + + "github.com/prometheus/common/model" +) + +var ( + defaultMetricNameTagName = []byte(model.MetricNameLabel) + rollupTagName = []byte("m3_rollup") + rollupTagValue = []byte("true") + rollupTag = ident.Tag{ + Name: ident.BytesID(rollupTagName), + Value: ident.BytesID(rollupTagValue), + } + + errNoMetricNameTag = errors.New("no metric name tag found") +) + +type encodedTagsIterator interface { + id.ID + id.SortedTagIterator + NumTags() int +} + +type encodedTagsIter struct { + tagDecoder serialize.TagDecoder + bytes checked.Bytes + pool *encodedTagsIteratorPool +} + +func newEncodedTagsIterator( + tagDecoder serialize.TagDecoder, + pool *encodedTagsIteratorPool, +) encodedTagsIterator { + return &encodedTagsIter{ + tagDecoder: tagDecoder, + bytes: checked.NewBytes(nil, nil), + pool: pool, + } +} + +// Reset resets the iterator. +func (it *encodedTagsIter) Reset(sortedTagPairs []byte) { + it.bytes.IncRef() + it.bytes.Reset(sortedTagPairs) + it.tagDecoder.Reset(it.bytes) +} + +// Bytes returns the underlying bytes. +func (it *encodedTagsIter) Bytes() []byte { + return it.bytes.Bytes() +} + +func (it *encodedTagsIter) NumTags() int { + return it.tagDecoder.Len() +} + +// TagValue returns the value for a tag value. +func (it *encodedTagsIter) TagValue(tagName []byte) ([]byte, bool) { + iter := it.tagDecoder.Duplicate() + defer iter.Close() + + for iter.Next() { + tag := iter.Current() + if bytes.Equal(tagName, tag.Name.Bytes()) { + return tag.Value.Bytes(), true + } + } + return nil, false +} + +// Next returns true if there are more tag names and values. +func (it *encodedTagsIter) Next() bool { + return it.tagDecoder.Next() +} + +// Current returns the current tag name and value. +func (it *encodedTagsIter) Current() ([]byte, []byte) { + tag := it.tagDecoder.Current() + return tag.Name.Bytes(), tag.Value.Bytes() +} + +// Err returns any errors encountered. +func (it *encodedTagsIter) Err() error { + return it.tagDecoder.Err() +} + +// Close closes the iterator. +func (it *encodedTagsIter) Close() { + it.bytes.Reset(nil) + it.bytes.DecRef() + it.tagDecoder.Reset(it.bytes) + + if it.pool != nil { + it.pool.Put(it) + } +} + +type encodedTagsIteratorPool struct { + tagDecoderPool serialize.TagDecoderPool + pool pool.ObjectPool +} + +func newEncodedTagsIteratorPool( + tagDecoderPool serialize.TagDecoderPool, + opts pool.ObjectPoolOptions, +) *encodedTagsIteratorPool { + return &encodedTagsIteratorPool{ + tagDecoderPool: tagDecoderPool, + pool: pool.NewObjectPool(opts), + } +} + +func (p *encodedTagsIteratorPool) Init() { + p.pool.Init(func() interface{} { + return newEncodedTagsIterator(p.tagDecoderPool.Get(), p) + }) +} + +func (p *encodedTagsIteratorPool) Get() encodedTagsIterator { + return p.pool.Get().(*encodedTagsIter) +} + +func (p *encodedTagsIteratorPool) Put(v encodedTagsIterator) { + p.pool.Put(v) +} + +func isRollupID( + sortedTagPairs []byte, + iteratorPool *encodedTagsIteratorPool, +) bool { + iter := iteratorPool.Get() + iter.Reset(sortedTagPairs) + + tagValue, ok := iter.TagValue(rollupTagName) + isRollupID := ok && bytes.Equal(tagValue, rollupTagValue) + iter.Close() + + return isRollupID +} + +// rollupIDProvider is a constructor for rollup IDs, it can be pooled to avoid +// requiring allocation every time we need to construct a rollup ID. +// When used as a ident.TagIterator for the call to serialize.TagEncoder Encode +// method, it will return the rollup tag in the correct alphabetical order +// when progressing through the existing tags. +type rollupIDProvider struct { + index int + tagPairs []id.TagPair + rollupTagIndex int + + tagEncoder serialize.TagEncoder + pool *rollupIDProviderPool +} + +func newRollupIDProvider( + tagEncoder serialize.TagEncoder, + pool *rollupIDProviderPool, +) *rollupIDProvider { + return &rollupIDProvider{ + tagEncoder: tagEncoder, + pool: pool, + } +} + +func (p *rollupIDProvider) provide( + tagPairs []id.TagPair, +) ([]byte, error) { + p.reset(tagPairs) + p.tagEncoder.Reset() + if err := p.tagEncoder.Encode(p); err != nil { + return nil, err + } + data, ok := p.tagEncoder.Data() + if !ok { + return nil, fmt.Errorf("unable to access encoded tags: ok=%v", ok) + } + // Need to return a copy + id := append([]byte(nil), data.Bytes()...) + // Reset after computing + p.reset(nil) + return id, nil +} + +func (p *rollupIDProvider) reset( + tagPairs []id.TagPair, +) { + p.index = -1 + p.tagPairs = tagPairs + p.rollupTagIndex = -1 + for idx, pair := range tagPairs { + if bytes.Compare(rollupTagName, pair.Name) < 0 { + p.rollupTagIndex = idx + break + } + } + if p.rollupTagIndex == -1 { + p.rollupTagIndex = len(p.tagPairs) + } +} + +func (p *rollupIDProvider) finalize() { + if p.pool != nil { + p.pool.Put(p) + } +} + +func (p *rollupIDProvider) Next() bool { + p.index++ + return p.index < p.Len() +} + +func (p *rollupIDProvider) CurrentIndex() int { + if p.index >= 0 { + return p.index + } + return 0 +} + +func (p *rollupIDProvider) Current() ident.Tag { + idx := p.index + if idx == p.rollupTagIndex { + return rollupTag + } + + if idx > p.rollupTagIndex { + // Effective index is subtracted by 1 + idx-- + } + + return ident.Tag{ + Name: ident.BytesID(p.tagPairs[idx].Name), + Value: ident.BytesID(p.tagPairs[idx].Value), + } +} + +func (p *rollupIDProvider) Err() error { + return nil +} + +func (p *rollupIDProvider) Close() { + // No-op +} + +func (p *rollupIDProvider) Len() int { + return len(p.tagPairs) + 1 +} + +func (p *rollupIDProvider) Remaining() int { + return p.Len() - p.index - 1 +} + +func (p *rollupIDProvider) Duplicate() ident.TagIterator { + duplicate := p.pool.Get() + duplicate.reset(p.tagPairs) + return duplicate +} + +type rollupIDProviderPool struct { + tagEncoderPool serialize.TagEncoderPool + pool pool.ObjectPool +} + +func newRollupIDProviderPool( + tagEncoderPool serialize.TagEncoderPool, + opts pool.ObjectPoolOptions, +) *rollupIDProviderPool { + return &rollupIDProviderPool{ + tagEncoderPool: tagEncoderPool, + pool: pool.NewObjectPool(opts), + } +} + +func (p *rollupIDProviderPool) Init() { + p.pool.Init(func() interface{} { + return newRollupIDProvider(p.tagEncoderPool.Get(), p) + }) +} + +func (p *rollupIDProviderPool) Get() *rollupIDProvider { + return p.pool.Get().(*rollupIDProvider) +} + +func (p *rollupIDProviderPool) Put(v *rollupIDProvider) { + p.pool.Put(v) +} + +func resolveEncodedTagsNameTag( + id []byte, + iterPool *encodedTagsIteratorPool, + nameTag []byte, +) ([]byte, error) { + // ID is always the encoded tags for downsampling IDs + iter := iterPool.Get() + iter.Reset(id) + defer iter.Close() + + value, ok := iter.TagValue(nameTag) + if !ok { + // No name was found in encoded tags + return nil, errNoMetricNameTag + } + + idx := bytes.Index(id, value) + if idx == -1 { + return nil, fmt.Errorf( + "resolved metric name tag value not found in ID: %v", value) + } + + // Return original reference to avoid needing to return a copy + return id[idx : idx+len(value)], nil +} diff --git a/src/cmd/services/m3coordinator/downsample/leader_local.go b/src/cmd/services/m3coordinator/downsample/leader_local.go new file mode 100644 index 0000000000..ec6b95e6d6 --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/leader_local.go @@ -0,0 +1,105 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package downsample + +import ( + "fmt" + "sync" + + "github.com/m3db/m3cluster/services" + "github.com/m3db/m3cluster/services/leader/campaign" +) + +// localLeaderService provides a mocked out local leader service so that +// we do not need to rely on using an etcd cluster just to elect a leader +// for aggregation in-process (which doesn't need leader election at all, +// however it is simpler to keep the current aggregator code structured the +// way it is which is most natural for accommodating the distributed and +// non-in-process aggregation use case). +type localLeaderService struct { + sync.Mutex + id services.ServiceID + elections map[string]chan campaign.Status + leaders map[string]string +} + +func newLocalLeaderService(id services.ServiceID) services.LeaderService { + l := &localLeaderService{id: id} + l.reset() + return l +} + +func (l *localLeaderService) reset() { + l.Lock() + defer l.Unlock() + l.elections = make(map[string]chan campaign.Status) + l.leaders = make(map[string]string) +} + +func (l *localLeaderService) Campaign( + electionID string, + opts services.CampaignOptions, +) (<-chan campaign.Status, error) { + l.Lock() + defer l.Unlock() + + campaignCh, ok := l.elections[electionID] + if !ok { + campaignCh = make(chan campaign.Status, 1) + campaignCh <- campaign.Status{State: campaign.Leader} + l.elections[electionID] = campaignCh + l.leaders[electionID] = l.id.String() + } + return campaignCh, nil +} + +func (l *localLeaderService) push(id string, status campaign.Status) error { + l.Lock() + defer l.Unlock() + + campaignCh, ok := l.elections[id] + if !ok { + return fmt.Errorf("no such campaign: %s", id) + } + + campaignCh <- status + return nil +} + +func (l *localLeaderService) Resign(electionID string) error { + return l.push(electionID, campaign.Status{State: campaign.Follower}) +} + +func (l *localLeaderService) Leader(electionID string) (string, error) { + l.Lock() + defer l.Unlock() + + leader, ok := l.leaders[electionID] + if !ok { + return "", fmt.Errorf("no such campaign: %s", electionID) + } + return leader, nil +} + +func (l *localLeaderService) Close() error { + l.reset() + return nil +} diff --git a/src/cmd/services/m3coordinator/downsample/leader_local_test.go b/src/cmd/services/m3coordinator/downsample/leader_local_test.go new file mode 100644 index 0000000000..8d87ed9857 --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/leader_local_test.go @@ -0,0 +1,58 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package downsample + +import ( + "testing" + + "github.com/m3db/m3cluster/services" + "github.com/m3db/m3cluster/services/leader/campaign" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLeaderLocalService(t *testing.T) { + id := services.NewServiceID().SetName("foo") + electionID := "bar" + leaderSvc := newLocalLeaderService(id) + + ch, err := leaderSvc.Campaign(electionID, nil) + require.NoError(t, err) + + status := <-ch + assert.NoError(t, status.Err) + assert.Equal(t, campaign.Leader, status.State) + + err = leaderSvc.Resign(electionID) + require.NoError(t, err) + + status = <-ch + assert.NoError(t, status.Err) + assert.Equal(t, campaign.Follower, status.State) + + leader, err := leaderSvc.Leader(electionID) + require.NoError(t, err) + assert.Equal(t, id.String(), leader) + + err = leaderSvc.Close() + require.NoError(t, err) +} diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go new file mode 100644 index 0000000000..bad51a57ee --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go @@ -0,0 +1,112 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package downsample + +import ( + "fmt" + "sort" + "time" + + "github.com/m3db/m3aggregator/aggregator" + "github.com/m3db/m3db/src/dbnode/serialize" + "github.com/m3db/m3metrics/matcher" + "github.com/m3db/m3x/clock" +) + +type metricsAppender struct { + metricsAppenderOptions + + tags *tags + multiSamplesAppender *multiSamplesAppender +} + +type metricsAppenderOptions struct { + agg aggregator.Aggregator + clockOpts clock.Options + tagEncoder serialize.TagEncoder + matcher matcher.Matcher + encodedTagsIteratorPool *encodedTagsIteratorPool +} + +func (a *metricsAppender) AddTag(name, value string) { + a.tags.append(name, value) +} + +func (a *metricsAppender) SamplesAppender() (SamplesAppender, error) { + // Sort tags + sort.Sort(a.tags) + + // Encode tags and compute a temporary (unowned) ID + a.tagEncoder.Reset() + if err := a.tagEncoder.Encode(a.tags); err != nil { + return nil, err + } + data, ok := a.tagEncoder.Data() + if !ok { + return nil, fmt.Errorf("unable to encode tags: names=%v, values=%v", + a.tags.names, a.tags.values) + } + + a.multiSamplesAppender.reset() + unownedID := data.Bytes() + + // Match policies and rollups and build samples appender + id := a.encodedTagsIteratorPool.Get() + id.Reset(unownedID) + now := time.Now() + nowNanos := now.UnixNano() + fromNanos := nowNanos + toNanos := nowNanos + 1 + matchResult := a.matcher.ForwardMatch(id, fromNanos, toNanos) + id.Close() + + stagedMetadatas := matchResult.ForExistingIDAt(nowNanos) + if !stagedMetadatas.IsDefault() && len(stagedMetadatas) != 0 { + // Only sample if going to actually aggregate + a.multiSamplesAppender.addSamplesAppender(samplesAppender{ + agg: a.agg, + unownedID: unownedID, + stagedMetadatas: stagedMetadatas, + }) + } + + numRollups := matchResult.NumNewRollupIDs() + for i := 0; i < numRollups; i++ { + rollup := matchResult.ForNewRollupIDsAt(i, nowNanos) + a.multiSamplesAppender.addSamplesAppender(samplesAppender{ + agg: a.agg, + unownedID: rollup.ID, + stagedMetadatas: rollup.Metadatas, + }) + } + + return a.multiSamplesAppender, nil +} + +func (a *metricsAppender) Reset() { + a.tags.names = a.tags.names[:0] + a.tags.values = a.tags.values[:0] +} + +func (a *metricsAppender) Finalize() { + a.tagEncoder.Finalize() + a.tagEncoder = nil +} diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go new file mode 100644 index 0000000000..9ab0b434ad --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -0,0 +1,415 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package downsample + +import ( + "errors" + "fmt" + "reflect" + "runtime" + "time" + + "github.com/m3db/m3aggregator/aggregator" + "github.com/m3db/m3aggregator/aggregator/handler" + "github.com/m3db/m3aggregator/client" + "github.com/m3db/m3cluster/kv" + "github.com/m3db/m3cluster/kv/mem" + "github.com/m3db/m3cluster/placement" + placementservice "github.com/m3db/m3cluster/placement/service" + placementstorage "github.com/m3db/m3cluster/placement/storage" + "github.com/m3db/m3cluster/services" + "github.com/m3db/m3db/src/dbnode/serialize" + "github.com/m3db/m3db/src/query/storage" + "github.com/m3db/m3metrics/aggregation" + "github.com/m3db/m3metrics/filters" + "github.com/m3db/m3metrics/matcher" + "github.com/m3db/m3metrics/matcher/cache" + "github.com/m3db/m3metrics/metric/id" + "github.com/m3db/m3metrics/rules" + "github.com/m3db/m3x/clock" + "github.com/m3db/m3x/instrument" + "github.com/m3db/m3x/pool" + xsync "github.com/m3db/m3x/sync" +) + +const ( + instanceID = "downsampler_local" + placementKVKey = "/placement" + replicationFactor = 1 + defaultStorageFlushConcurrency = 20000 + defaultOpenTimeout = 10 * time.Second +) + +var ( + numShards = runtime.NumCPU() + + errNoStorage = errors.New("dynamic downsampling enabled with storage not set") + errNoRulesStore = errors.New("dynamic downsampling enabled with rules store not set") + errNoClockOptions = errors.New("dynamic downsampling enabled with clock options not set") + errNoInstrumentOptions = errors.New("dynamic downsampling enabled with instrument options not set") + errNoTagEncoderOptions = errors.New("dynamic downsampling enabled with tag encoder options not set") + errNoTagDecoderOptions = errors.New("dynamic downsampling enabled with tag decoder options not set") + errNoTagEncoderPoolOptions = errors.New("dynamic downsampling enabled with tag encoder pool options not set") + errNoTagDecoderPoolOptions = errors.New("dynamic downsampling enabled with tag decoder pool options not set") +) + +// DownsamplerOptions is a set of required downsampler options. +type DownsamplerOptions struct { + Storage storage.Storage + StorageFlushConcurrency int + RulesKVStore kv.Store + NameTag string + ClockOptions clock.Options + InstrumentOptions instrument.Options + TagEncoderOptions serialize.TagEncoderOptions + TagDecoderOptions serialize.TagDecoderOptions + TagEncoderPoolOptions pool.ObjectPoolOptions + TagDecoderPoolOptions pool.ObjectPoolOptions + OpenTimeout time.Duration +} + +// Validate validates the dynamic downsampling options. +func (o DownsamplerOptions) validate() error { + if o.Storage == nil { + return errNoStorage + } + if o.RulesKVStore == nil { + return errNoRulesStore + } + if o.ClockOptions == nil { + return errNoClockOptions + } + if o.InstrumentOptions == nil { + return errNoInstrumentOptions + } + if o.TagEncoderOptions == nil { + return errNoTagEncoderOptions + } + if o.TagDecoderOptions == nil { + return errNoTagDecoderOptions + } + if o.TagEncoderPoolOptions == nil { + return errNoTagEncoderPoolOptions + } + if o.TagDecoderPoolOptions == nil { + return errNoTagDecoderPoolOptions + } + return nil +} + +type agg struct { + aggregator aggregator.Aggregator + clockOpts clock.Options + matcher matcher.Matcher + pools aggPools +} + +func (o DownsamplerOptions) newAggregator() (agg, error) { + // Validate options first. + if err := o.validate(); err != nil { + return agg{}, err + } + + var ( + storageFlushConcurrency = defaultStorageFlushConcurrency + rulesStore = o.RulesKVStore + clockOpts = o.ClockOptions + instrumentOpts = o.InstrumentOptions + openTimeout = defaultOpenTimeout + ) + if o.StorageFlushConcurrency > 0 { + storageFlushConcurrency = o.StorageFlushConcurrency + } + if o.OpenTimeout > 0 { + openTimeout = o.OpenTimeout + } + + pools := o.newAggregatorPools() + ruleSetOpts := o.newAggregatorRulesOptions(pools) + + // Use default aggregation types, in future we can provide more configurability + var defaultAggregationTypes aggregation.TypesConfiguration + aggTypeOpts, err := defaultAggregationTypes.NewOptions(instrumentOpts) + if err != nil { + return agg{}, err + } + + matcher, err := o.newAggregatorMatcher(clockOpts, instrumentOpts, + ruleSetOpts, rulesStore) + if err != nil { + return agg{}, err + } + + aggClient := client.NewClient(client.NewOptions()) + adminAggClient, ok := aggClient.(client.AdminClient) + if !ok { + return agg{}, fmt.Errorf( + "unable to cast %v to AdminClient", reflect.TypeOf(aggClient)) + } + + serviceID := services.NewServiceID(). + SetEnvironment("production"). + SetName("downsampler"). + SetZone("embedded") + + localKVStore := mem.NewStore() + + placementManager, err := o.newAggregatorPlacementManager(serviceID, + localKVStore) + if err != nil { + return agg{}, err + } + + flushTimesManager := aggregator.NewFlushTimesManager( + aggregator.NewFlushTimesManagerOptions(). + SetFlushTimesStore(localKVStore)) + + electionManager, err := o.newAggregatorElectionManager(serviceID, + placementManager, flushTimesManager) + if err != nil { + return agg{}, err + } + + flushManager, flushHandler := o.newAggregatorFlushManagerAndHandler(serviceID, + placementManager, flushTimesManager, electionManager, instrumentOpts, + storageFlushConcurrency, pools) + + // Finally construct all options + aggregatorOpts := aggregator.NewOptions(). + SetClockOptions(clockOpts). + SetInstrumentOptions(instrumentOpts). + SetAggregationTypesOptions(aggTypeOpts). + SetMetricPrefix(nil). + SetCounterPrefix(nil). + SetGaugePrefix(nil). + SetTimerPrefix(nil). + SetAdminClient(adminAggClient). + SetPlacementManager(placementManager). + SetFlushTimesManager(flushTimesManager). + SetElectionManager(electionManager). + SetFlushManager(flushManager). + SetFlushHandler(flushHandler) + + aggregatorInstance := aggregator.NewAggregator(aggregatorOpts) + if err := aggregatorInstance.Open(); err != nil { + return agg{}, err + } + + // Wait until the aggregator becomes leader so we don't miss datapoints + deadline := time.Now().Add(openTimeout) + for { + if !time.Now().Before(deadline) { + return agg{}, fmt.Errorf("aggregator not promoted to leader after: %s", + openTimeout.String()) + } + if electionManager.ElectionState() == aggregator.LeaderState { + break + } + time.Sleep(10 * time.Millisecond) + } + + return agg{ + aggregator: aggregatorInstance, + matcher: matcher, + pools: pools, + }, nil +} + +type aggPools struct { + tagEncoderPool serialize.TagEncoderPool + tagDecoderPool serialize.TagDecoderPool + encodedTagsIteratorPool *encodedTagsIteratorPool +} + +func (o DownsamplerOptions) newAggregatorPools() aggPools { + tagEncoderPool := serialize.NewTagEncoderPool(o.TagEncoderOptions, + o.TagEncoderPoolOptions) + tagEncoderPool.Init() + + tagDecoderPool := serialize.NewTagDecoderPool(o.TagDecoderOptions, + o.TagDecoderPoolOptions) + tagDecoderPool.Init() + + encodedTagsIteratorPool := newEncodedTagsIteratorPool(tagDecoderPool, + o.TagDecoderPoolOptions) + encodedTagsIteratorPool.Init() + + return aggPools{ + tagEncoderPool: tagEncoderPool, + tagDecoderPool: tagDecoderPool, + encodedTagsIteratorPool: encodedTagsIteratorPool, + } +} + +func (o DownsamplerOptions) newAggregatorRulesOptions(pools aggPools) rules.Options { + nameTag := defaultMetricNameTagName + if o.NameTag != "" { + nameTag = []byte(o.NameTag) + } + + sortedTagIteratorFn := func(tagPairs []byte) id.SortedTagIterator { + it := pools.encodedTagsIteratorPool.Get() + it.Reset(tagPairs) + return it + } + + tagsFilterOpts := filters.TagsFilterOptions{ + NameTagKey: nameTag, + NameAndTagsFn: func(id []byte) ([]byte, []byte, error) { + name, err := resolveEncodedTagsNameTag(id, pools.encodedTagsIteratorPool, + nameTag) + if err != nil { + return nil, nil, err + } + // ID is always the encoded tags for IDs in the downsampler + tags := id + return name, tags, nil + }, + SortedTagIteratorFn: sortedTagIteratorFn, + } + + isRollupIDFn := func(name []byte, tags []byte) bool { + return isRollupID(tags, pools.encodedTagsIteratorPool) + } + + newRollupIDProviderPool := newRollupIDProviderPool(pools.tagEncoderPool, + o.TagEncoderPoolOptions) + newRollupIDProviderPool.Init() + + newRollupIDFn := func(name []byte, tagPairs []id.TagPair) []byte { + rollupIDProvider := newRollupIDProviderPool.Get() + id, err := rollupIDProvider.provide(tagPairs) + if err != nil { + panic(err) // Encoding should never fail + } + rollupIDProvider.finalize() + return id + } + + return rules.NewOptions(). + SetTagsFilterOptions(tagsFilterOpts). + SetNewRollupIDFn(newRollupIDFn). + SetIsRollupIDFn(isRollupIDFn) +} + +func (o DownsamplerOptions) newAggregatorMatcher( + clockOpts clock.Options, + instrumentOpts instrument.Options, + ruleSetOpts rules.Options, + rulesStore kv.Store, +) (matcher.Matcher, error) { + opts := matcher.NewOptions(). + SetClockOptions(clockOpts). + SetInstrumentOptions(instrumentOpts). + SetRuleSetOptions(ruleSetOpts). + SetKVStore(rulesStore) + + cacheOpts := cache.NewOptions(). + SetClockOptions(clockOpts). + SetInstrumentOptions(instrumentOpts. + SetMetricsScope(instrumentOpts.MetricsScope().SubScope("matcher-cache"))) + + cache := cache.NewCache(cacheOpts) + + return matcher.NewMatcher(cache, opts) +} + +func (o DownsamplerOptions) newAggregatorPlacementManager( + serviceID services.ServiceID, + localKVStore kv.Store, +) (aggregator.PlacementManager, error) { + instance := placement.NewInstance(). + SetID(instanceID). + SetWeight(1). + SetEndpoint(instanceID) + + placementOpts := placement.NewOptions(). + SetIsStaged(true). + SetShardStateMode(placement.StableShardStateOnly) + + placementSvc := placementservice.NewPlacementService( + placementstorage.NewPlacementStorage(localKVStore, placementKVKey, placementOpts), + placementOpts) + + _, err := placementSvc.BuildInitialPlacement([]placement.Instance{instance}, numShards, + replicationFactor) + if err != nil { + return nil, err + } + + placementWatcherOpts := placement.NewStagedPlacementWatcherOptions(). + SetStagedPlacementKey(placementKVKey). + SetStagedPlacementStore(localKVStore) + placementWatcher := placement.NewStagedPlacementWatcher(placementWatcherOpts) + placementManagerOpts := aggregator.NewPlacementManagerOptions(). + SetInstanceID(instanceID). + SetStagedPlacementWatcher(placementWatcher) + + return aggregator.NewPlacementManager(placementManagerOpts), nil +} + +func (o DownsamplerOptions) newAggregatorElectionManager( + serviceID services.ServiceID, + placementManager aggregator.PlacementManager, + flushTimesManager aggregator.FlushTimesManager, +) (aggregator.ElectionManager, error) { + leaderValue := instanceID + campaignOpts, err := services.NewCampaignOptions() + if err != nil { + return nil, err + } + + campaignOpts = campaignOpts.SetLeaderValue(leaderValue) + + leaderService := newLocalLeaderService(serviceID) + + electionManagerOpts := aggregator.NewElectionManagerOptions(). + SetCampaignOptions(campaignOpts). + SetLeaderService(leaderService). + SetPlacementManager(placementManager). + SetFlushTimesManager(flushTimesManager) + + return aggregator.NewElectionManager(electionManagerOpts), nil +} + +func (o DownsamplerOptions) newAggregatorFlushManagerAndHandler( + serviceID services.ServiceID, + placementManager aggregator.PlacementManager, + flushTimesManager aggregator.FlushTimesManager, + electionManager aggregator.ElectionManager, + instrumentOpts instrument.Options, + storageFlushConcurrency int, + pools aggPools, +) (aggregator.FlushManager, handler.Handler) { + flushManagerOpts := aggregator.NewFlushManagerOptions(). + SetPlacementManager(placementManager). + SetFlushTimesManager(flushTimesManager). + SetElectionManager(electionManager). + SetJitterEnabled(false) + flushManager := aggregator.NewFlushManager(flushManagerOpts) + + flushWorkers := xsync.NewWorkerPool(storageFlushConcurrency) + flushWorkers.Init() + handler := newDownsamplerFlushHandler(o.Storage, pools.encodedTagsIteratorPool, + flushWorkers, instrumentOpts) + + return flushManager, handler +} diff --git a/src/cmd/services/m3coordinator/downsample/samples_appender.go b/src/cmd/services/m3coordinator/downsample/samples_appender.go new file mode 100644 index 0000000000..cad72fc2ff --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/samples_appender.go @@ -0,0 +1,91 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package downsample + +import ( + "github.com/m3db/m3aggregator/aggregator" + "github.com/m3db/m3metrics/metadata" + "github.com/m3db/m3metrics/metric" + "github.com/m3db/m3metrics/metric/unaggregated" + xerrors "github.com/m3db/m3x/errors" +) + +type samplesAppender struct { + agg aggregator.Aggregator + unownedID []byte + stagedMetadatas metadata.StagedMetadatas +} + +func (a samplesAppender) AppendCounterSample(value int64) error { + sample := unaggregated.MetricUnion{ + Type: metric.CounterType, + ID: a.unownedID, + CounterVal: value, + } + return a.agg.AddUntimed(sample, a.stagedMetadatas) +} + +func (a samplesAppender) AppendGaugeSample(value float64) error { + sample := unaggregated.MetricUnion{ + Type: metric.GaugeType, + ID: a.unownedID, + GaugeVal: value, + } + return a.agg.AddUntimed(sample, a.stagedMetadatas) +} + +// Ensure multiSamplesAppender implements SamplesAppender +var _ SamplesAppender = (*multiSamplesAppender)(nil) + +type multiSamplesAppender struct { + appenders []samplesAppender +} + +func newMultiSamplesAppender() *multiSamplesAppender { + return &multiSamplesAppender{} +} + +func (a *multiSamplesAppender) reset() { + for i := range a.appenders { + a.appenders[i] = samplesAppender{} + } + a.appenders = a.appenders[:0] +} + +func (a *multiSamplesAppender) addSamplesAppender(v samplesAppender) { + a.appenders = append(a.appenders, v) +} + +func (a *multiSamplesAppender) AppendCounterSample(value int64) error { + var multiErr xerrors.MultiError + for _, appender := range a.appenders { + multiErr = multiErr.Add(appender.AppendCounterSample(value)) + } + return multiErr.FinalError() +} + +func (a *multiSamplesAppender) AppendGaugeSample(value float64) error { + var multiErr xerrors.MultiError + for _, appender := range a.appenders { + multiErr = multiErr.Add(appender.AppendGaugeSample(value)) + } + return multiErr.FinalError() +} diff --git a/src/cmd/services/m3coordinator/downsample/tags.go b/src/cmd/services/m3coordinator/downsample/tags.go new file mode 100644 index 0000000000..390e9ef0b5 --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/tags.go @@ -0,0 +1,114 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package downsample + +import ( + "sort" + + "github.com/m3db/m3x/ident" +) + +const ( + initAllocTagsSliceCapacity = 32 +) + +type tags struct { + names []string + values []string + idx int + nameBuf []byte + valueBuf []byte +} + +// Ensure tags implements TagIterator and sort Interface +var ( + _ ident.TagIterator = (*tags)(nil) + _ sort.Interface = (*tags)(nil) +) + +func newTags() *tags { + return &tags{ + names: make([]string, 0, initAllocTagsSliceCapacity), + values: make([]string, 0, initAllocTagsSliceCapacity), + idx: -1, + } +} + +func (t *tags) append(name, value string) { + t.names = append(t.names, name) + t.values = append(t.values, value) +} + +func (t *tags) Len() int { + return len(t.names) +} + +func (t *tags) Swap(i, j int) { + t.names[i], t.names[j] = t.names[j], t.names[i] + t.values[i], t.values[j] = t.values[j], t.values[i] +} + +func (t *tags) Less(i, j int) bool { + return t.names[i] < t.names[j] +} + +func (t *tags) Next() bool { + hasNext := t.idx+1 < len(t.names) + if hasNext { + t.idx++ + } + return hasNext +} + +func (t *tags) CurrentIndex() int { + if t.idx >= 0 { + return t.idx + } + return 0 +} + +func (t *tags) Current() ident.Tag { + t.nameBuf = append(t.nameBuf[:0], t.names[t.idx]...) + t.valueBuf = append(t.valueBuf[:0], t.values[t.idx]...) + return ident.Tag{ + Name: ident.BytesID(t.nameBuf), + Value: ident.BytesID(t.valueBuf), + } +} + +func (t *tags) Err() error { + return nil +} + +func (t *tags) Close() { + // No-op +} + +func (t *tags) Remaining() int { + if t.idx < 0 { + return t.Len() + } + return t.Len() - t.idx +} + +func (t *tags) Duplicate() ident.TagIterator { + return &tags{idx: -1, names: t.names, values: t.values} +} diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index a94db911a5..2ec29eba4a 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -40,24 +40,24 @@ db: logging: level: info file: /var/log/m3dbnode.log - + metrics: prometheus: handlerPath: /metrics sanitization: prometheus samplingRate: 1.0 extended: detailed - + listenAddress: 0.0.0.0:9000 clusterListenAddress: 0.0.0.0:9001 httpNodeListenAddress: 0.0.0.0:9002 httpClusterListenAddress: 0.0.0.0:9003 debugListenAddress: 0.0.0.0:9004 - + hostID: resolver: config value: host1 - + client: writeConsistencyLevel: majority readConsistencyLevel: unstrict_majority @@ -79,13 +79,13 @@ db: backgroundHealthCheckFailThrottleFactor: 0.5 hashing: seed: 42 - - + + gcPercentage: 100 - + writeNewSeriesLimitPerSecond: 1048576 writeNewSeriesBackoffDuration: 2ms - + bootstrap: bootstrappers: - filesystem @@ -93,7 +93,7 @@ db: - noop-all fs: numProcessorsPerCPU: 0.125 - + commitlog: flushMaxBytes: 524288 flushEvery: 1s @@ -102,7 +102,7 @@ db: size: 2097152 retentionPeriod: 24h blockSize: 10m - + fs: filePathPrefix: /var/lib/m3db writeBufferSize: 65536 @@ -111,7 +111,7 @@ db: seekReadBufferSize: 4096 throughputLimitMbps: 100.0 throughputCheckEvery: 128 - + repair: enabled: false interval: 2h @@ -119,7 +119,7 @@ db: jitter: 1h throttle: 2m checkInterval: 1m - + pooling: blockAllocSize: 16 type: simple @@ -245,7 +245,7 @@ db: size: 32768 lowWatermark: 0.01 highWatermark: 0.02 - + config: service: env: production @@ -546,6 +546,7 @@ db: tls: null m3sd: initTimeout: null + watchWithRevision: 0 static: null seedNodes: rootDir: /var/lib/etcd diff --git a/src/query/api/v1/handler/prometheus/native/read_test.go b/src/query/api/v1/handler/prometheus/native/read_test.go index b49c1c789f..fe0c17d104 100644 --- a/src/query/api/v1/handler/prometheus/native/read_test.go +++ b/src/query/api/v1/handler/prometheus/native/read_test.go @@ -41,7 +41,9 @@ func TestPromRead(t *testing.T) { values, bounds := test.GenerateValuesAndBounds(nil, nil) b := test.NewBlockFromValues(bounds, values) - mockStorage := mock.NewMockStorageWithBlocks([]block.Block{b}) + + mockStorage := mock.NewMockStorage() + mockStorage.SetFetchBlocksResult(block.Result{Blocks: []block.Block{b}}, nil) promRead := &PromReadHandler{engine: executor.NewEngine(mockStorage)} req, _ := http.NewRequest("GET", PromReadURL, nil) diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index ac602fd85b..0765140464 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -22,14 +22,17 @@ package remote import ( "context" + "errors" "net/http" + "sync" + "github.com/m3db/m3db/src/cmd/services/m3coordinator/downsample" "github.com/m3db/m3db/src/query/api/v1/handler" "github.com/m3db/m3db/src/query/api/v1/handler/prometheus" "github.com/m3db/m3db/src/query/generated/proto/prompb" "github.com/m3db/m3db/src/query/storage" - "github.com/m3db/m3db/src/query/util/execution" "github.com/m3db/m3db/src/query/util/logging" + xerrors "github.com/m3db/m3x/errors" "github.com/golang/protobuf/proto" "github.com/uber-go/tally" @@ -44,18 +47,31 @@ const ( PromWriteHTTPMethod = http.MethodPost ) +var ( + errNoStorageOrDownsampler = errors.New("no storage or downsampler set, requires at least one or both") +) + // PromWriteHandler represents a handler for prometheus write endpoint. type PromWriteHandler struct { store storage.Storage + downsampler downsample.Downsampler promWriteMetrics promWriteMetrics } // NewPromWriteHandler returns a new instance of handler. -func NewPromWriteHandler(store storage.Storage, scope tally.Scope) http.Handler { +func NewPromWriteHandler( + store storage.Storage, + downsampler downsample.Downsampler, + scope tally.Scope, +) (http.Handler, error) { + if store == nil && downsampler == nil { + return nil, errNoStorageOrDownsampler + } return &PromWriteHandler{ store: store, + downsampler: downsampler, promWriteMetrics: newPromWriteMetrics(scope), - } + }, nil } type promWriteMetrics struct { @@ -104,25 +120,104 @@ func (h *PromWriteHandler) parseRequest(r *http.Request) (*prompb.WriteRequest, } func (h *PromWriteHandler) write(ctx context.Context, r *prompb.WriteRequest) error { - requests := make([]execution.Request, len(r.Timeseries)) - for idx, t := range r.Timeseries { - requests[idx] = newLocalWriteRequest(storage.PromWriteTSToM3(t), h.store) + var ( + wg sync.WaitGroup + writeUnaggErr error + writeAggErr error + ) + if h.downsampler != nil { + // If writing downsampled aggregations, write them async + wg.Add(1) + go func() { + writeAggErr = h.writeAggregated(ctx, r) + wg.Done() + }() + } + + if h.store != nil { + // Write the unaggregated points out, don't spawn goroutine + // so we reduce number of goroutines just a fraction + writeUnaggErr = h.writeUnaggregated(ctx, r) } - return execution.ExecuteParallel(ctx, requests) -} -func (w *localWriteRequest) Process(ctx context.Context) error { - return w.store.Write(ctx, w.writeQuery) + if h.downsampler != nil { + // Wait for downsampling to finish if we wrote datapoints + // for aggregations + wg.Wait() + } + + var multiErr xerrors.MultiError + multiErr = multiErr.Add(writeUnaggErr) + multiErr = multiErr.Add(writeAggErr) + return multiErr.FinalError() } -type localWriteRequest struct { - store storage.Storage - writeQuery *storage.WriteQuery +func (h *PromWriteHandler) writeUnaggregated( + ctx context.Context, + r *prompb.WriteRequest, +) error { + var ( + wg sync.WaitGroup + errLock sync.Mutex + multiErr xerrors.MultiError + ) + for _, t := range r.Timeseries { + t := t // Capture for goroutine + + // TODO(r): Consider adding a worker pool to limit write + // request concurrency, instead of using the batch size + // of incoming request to determine concurrency (some level of control). + wg.Add(1) + go func() { + write := storage.PromWriteTSToM3(t) + write.Attributes = storage.Attributes{ + MetricsType: storage.UnaggregatedMetricsType, + } + + if err := h.store.Write(ctx, write); err != nil { + errLock.Lock() + multiErr = multiErr.Add(err) + errLock.Unlock() + } + + wg.Done() + }() + } + + wg.Wait() + + return multiErr.FinalError() } -func newLocalWriteRequest(writeQuery *storage.WriteQuery, store storage.Storage) execution.Request { - return &localWriteRequest{ - store: store, - writeQuery: writeQuery, +func (h *PromWriteHandler) writeAggregated( + ctx context.Context, + r *prompb.WriteRequest, +) error { + var ( + metricsAppender = h.downsampler.NewMetricsAppender() + multiErr xerrors.MultiError + ) + for _, ts := range r.Timeseries { + metricsAppender.Reset() + for _, label := range ts.Labels { + metricsAppender.AddTag(label.Name, label.Value) + } + + samplesAppender, err := metricsAppender.SamplesAppender() + if err != nil { + multiErr = multiErr.Add(err) + continue + } + + for _, elem := range ts.Samples { + err := samplesAppender.AppendGaugeSample(elem.Value) + if err != nil { + multiErr = multiErr.Add(err) + } + } } + + metricsAppender.Finalize() + + return multiErr.FinalError() } diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index ea6cf2a551..6865eecb5b 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -28,6 +28,7 @@ import ( "os" clusterclient "github.com/m3db/m3cluster/client" + "github.com/m3db/m3db/src/cmd/services/m3coordinator/downsample" dbconfig "github.com/m3db/m3db/src/cmd/services/m3dbnode/config" "github.com/m3db/m3db/src/cmd/services/m3query/config" "github.com/m3db/m3db/src/query/api/v1/handler" @@ -60,6 +61,7 @@ type Handler struct { Router *mux.Router CLFLogger *log.Logger storage storage.Storage + downsampler downsample.Downsampler engine *executor.Engine clusterClient clusterclient.Client config config.Configuration @@ -70,6 +72,7 @@ type Handler struct { // NewHandler returns a new instance of handler with routes. func NewHandler( storage storage.Storage, + downsampler downsample.Downsampler, engine *executor.Engine, clusterClient clusterclient.Client, cfg config.Configuration, @@ -87,6 +90,7 @@ func NewHandler( CLFLogger: log.New(os.Stderr, "[httpd] ", 0), Router: r, storage: storage, + downsampler: downsampler, engine: engine, clusterClient: clusterClient, config: cfg, @@ -103,8 +107,14 @@ func (h *Handler) RegisterRoutes() error { h.Router.HandleFunc(openapi.URL, logged(&openapi.DocHandler{}).ServeHTTP).Methods(openapi.HTTPMethod) h.Router.PathPrefix(openapi.StaticURLPrefix).Handler(logged(openapi.StaticHandler())) - h.Router.HandleFunc(remote.PromReadURL, logged(remote.NewPromReadHandler(h.engine, h.scope.Tagged(remoteSource))).ServeHTTP).Methods(remote.PromReadHTTPMethod) - h.Router.HandleFunc(remote.PromWriteURL, logged(remote.NewPromWriteHandler(h.storage, h.scope.Tagged(remoteSource))).ServeHTTP).Methods(remote.PromWriteHTTPMethod) + promRemoteReadHandler := remote.NewPromReadHandler(h.engine, h.scope.Tagged(remoteSource)) + promRemoteWriteHandler, err := remote.NewPromWriteHandler(h.storage, nil, h.scope.Tagged(remoteSource)) + if err != nil { + return err + } + + h.Router.HandleFunc(remote.PromReadURL, logged(promRemoteReadHandler).ServeHTTP).Methods(remote.PromReadHTTPMethod) + h.Router.HandleFunc(remote.PromWriteURL, logged(promRemoteWriteHandler).ServeHTTP).Methods(remote.PromWriteHTTPMethod) h.Router.HandleFunc(native.PromReadURL, logged(native.NewPromReadHandler(h.engine)).ServeHTTP).Methods(native.PromReadHTTPMethod) h.Router.HandleFunc(handler.SearchURL, logged(handler.NewSearchHandler(h.storage)).ServeHTTP).Methods(handler.SearchHTTPMethod) diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index 8a5561ad0f..97a2a56ad3 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -47,7 +47,8 @@ func TestPromRemoteReadGet(t *testing.T) { ctrl := gomock.NewController(t) storage, _ := local.NewStorageAndSession(t, ctrl) - h, err := NewHandler(storage, executor.NewEngine(storage), nil, config.Configuration{}, nil, tally.NewTestScope("", nil)) + h, err := NewHandler(storage, nil, executor.NewEngine(storage), nil, + config.Configuration{}, nil, tally.NewTestScope("", nil)) require.NoError(t, err, "unable to setup handler") err = h.RegisterRoutes() require.NoError(t, err, "unable to register routes") @@ -63,7 +64,8 @@ func TestPromRemoteReadPost(t *testing.T) { ctrl := gomock.NewController(t) storage, _ := local.NewStorageAndSession(t, ctrl) - h, err := NewHandler(storage, executor.NewEngine(storage), nil, config.Configuration{}, nil, tally.NewTestScope("", nil)) + h, err := NewHandler(storage, nil, executor.NewEngine(storage), nil, + config.Configuration{}, nil, tally.NewTestScope("", nil)) require.NoError(t, err, "unable to setup handler") err = h.RegisterRoutes() require.NoError(t, err, "unable to register routes") @@ -79,7 +81,8 @@ func TestPromNativeReadGet(t *testing.T) { ctrl := gomock.NewController(t) storage, _ := local.NewStorageAndSession(t, ctrl) - h, err := NewHandler(storage, executor.NewEngine(storage), nil, config.Configuration{}, nil, tally.NewTestScope("", nil)) + h, err := NewHandler(storage, nil, executor.NewEngine(storage), nil, + config.Configuration{}, nil, tally.NewTestScope("", nil)) require.NoError(t, err, "unable to setup handler") h.RegisterRoutes() h.Router.ServeHTTP(res, req) @@ -94,7 +97,8 @@ func TestPromNativeReadPost(t *testing.T) { ctrl := gomock.NewController(t) storage, _ := local.NewStorageAndSession(t, ctrl) - h, err := NewHandler(storage, executor.NewEngine(storage), nil, config.Configuration{}, nil, tally.NewTestScope("", nil)) + h, err := NewHandler(storage, nil, executor.NewEngine(storage), nil, + config.Configuration{}, nil, tally.NewTestScope("", nil)) require.NoError(t, err, "unable to setup handler") h.RegisterRoutes() h.Router.ServeHTTP(res, req) @@ -109,7 +113,8 @@ func TestRoutesGet(t *testing.T) { ctrl := gomock.NewController(t) storage, _ := local.NewStorageAndSession(t, ctrl) - h, err := NewHandler(storage, executor.NewEngine(storage), nil, config.Configuration{}, nil, tally.NewTestScope("", nil)) + h, err := NewHandler(storage, nil, executor.NewEngine(storage), nil, + config.Configuration{}, nil, tally.NewTestScope("", nil)) require.NoError(t, err, "unable to setup handler") h.RegisterRoutes() h.Router.ServeHTTP(res, req) diff --git a/src/query/functions/fetch_test.go b/src/query/functions/fetch_test.go index 7fd3a6d8e7..9fe8b1870c 100644 --- a/src/query/functions/fetch_test.go +++ b/src/query/functions/fetch_test.go @@ -39,7 +39,8 @@ func TestFetch(t *testing.T) { values, bounds := test.GenerateValuesAndBounds(nil, nil) b := test.NewBlockFromValues(bounds, values) c, sink := executor.NewControllerWithSink(parser.NodeID(1)) - mockStorage := mock.NewMockStorageWithBlocks([]block.Block{b}) + mockStorage := mock.NewMockStorage() + mockStorage.SetFetchBlocksResult(block.Result{Blocks: []block.Block{b}}, nil) source := (&FetchOp{}).Node(c, mockStorage, transform.Options{}) err := source.Execute(context.TODO()) require.NoError(t, err) diff --git a/src/query/policy/filter/storage_test.go b/src/query/policy/filter/storage_test.go index d73ac23a82..2fffe55bc0 100644 --- a/src/query/policy/filter/storage_test.go +++ b/src/query/policy/filter/storage_test.go @@ -30,13 +30,22 @@ import ( ) var ( - local = mock.NewMockStorageWithType(storage.TypeLocalDC) - remote = mock.NewMockStorageWithType(storage.TypeRemoteDC) - multi = mock.NewMockStorageWithType(storage.TypeMultiDC) + local mock.Storage + remote mock.Storage + multi mock.Storage q = &storage.FetchQuery{} ) +func init() { + local = mock.NewMockStorage() + local.SetTypeResult(storage.TypeLocalDC) + remote = mock.NewMockStorage() + remote.SetTypeResult(storage.TypeRemoteDC) + multi = mock.NewMockStorage() + multi.SetTypeResult(storage.TypeMultiDC) +} + func TestLocalOnly(t *testing.T) { assert.True(t, LocalOnly(q, local)) assert.False(t, LocalOnly(q, remote)) diff --git a/src/query/server/server.go b/src/query/server/server.go index 402b5a251a..d9eddeccc7 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -32,9 +32,11 @@ import ( clusterclient "github.com/m3db/m3cluster/client" etcdclient "github.com/m3db/m3cluster/client/etcd" + "github.com/m3db/m3db/src/cmd/services/m3coordinator/downsample" dbconfig "github.com/m3db/m3db/src/cmd/services/m3dbnode/config" "github.com/m3db/m3db/src/cmd/services/m3query/config" "github.com/m3db/m3db/src/dbnode/client" + "github.com/m3db/m3db/src/dbnode/serialize" "github.com/m3db/m3db/src/query/api/v1/httpd" m3dbcluster "github.com/m3db/m3db/src/query/cluster/m3db" "github.com/m3db/m3db/src/query/executor" @@ -46,6 +48,7 @@ import ( "github.com/m3db/m3db/src/query/stores/m3db" tsdbRemote "github.com/m3db/m3db/src/query/tsdb/remote" "github.com/m3db/m3db/src/query/util/logging" + "github.com/m3db/m3x/clock" xconfig "github.com/m3db/m3x/config" "github.com/m3db/m3x/ident" "github.com/m3db/m3x/instrument" @@ -117,6 +120,7 @@ func Run(runOpts RunOptions) { clusterClientCh = runOpts.ClusterClient } + var clusterManagementClient clusterclient.Client if clusterClientCh == nil { var etcdCfg *etcdclient.Configuration switch { @@ -131,13 +135,13 @@ func Run(runOpts RunOptions) { if etcdCfg != nil { // We resolved an etcd configuration for cluster management endpoints clusterSvcClientOpts := etcdCfg.NewOptions() - clusterClient, err := etcdclient.NewConfigServiceClient(clusterSvcClientOpts) + clusterManagementClient, err = etcdclient.NewConfigServiceClient(clusterSvcClientOpts) if err != nil { logger.Fatal("unable to create cluster management etcd client", zap.Any("error", err)) } clusterClientSendableCh := make(chan clusterclient.Client, 1) - clusterClientSendableCh <- clusterClient + clusterClientSendableCh <- clusterManagementClient clusterClientCh = clusterClientSendableCh } } @@ -203,7 +207,7 @@ func Run(runOpts RunOptions) { return workerPool }) - fanoutStorage, storageCleanup := setupStorages(logger, clusters, cfg, objectPool) + fanoutStorage, storageCleanup := newStorages(logger, clusters, cfg, objectPool) defer storageCleanup() var clusterClient clusterclient.Client @@ -215,7 +219,20 @@ func Run(runOpts RunOptions) { }, nil) } - handler, err := httpd.NewHandler(fanoutStorage, executor.NewEngine(fanoutStorage), + var ( + namespaces = clusters.ClusterNamespaces() + downsampler downsample.Downsampler + ) + if n := namespaces.NumAggregatedClusterNamespaces(); n > 0 { + logger.Info("configuring downsampler to use with aggregated cluster namespaces", + zap.Int("numAggregatedClusterNamespaces", n)) + downsampler = newDownsampler(logger, clusterManagementClient, + fanoutStorage, instrumentOptions) + } + + engine := executor.NewEngine(fanoutStorage) + + handler, err := httpd.NewHandler(fanoutStorage, downsampler, engine, clusterClient, cfg, runOpts.DBConfig, scope) if err != nil { logger.Fatal("unable to set up handlers", zap.Any("error", err)) @@ -239,7 +256,57 @@ func Run(runOpts RunOptions) { } } -func setupStorages(logger *zap.Logger, clusters local.Clusters, cfg config.Configuration, workerPool pool.ObjectPool) (storage.Storage, func()) { +func newDownsampler( + logger *zap.Logger, + clusterManagementClient clusterclient.Client, + storage storage.Storage, + instrumentOpts instrument.Options, +) downsample.Downsampler { + if clusterManagementClient == nil { + logger.Fatal("no configured cluster management config, must set this " + + "config for downsampler") + } + + kvStore, err := clusterManagementClient.KV() + if err != nil { + logger.Fatal("unable to create KV store from the cluster management "+ + "config client", zap.Any("error", err)) + } + + tagEncoderOptions := serialize.NewTagEncoderOptions() + tagDecoderOptions := serialize.NewTagDecoderOptions() + tagEncoderPoolOptions := pool.NewObjectPoolOptions(). + SetInstrumentOptions(instrumentOpts. + SetMetricsScope(instrumentOpts.MetricsScope(). + SubScope("tag-encoder-pool"))) + tagDecoderPoolOptions := pool.NewObjectPoolOptions(). + SetInstrumentOptions(instrumentOpts. + SetMetricsScope(instrumentOpts.MetricsScope(). + SubScope("tag-decoder-pool"))) + + downsampler, err := downsample.NewDownsampler(downsample.DownsamplerOptions{ + Storage: storage, + RulesKVStore: kvStore, + ClockOptions: clock.NewOptions(), + InstrumentOptions: instrumentOpts, + TagEncoderOptions: tagEncoderOptions, + TagDecoderOptions: tagDecoderOptions, + TagEncoderPoolOptions: tagEncoderPoolOptions, + TagDecoderPoolOptions: tagDecoderPoolOptions, + }) + if err != nil { + logger.Fatal("unable to create downsampler", zap.Any("error", err)) + } + + return downsampler +} + +func newStorages( + logger *zap.Logger, + clusters local.Clusters, + cfg config.Configuration, + workerPool pool.ObjectPool, +) (storage.Storage, func()) { cleanup := func() {} localStorage := local.NewStorage(clusters, workerPool) diff --git a/src/query/storage/local/cluster.go b/src/query/storage/local/cluster.go index 03983cdfa3..277f079002 100644 --- a/src/query/storage/local/cluster.go +++ b/src/query/storage/local/cluster.go @@ -45,7 +45,7 @@ type Clusters interface { io.Closer // ClusterNamespaces returns all known cluster namespaces. - ClusterNamespaces() []ClusterNamespace + ClusterNamespaces() ClusterNamespaces // UnaggregatedClusterNamespace returns the valid unaggregated // cluster namespace. @@ -70,6 +70,21 @@ type ClusterNamespace interface { Session() client.Session } +// ClusterNamespaces is a slice of ClusterNamespace instances. +type ClusterNamespaces []ClusterNamespace + +// NumAggregatedClusterNamespaces returns the number of aggregated +// cluster namespaces. +func (n ClusterNamespaces) NumAggregatedClusterNamespaces() int { + count := 0 + for _, namespace := range n { + if namespace.Attributes().MetricsType == storage.AggregatedMetricsType { + count++ + } + } + return count +} + // UnaggregatedClusterNamespaceDefinition is the definition for the // cluster namespace that holds unaggregated metrics data. type UnaggregatedClusterNamespaceDefinition struct { @@ -132,7 +147,7 @@ func NewClusters( ) (Clusters, error) { expectedAggregated := len(aggregatedClusterNamespaces) expectedAll := 1 + expectedAggregated - namespaces := make([]ClusterNamespace, 0, expectedAll) + namespaces := make(ClusterNamespaces, 0, expectedAll) aggregatedNamespaces := make(map[RetentionResolution]ClusterNamespace, expectedAggregated) @@ -172,7 +187,7 @@ func NewClusters( }, nil } -func (c *clusters) ClusterNamespaces() []ClusterNamespace { +func (c *clusters) ClusterNamespaces() ClusterNamespaces { return c.namespaces } diff --git a/src/query/storage/mock/storage.go b/src/query/storage/mock/storage.go index 76ac80dab6..73ff34c964 100644 --- a/src/query/storage/mock/storage.go +++ b/src/query/storage/mock/storage.go @@ -22,54 +22,150 @@ package mock import ( "context" + "sync" "github.com/m3db/m3db/src/query/block" "github.com/m3db/m3db/src/query/storage" ) +// Storage implements storage.Storage and provides methods to help +// read what was written and set what to retrieve. +type Storage interface { + storage.Storage + + SetTypeResult(storage.Type) + SetFetchResult(*storage.FetchResult, error) + SetFetchTagsResult(*storage.SearchResults, error) + SetWriteResult(error) + SetFetchBlocksResult(block.Result, error) + SetCloseResult(error) + Writes() []*storage.WriteQuery +} + type mockStorage struct { - sType storage.Type - blocks []block.Block + sync.RWMutex + typeResult struct { + result storage.Type + } + fetchResult struct { + result *storage.FetchResult + err error + } + fetchTagsResult struct { + result *storage.SearchResults + err error + } + writeResult struct { + err error + } + fetchBlocksResult struct { + result block.Result + err error + } + closeResult struct { + err error + } + writes []*storage.WriteQuery } // NewMockStorage creates a new mock Storage instance. -func NewMockStorage() storage.Storage { - return &mockStorage{sType: storage.Type(0)} +func NewMockStorage() Storage { + return &mockStorage{} +} + +func (s *mockStorage) SetTypeResult(result storage.Type) { + s.Lock() + defer s.Unlock() + s.typeResult.result = result +} + +func (s *mockStorage) SetFetchResult(result *storage.FetchResult, err error) { + s.Lock() + defer s.Unlock() + s.fetchResult.result = result + s.fetchResult.err = err +} + +func (s *mockStorage) SetFetchTagsResult(result *storage.SearchResults, err error) { + s.Lock() + defer s.Unlock() + s.fetchTagsResult.result = result + s.fetchTagsResult.err = err +} + +func (s *mockStorage) SetWriteResult(err error) { + s.Lock() + defer s.Unlock() + s.writeResult.err = err +} + +func (s *mockStorage) SetFetchBlocksResult(result block.Result, err error) { + s.Lock() + defer s.Unlock() + s.fetchBlocksResult.result = result + s.fetchBlocksResult.err = err } -// NewMockStorageWithBlocks creates a new mock Storage instance with blocks. -func NewMockStorageWithBlocks(blocks []block.Block) storage.Storage { - return &mockStorage{sType: storage.Type(0), blocks: blocks} +func (s *mockStorage) SetCloseResult(err error) { + s.Lock() + defer s.Unlock() + s.closeResult.err = err } -// NewMockStorageWithType creates a new mock Storage instance. -func NewMockStorageWithType(sType storage.Type) storage.Storage { - return &mockStorage{sType: sType} +func (s *mockStorage) Writes() []*storage.WriteQuery { + s.RLock() + defer s.RUnlock() + return s.writes } -func (s *mockStorage) Fetch(ctx context.Context, query *storage.FetchQuery, _ *storage.FetchOptions) (*storage.FetchResult, error) { - return nil, nil +func (s *mockStorage) Fetch( + ctx context.Context, + query *storage.FetchQuery, + _ *storage.FetchOptions, +) (*storage.FetchResult, error) { + s.RLock() + defer s.RUnlock() + return s.fetchResult.result, s.fetchResult.err } -func (s *mockStorage) FetchTags(ctx context.Context, query *storage.FetchQuery, _ *storage.FetchOptions) (*storage.SearchResults, error) { - return nil, nil +func (s *mockStorage) FetchTags( + ctx context.Context, + query *storage.FetchQuery, + _ *storage.FetchOptions, +) (*storage.SearchResults, error) { + s.RLock() + defer s.RUnlock() + return s.fetchTagsResult.result, s.fetchTagsResult.err } -func (s *mockStorage) Write(ctx context.Context, query *storage.WriteQuery) error { - return nil +func (s *mockStorage) Write( + ctx context.Context, + query *storage.WriteQuery, +) error { + s.Lock() + defer s.Unlock() + s.writes = append(s.writes, query) + return s.writeResult.err } func (s *mockStorage) Type() storage.Type { - return s.sType + s.RLock() + defer s.RUnlock() + return s.typeResult.result } func (s *mockStorage) Close() error { - return nil + s.RLock() + defer s.RUnlock() + return s.closeResult.err } func (s *mockStorage) FetchBlocks( - ctx context.Context, query *storage.FetchQuery, options *storage.FetchOptions) (block.Result, error) { - return block.Result{ - Blocks: s.blocks, - }, nil + ctx context.Context, + query *storage.FetchQuery, + options *storage.FetchOptions, +) (block.Result, error) { + s.RLock() + defer s.RUnlock() + return s.fetchBlocksResult.result, s.fetchBlocksResult.err }