diff --git a/.gitignore b/.gitignore
index 8b26ce9839..05d4521442 100644
--- a/.gitignore
+++ b/.gitignore
@@ -61,4 +61,4 @@ m3db.io/openapi
 .vagrant
 
 #docs ignores
-docs/public
\ No newline at end of file
+docs/public
diff --git a/netlify.toml b/netlify.toml
index d7c6f085b7..bb06d1d619 100644
--- a/netlify.toml
+++ b/netlify.toml
@@ -1,13 +1,13 @@
 [build]
   # Directory to change to before starting a build.
   # This is where we will look for package.json/.nvmrc/etc.
-  base = "docs-beta/"
+  base = "docs/"
 
   # Directory that contains the deploy-ready HTML files and assets generated by
   # the build. This is relative to the base directory if one has been set, or the
   # root directory if a base has not been set. This sample publishes the
   # directory located at the absolute path "root/project/build-output"
-  publish = "docs-beta/public"
+  publish = "docs/public"
 
 [context."chrischinch/quickstart"]
   command = "hugo"
diff --git a/src/cmd/services/m3coordinator/server/m3msg/config.go b/src/cmd/services/m3coordinator/server/m3msg/config.go
index f135b3ae39..953f62a2d0 100644
--- a/src/cmd/services/m3coordinator/server/m3msg/config.go
+++ b/src/cmd/services/m3coordinator/server/m3msg/config.go
@@ -21,6 +21,7 @@
 package m3msg
 
 import (
+    "github.com/m3db/m3/src/metrics/policy"
     "github.com/m3db/m3/src/msg/consumer"
     "github.com/m3db/m3/src/x/instrument"
     xio "github.com/m3db/m3/src/x/io"
@@ -67,6 +68,7 @@ func (c Configuration) NewServer(
 type handlerConfiguration struct {
     // ProtobufDecoderPool configs the protobuf decoder pool.
     ProtobufDecoderPool pool.ObjectPoolConfiguration `yaml:"protobufDecoderPool"`
+    BlackholePolicies []policy.StoragePolicy `yaml:"blackholePolicies"`
 }
 
 func (c handlerConfiguration) newHandler(
@@ -82,6 +84,7 @@
             }),
         ),
         ProtobufDecoderPoolOptions: c.ProtobufDecoderPool.NewObjectPoolOptions(iOpts),
+        BlockholePolicies: c.BlackholePolicies,
     })
     return consumer.NewMessageHandler(p, cOpts), nil
 }
diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go
index 4c3d78b242..7589b95595 100644
--- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go
+++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go
@@ -25,6 +25,7 @@ import (
     "sync"
 
     "github.com/m3db/m3/src/metrics/encoding/protobuf"
+    "github.com/m3db/m3/src/metrics/policy"
     "github.com/m3db/m3/src/msg/consumer"
     "github.com/m3db/m3/src/x/instrument"
     "github.com/m3db/m3/src/x/pool"
@@ -38,11 +39,13 @@ type Options struct {
     InstrumentOptions          instrument.Options
     WriteFn                    WriteFn
     ProtobufDecoderPoolOptions pool.ObjectPoolOptions
+    BlockholePolicies          []policy.StoragePolicy
 }
 
 type handlerMetrics struct {
     messageReadError             tally.Counter
     metricAccepted               tally.Counter
+    droppedMetricBlackholePolicy tally.Counter
     droppedMetricDecodeError     tally.Counter
     droppedMetricDecodeMalformed tally.Counter
 }
@@ -55,6 +58,9 @@ func newHandlerMetrics(scope tally.Scope) handlerMetrics {
         droppedMetricDecodeError: messageScope.Tagged(map[string]string{
             "reason": "decode-error",
         }).Counter("dropped"),
+        droppedMetricBlackholePolicy: messageScope.Tagged(map[string]string{
+            "reason": "blackhole-policy",
+        }).Counter("dropped"),
         droppedMetricDecodeMalformed: messageScope.Tagged(map[string]string{
             "reason": "decode-malformed",
         }).Counter("dropped"),
@@ -68,19 +74,34 @@ type pbHandler struct {
     wg     *sync.WaitGroup
     logger *zap.Logger
     m      handlerMetrics
+
+    // Set of policies for which when we see a metric we drop it on the floor.
+    blackholePolicies []policy.StoragePolicy
 }
 
 func newProtobufProcessor(opts Options) consumer.MessageProcessor {
     p := protobuf.NewAggregatedDecoderPool(opts.ProtobufDecoderPoolOptions)
     p.Init()
-    return &pbHandler{
-        ctx:     context.Background(),
-        writeFn: opts.WriteFn,
-        pool:    p,
-        wg:      &sync.WaitGroup{},
-        logger:  opts.InstrumentOptions.Logger(),
-        m:       newHandlerMetrics(opts.InstrumentOptions.MetricsScope()),
+
+    h := &pbHandler{
+        ctx:               context.Background(),
+        writeFn:           opts.WriteFn,
+        pool:              p,
+        wg:                &sync.WaitGroup{},
+        logger:            opts.InstrumentOptions.Logger(),
+        m:                 newHandlerMetrics(opts.InstrumentOptions.MetricsScope()),
+        blackholePolicies: opts.BlockholePolicies,
+    }
+
+    if len(opts.BlockholePolicies) > 0 {
+        policyNames := make([]string, 0, len(opts.BlockholePolicies))
+        for _, sp := range h.blackholePolicies {
+            policyNames = append(policyNames, sp.String())
+        }
+        h.logger.Info("m3msg handler blackholing metrics for configured policies", zap.Strings("policyNames", policyNames))
     }
+
+    return h
 }
 
 func (h *pbHandler) Process(msg consumer.Message) {
@@ -96,10 +117,22 @@
         h.m.droppedMetricDecodeMalformed.Inc(1)
         return
     }
+
     h.m.metricAccepted.Inc(1)
     h.wg.Add(1)
     r := NewProtobufCallback(msg, dec, h.wg)
+
+    // If storage policy is blackholed, ack the message immediately and don't
+    // bother passing down the write path.
+    for _, blackholeSp := range h.blackholePolicies {
+        if sp.Equivalent(blackholeSp) {
+            h.m.droppedMetricBlackholePolicy.Inc(1)
+            r.Callback(OnSuccess)
+            return
+        }
+    }
+
     h.writeFn(h.ctx, dec.ID(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r)
 }
diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go
index b67319836f..02e411749a 100644
--- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go
+++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go
@@ -26,6 +26,7 @@ import (
     "net"
     "sync"
     "testing"
+    "time"
 
     "github.com/m3db/m3/src/metrics/encoding/protobuf"
     "github.com/m3db/m3/src/metrics/metric"
@@ -36,13 +37,19 @@ import (
     "github.com/m3db/m3/src/msg/protocol/proto"
     "github.com/m3db/m3/src/x/instrument"
     "github.com/m3db/m3/src/x/server"
+    xtime "github.com/m3db/m3/src/x/time"
 
     "github.com/stretchr/testify/require"
 )
 
 var (
-    testID             = "stats.sjc1.gauges.m3+some-name+dc=sjc1,env=production,service=foo,type=gauge"
-    validStoragePolicy = policy.MustParseStoragePolicy("1m:40d")
+    testID = "stats.foo1.gauges.m3+some-name+dc=foo1,env=production,service=foo,type=gauge"
+
+    // baseStoragePolicy represents what we typically define in config for SP.
+    // precisionStoragePolicy is the same retention/resolution, but includes the
+    // precision (which is often included with incoming writes).
+    baseStoragePolicy      = policy.MustParseStoragePolicy("1m:40d")
+    precisionStoragePolicy = policy.NewStoragePolicy(time.Minute, xtime.Second, 40*24*time.Hour)
 )
 
 func TestM3MsgServerWithProtobufHandler(t *testing.T) {
@@ -74,7 +81,7 @@
             Value:     1,
             Type:      metric.GaugeType,
         },
-        StoragePolicy: validStoragePolicy,
+        StoragePolicy: precisionStoragePolicy,
     }
 
     encoder := protobuf.NewAggregatedEncoder(nil)
@@ -98,7 +105,7 @@
             Value:     0,
             Type:      metric.UnknownType,
         },
-        StoragePolicy: validStoragePolicy,
+        StoragePolicy: precisionStoragePolicy,
     }
     require.NoError(t, encoder.Encode(m2, 3000))
     enc = proto.NewEncoder(opts.EncoderOptions())
@@ -127,6 +134,95 @@
     require.Equal(t, m2.StoragePolicy, payload.sp)
 }
 
+func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) {
+    l, err := net.Listen("tcp", "127.0.0.1:0")
+    require.NoError(t, err)
+
+    w := &mockWriter{m: make(map[string]payload)}
+    hOpts := Options{
+        WriteFn:           w.write,
+        InstrumentOptions: instrument.NewOptions(),
+        BlockholePolicies: []policy.StoragePolicy{baseStoragePolicy},
+    }
+    opts := consumer.NewOptions().
+        SetAckBufferSize(1).
+        SetConnectionWriteBufferSize(1)
+
+    s := server.NewServer(
+        "a",
+        consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts),
+        server.NewOptions(),
+    )
+    s.Serve(l)
+
+    conn, err := net.Dial("tcp", l.Addr().String())
+    require.NoError(t, err)
+    m1 := aggregated.MetricWithStoragePolicy{
+        Metric: aggregated.Metric{
+            ID:        []byte(testID),
+            TimeNanos: 1000,
+            Value:     1,
+            Type:      metric.GaugeType,
+        },
+        StoragePolicy: precisionStoragePolicy,
+    }
+
+    encoder := protobuf.NewAggregatedEncoder(nil)
+    require.NoError(t, encoder.Encode(m1, 2000))
+    enc := proto.NewEncoder(opts.EncoderOptions())
+    require.NoError(t, enc.Encode(&msgpb.Message{
+        Value: encoder.Buffer().Bytes(),
+    }))
+    _, err = conn.Write(enc.Bytes())
+    require.NoError(t, err)
+
+    var a msgpb.Ack
+    dec := proto.NewDecoder(conn, opts.DecoderOptions(), 10)
+    require.NoError(t, dec.Decode(&a))
+    require.Equal(t, 0, w.ingested())
+
+    // Ensure a metric with a different policy still gets ingested.
+    m2 := aggregated.MetricWithStoragePolicy{
+        Metric: aggregated.Metric{
+            ID:        []byte{},
+            TimeNanos: 0,
+            Value:     0,
+            Type:      metric.UnknownType,
+        },
+        StoragePolicy: policy.MustParseStoragePolicy("5m:180d"),
+    }
+    require.NoError(t, encoder.Encode(m2, 3000))
+    enc = proto.NewEncoder(opts.EncoderOptions())
+    require.NoError(t, enc.Encode(&msgpb.Message{
+        Value: encoder.Buffer().Bytes(),
+    }))
+    _, err = conn.Write(enc.Bytes())
+    require.NoError(t, err)
+    require.NoError(t, dec.Decode(&a))
+    require.Equal(t, 1, w.ingested())
+
+    // Ensure a metric with base policy (equivalent but default precision) is
+    // still ignored.
+    m3 := aggregated.MetricWithStoragePolicy{
+        Metric: aggregated.Metric{
+            ID:        []byte(testID),
+            TimeNanos: 1000,
+            Value:     1,
+            Type:      metric.GaugeType,
+        },
+        StoragePolicy: baseStoragePolicy,
+    }
+    require.NoError(t, encoder.Encode(m3, 3000))
+    enc = proto.NewEncoder(opts.EncoderOptions())
+    require.NoError(t, enc.Encode(&msgpb.Message{
+        Value: encoder.Buffer().Bytes(),
+    }))
+    _, err = conn.Write(enc.Bytes())
+    require.NoError(t, err)
+    require.NoError(t, dec.Decode(&a))
+    require.Equal(t, 1, w.ingested())
+}
+
 type mockWriter struct {
     sync.Mutex
diff --git a/src/dbnode/client/client_mock.go b/src/dbnode/client/client_mock.go
index 3a9842dceb..0eddc159c9 100644
--- a/src/dbnode/client/client_mock.go
+++ b/src/dbnode/client/client_mock.go
@@ -2224,6 +2224,34 @@ func (mr *MockOptionsMockRecorder) HostQueueOpsArrayPoolSize() *gomock.Call {
     return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HostQueueOpsArrayPoolSize", reflect.TypeOf((*MockOptions)(nil).HostQueueOpsArrayPoolSize))
 }
 
+// SetHostQueueEmitsHealthStatus mocks base method
+func (m *MockOptions) SetHostQueueEmitsHealthStatus(value bool) Options {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "SetHostQueueEmitsHealthStatus", value)
+    ret0, _ := ret[0].(Options)
+    return ret0
+}
+
+// SetHostQueueEmitsHealthStatus indicates an expected call of SetHostQueueEmitsHealthStatus
+func (mr *MockOptionsMockRecorder) SetHostQueueEmitsHealthStatus(value interface{}) *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHostQueueEmitsHealthStatus", reflect.TypeOf((*MockOptions)(nil).SetHostQueueEmitsHealthStatus), value)
+}
+
+// HostQueueEmitsHealthStatus mocks base method
+func (m *MockOptions) HostQueueEmitsHealthStatus() bool {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "HostQueueEmitsHealthStatus")
+    ret0, _ := ret[0].(bool)
+    return ret0
+}
+
+// HostQueueEmitsHealthStatus indicates an expected call of HostQueueEmitsHealthStatus
+func (mr *MockOptionsMockRecorder) HostQueueEmitsHealthStatus() *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HostQueueEmitsHealthStatus", reflect.TypeOf((*MockOptions)(nil).HostQueueEmitsHealthStatus))
+}
+
 // SetSeriesIteratorPoolSize mocks base method
 func (m *MockOptions) SetSeriesIteratorPoolSize(value int) Options {
     m.ctrl.T.Helper()
@@ -3731,6 +3759,34 @@ func (mr *MockAdminOptionsMockRecorder) HostQueueOpsArrayPoolSize() *gomock.Call {
     return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HostQueueOpsArrayPoolSize", reflect.TypeOf((*MockAdminOptions)(nil).HostQueueOpsArrayPoolSize))
 }
 
+// SetHostQueueEmitsHealthStatus mocks base method
+func (m *MockAdminOptions) SetHostQueueEmitsHealthStatus(value bool) Options {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "SetHostQueueEmitsHealthStatus", value)
+    ret0, _ := ret[0].(Options)
+    return ret0
+}
+
+// SetHostQueueEmitsHealthStatus indicates an expected call of SetHostQueueEmitsHealthStatus
+func (mr *MockAdminOptionsMockRecorder) SetHostQueueEmitsHealthStatus(value interface{}) *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHostQueueEmitsHealthStatus", reflect.TypeOf((*MockAdminOptions)(nil).SetHostQueueEmitsHealthStatus), value)
+}
+
+// HostQueueEmitsHealthStatus mocks base method
+func (m *MockAdminOptions) HostQueueEmitsHealthStatus() bool {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "HostQueueEmitsHealthStatus")
+    ret0, _ := ret[0].(bool)
+    return ret0
+}
+
+// HostQueueEmitsHealthStatus indicates an expected call of HostQueueEmitsHealthStatus
+func (mr *MockAdminOptionsMockRecorder) HostQueueEmitsHealthStatus() *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HostQueueEmitsHealthStatus", reflect.TypeOf((*MockAdminOptions)(nil).HostQueueEmitsHealthStatus))
+}
+
 // SetSeriesIteratorPoolSize mocks base method
 func (m *MockAdminOptions) SetSeriesIteratorPoolSize(value int) Options {
     m.ctrl.T.Helper()
diff --git a/src/dbnode/client/connection_pool.go b/src/dbnode/client/connection_pool.go
index bcfc04973c..937408673d 100644
--- a/src/dbnode/client/connection_pool.go
+++ b/src/dbnode/client/connection_pool.go
@@ -32,8 +32,9 @@ import (
     "github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
     "github.com/m3db/m3/src/dbnode/topology"
     xclose "github.com/m3db/m3/src/x/close"
-    "github.com/m3db/stackmurmur3/v2"
+    murmur3 "github.com/m3db/stackmurmur3/v2"
 
+    "github.com/uber-go/tally"
     "github.com/uber/tchannel-go/thrift"
     "go.uber.org/zap"
 )
@@ -63,6 +64,7 @@ type connPool struct {
     sleepHealth      sleepFn
     sleepHealthRetry sleepFn
     status           status
+    healthStatus     tally.Gauge
 }
 
 type conn struct {
@@ -94,6 +96,7 @@ func newConnectionPool(host topology.Host, opts Options) connectionPool {
         sleepConnect:     time.Sleep,
         sleepHealth:      time.Sleep,
         sleepHealthRetry: time.Sleep,
+        healthStatus:     opts.InstrumentOptions().MetricsScope().Gauge("health-status"),
     }
 
     return p
@@ -186,11 +189,13 @@ func (p *connPool) connectEvery(interval time.Duration, stutter time.Duration) {
 
             // Health check the connection
             if err := p.healthCheckNewConn(client, p.opts); err != nil {
+                p.maybeEmitHealthStatus(healthStatusCheckFailed)
                 log.Debug("could not connect, failed health check", zap.String("host", address), zap.Error(err))
                 channel.Close()
                 return
             }
 
+            p.maybeEmitHealthStatus(healthStatusOK)
             p.Lock()
             if p.status == statusOpen {
                 p.pool = append(p.pool, conn{channel, client})
@@ -206,6 +211,12 @@ func (p *connPool) connectEvery(interval time.Duration, stutter time.Duration) {
     }
 }
 
+func (p *connPool) maybeEmitHealthStatus(hs healthStatus) {
+    if p.opts.HostQueueEmitsHealthStatus() {
+        p.healthStatus.Update(float64(hs))
+    }
+}
+
 func (p *connPool) healthCheckEvery(interval time.Duration, stutter time.Duration) {
     log := p.opts.InstrumentOptions().Logger()
     nowFn := p.opts.ClockOptions().NowFn()
diff --git a/src/dbnode/client/options.go b/src/dbnode/client/options.go
index 6463d4087e..6b06004048 100644
--- a/src/dbnode/client/options.go
+++ b/src/dbnode/client/options.go
@@ -119,6 +119,9 @@
     // defaultHostQueueOpsArrayPoolSize is the default host queue ops array pool size
     defaultHostQueueOpsArrayPoolSize = 8
 
+    // defaultHostQueueEmitsHealthStatus is false
+    defaultHostQueueEmitsHealthStatus = false
+
     // defaultBackgroundConnectInterval is the default background connect interval
     defaultBackgroundConnectInterval = 4 * time.Second
 
@@ -261,6 +264,7 @@
     hostQueueOpsFlushSize          int
     hostQueueOpsFlushInterval      time.Duration
     hostQueueOpsArrayPoolSize      int
+    hostQueueEmitsHealthStatus     bool
     seriesIteratorPoolSize         int
     seriesIteratorArrayPoolBuckets []pool.Bucket
     checkedBytesWrapperPoolSize    int
@@ -381,6 +385,7 @@ func newOptions() *options {
         hostQueueOpsFlushSize:          defaultHostQueueOpsFlushSize,
         hostQueueOpsFlushInterval:      defaultHostQueueOpsFlushInterval,
         hostQueueOpsArrayPoolSize:      defaultHostQueueOpsArrayPoolSize,
+        hostQueueEmitsHealthStatus:     defaultHostQueueEmitsHealthStatus,
         seriesIteratorPoolSize:         defaultSeriesIteratorPoolSize,
         seriesIteratorArrayPoolBuckets: defaultSeriesIteratorArrayPoolBuckets,
         checkedBytesWrapperPoolSize:    defaultCheckedBytesWrapperPoolSize,
@@ -884,6 +889,16 @@ func (o *options) HostQueueOpsArrayPoolSize() int {
     return o.hostQueueOpsArrayPoolSize
 }
 
+func (o *options) SetHostQueueEmitsHealthStatus(value bool) Options {
+    opts := *o
+    opts.hostQueueEmitsHealthStatus = value
+    return &opts
+}
+
+func (o *options) HostQueueEmitsHealthStatus() bool {
+    return o.hostQueueEmitsHealthStatus
+}
+
 func (o *options) SetSeriesIteratorPoolSize(value int) Options {
     opts := *o
     opts.seriesIteratorPoolSize = value
diff --git a/src/dbnode/client/types.go b/src/dbnode/client/types.go
index 3c4ba2ed03..ccbd574f94 100644
--- a/src/dbnode/client/types.go
+++ b/src/dbnode/client/types.go
@@ -515,6 +515,12 @@ type Options interface {
     // HostQueueOpsArrayPoolSize returns the hostQueueOpsArrayPoolSize.
     HostQueueOpsArrayPoolSize() int
 
+    // SetHostQueueEmitsHealthStatus sets the hostQueueEmitsHealthStatus.
+    SetHostQueueEmitsHealthStatus(value bool) Options
+
+    // HostQueueEmitsHealthStatus returns the hostQueueEmitsHealthStatus.
+    HostQueueEmitsHealthStatus() bool
+
     // SetSeriesIteratorPoolSize sets the seriesIteratorPoolSize.
     SetSeriesIteratorPoolSize(value int) Options
 
@@ -710,6 +716,13 @@ const (
     statusClosed
 )
 
+type healthStatus int
+
+const (
+    healthStatusCheckFailed healthStatus = iota
+    healthStatusOK
+)
+
 type op interface {
     // Size returns the effective size of inner operations.
     Size() int
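For reference, a minimal sketch (not part of the diff above) of how the two knobs introduced by this change might be exercised from Go. It relies only on identifiers added in this diff — the m3msg handler option is spelled BlockholePolicies, and the dbnode client option is SetHostQueueEmitsHealthStatus — plus client.NewOptions as the dbnode client options constructor; the surrounding wiring is illustrative only.

package main

import (
	"github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg"
	"github.com/m3db/m3/src/dbnode/client"
	"github.com/m3db/m3/src/metrics/policy"
	"github.com/m3db/m3/src/x/instrument"
)

func main() {
	// m3msg ingestion: a metric whose storage policy is equivalent to 1m:40d
	// (regardless of precision) is acked immediately and never reaches WriteFn,
	// incrementing the dropped counter tagged reason=blackhole-policy instead.
	handlerOpts := m3msg.Options{
		InstrumentOptions: instrument.NewOptions(),
		BlockholePolicies: []policy.StoragePolicy{
			policy.MustParseStoragePolicy("1m:40d"),
		},
	}
	_ = handlerOpts

	// dbnode client: opt in to the per-host connection pool "health-status"
	// gauge; it defaults to off (defaultHostQueueEmitsHealthStatus is false).
	clientOpts := client.NewOptions().SetHostQueueEmitsHealthStatus(true)
	_ = clientOpts
}

When enabled, the connection pool updates the gauge to healthStatusOK (1) after a successful health check of a new connection and to healthStatusCheckFailed (0) when that health check fails, matching the constants added in types.go.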