diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 9db630ac34..fec28c40fe 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -284,6 +284,9 @@ type MatcherConfiguration struct { // NamespaceTag defines the namespace tag to use to select rules // namespace to evaluate against. Default is "__m3_namespace__". NamespaceTag string `yaml:"namespaceTag"` + // RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch. + // This only makes sense to use if the corresponding namespace / ruleset values are properly seeded. + RequireNamespaceWatchOnInit bool `yaml:"requireNamespaceWatchOnInit"` } // MatcherCacheConfiguration is the configuration for the rule matcher cache. @@ -710,7 +713,8 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { SetInstrumentOptions(instrumentOpts). SetRuleSetOptions(ruleSetOpts). SetKVStore(o.RulesKVStore). - SetNamespaceTag([]byte(namespaceTag)) + SetNamespaceTag([]byte(namespaceTag)). + SetRequireNamespaceWatchOnInit(cfg.Matcher.RequireNamespaceWatchOnInit) // NB(r): If rules are being explicitly set in config then we are // going to use an in memory KV store for rules and explicitly set them up. diff --git a/src/metrics/matcher/namespaces.go b/src/metrics/matcher/namespaces.go index 4b532d0ae2..fa3e8dbcdb 100644 --- a/src/metrics/matcher/namespaces.go +++ b/src/metrics/matcher/namespaces.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/rules" "github.com/m3db/m3/src/x/clock" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/watch" "github.com/uber-go/tally" @@ -112,27 +113,29 @@ type namespaces struct { onNamespaceAddedFn OnNamespaceAddedFn onNamespaceRemovedFn OnNamespaceRemovedFn - proto *rulepb.Namespaces - rules *namespaceRuleSetsMap - metrics namespacesMetrics + proto *rulepb.Namespaces + rules *namespaceRuleSetsMap + metrics namespacesMetrics + requireNamespaceWatchOnInit bool } // NewNamespaces creates a new namespaces object. func NewNamespaces(key string, opts Options) Namespaces { instrumentOpts := opts.InstrumentOptions() n := &namespaces{ - key: key, - store: opts.KVStore(), - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - log: instrumentOpts.Logger(), - ruleSetKeyFn: opts.RuleSetKeyFn(), - matchRangePast: opts.MatchRangePast(), - onNamespaceAddedFn: opts.OnNamespaceAddedFn(), - onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(), - proto: &rulepb.Namespaces{}, - rules: newNamespaceRuleSetsMap(namespaceRuleSetsMapOptions{}), - metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()), + key: key, + store: opts.KVStore(), + opts: opts, + nowFn: opts.ClockOptions().NowFn(), + log: instrumentOpts.Logger(), + ruleSetKeyFn: opts.RuleSetKeyFn(), + matchRangePast: opts.MatchRangePast(), + onNamespaceAddedFn: opts.OnNamespaceAddedFn(), + onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(), + proto: &rulepb.Namespaces{}, + rules: newNamespaceRuleSetsMap(namespaceRuleSetsMapOptions{}), + metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()), + requireNamespaceWatchOnInit: opts.RequireNamespaceWatchOnInit(), } valueOpts := runtime.NewOptions(). SetInstrumentOptions(instrumentOpts). @@ -160,10 +163,15 @@ func (n *namespaces) Open() error { // to be more resilient to error conditions preventing process // from starting up. n.metrics.initWatchErrors.Inc(1) + if n.requireNamespaceWatchOnInit { + return err + } + n.opts.InstrumentOptions().Logger().With( zap.String("key", n.key), zap.Error(err), ).Error("error initializing namespaces values, retrying in the background") + return nil } @@ -255,7 +263,12 @@ func (n *namespaces) process(value interface{}) error { n.Lock() defer n.Unlock() - var watchWg sync.WaitGroup + var ( + watchWg sync.WaitGroup + multiErr xerrors.MultiError + errLock sync.Mutex + ) + for _, entry := range incoming.Iter() { namespace, elem := entry.Key(), rules.Namespace(entry.Value()) nsName, snapshots := elem.Name(), elem.Snapshots() @@ -302,6 +315,13 @@ func (n *namespaces) process(value interface{}) error { n.log.Error("failed to watch ruleset updates", zap.String("ruleSetKey", ruleSet.Key()), zap.Error(err)) + + // Track errors if we explicitly want to ensure watches succeed. + if n.requireNamespaceWatchOnInit { + errLock.Lock() + multiErr = multiErr.Add(err) + errLock.Unlock() + } } }() } @@ -312,6 +332,11 @@ func (n *namespaces) process(value interface{}) error { } watchWg.Wait() + + if !multiErr.Empty() { + return multiErr.FinalError() + } + for _, entry := range n.rules.Iter() { namespace, ruleSet := entry.Key(), entry.Value() _, exists := incoming.Get(namespace) diff --git a/src/metrics/matcher/namespaces_test.go b/src/metrics/matcher/namespaces_test.go index 8a991f93f7..a61d3648db 100644 --- a/src/metrics/matcher/namespaces_test.go +++ b/src/metrics/matcher/namespaces_test.go @@ -46,10 +46,10 @@ func TestNamespacesWatchAndClose(t *testing.T) { store, _, nss, _ := testNamespaces() proto := &rulepb.Namespaces{ Namespaces: []*rulepb.Namespace{ - &rulepb.Namespace{ + { Name: "fooNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: true, }, @@ -64,6 +64,68 @@ func TestNamespacesWatchAndClose(t *testing.T) { nss.Close() } +func TestNamespacesWatchSoftErr(t *testing.T) { + _, _, nss, _ := testNamespaces() + // No value set, so this will soft error + require.NoError(t, nss.Open()) +} + +func TestNamespacesWatchRulesetSoftErr(t *testing.T) { + store, _, nss, _ := testNamespaces() + proto := &rulepb.Namespaces{ + Namespaces: []*rulepb.Namespace{ + { + Name: "fooNs", + Snapshots: []*rulepb.NamespaceSnapshot{ + { + ForRulesetVersion: 1, + Tombstoned: true, + }, + }, + }, + }, + } + _, err := store.SetIfNotExists(testNamespacesKey, proto) + require.NoError(t, err) + + // This should also soft error even though the underlying ruleset does not exist + require.NoError(t, nss.Open()) +} + +func TestNamespacesWatchHardErr(t *testing.T) { + _, _, _, opts := testNamespaces() + opts = opts.SetRequireNamespaceWatchOnInit(true) + nss := NewNamespaces(testNamespacesKey, opts).(*namespaces) + // This should hard error with RequireNamespaceWatchOnInit enabled + require.Error(t, nss.Open()) +} + +func TestNamespacesWatchRulesetHardErr(t *testing.T) { + store, _, _, opts := testNamespaces() + opts = opts.SetRequireNamespaceWatchOnInit(true) + nss := NewNamespaces(testNamespacesKey, opts).(*namespaces) + + proto := &rulepb.Namespaces{ + Namespaces: []*rulepb.Namespace{ + { + Name: "fooNs", + Snapshots: []*rulepb.NamespaceSnapshot{ + { + ForRulesetVersion: 1, + Tombstoned: true, + }, + }, + }, + }, + } + _, err := store.SetIfNotExists(testNamespacesKey, proto) + require.NoError(t, err) + + // This should also hard error with RequireNamespaceWatchOnInit enabled, + // because the underlying ruleset does not exist + require.Error(t, nss.Open()) +} + func TestToNamespacesNilValue(t *testing.T) { _, _, nss, _ := testNamespaces() _, err := nss.toNamespaces(nil) @@ -80,10 +142,10 @@ func TestToNamespacesSuccess(t *testing.T) { store, _, nss, _ := testNamespaces() proto := &rulepb.Namespaces{ Namespaces: []*rulepb.Namespace{ - &rulepb.Namespace{ + { Name: "fooNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: true, }, @@ -128,55 +190,55 @@ func TestNamespacesProcess(t *testing.T) { update := &rulepb.Namespaces{ Namespaces: []*rulepb.Namespace{ - &rulepb.Namespace{ + { Name: "fooNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: false, }, - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 2, Tombstoned: false, }, }, }, - &rulepb.Namespace{ + { Name: "barNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: false, }, - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 2, Tombstoned: true, }, }, }, - &rulepb.Namespace{ + { Name: "bazNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: false, }, - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 2, Tombstoned: false, }, }, }, - &rulepb.Namespace{ + { Name: "catNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 3, Tombstoned: true, }, }, }, - &rulepb.Namespace{ + { Name: "mehNs", Snapshots: nil, }, @@ -215,7 +277,7 @@ func TestNamespacesProcess(t *testing.T) { } } -func testNamespaces() (kv.Store, cache.Cache, *namespaces, Options) { +func testNamespaces() (kv.TxnStore, cache.Cache, *namespaces, Options) { store := mem.NewStore() cache := newMemCache() opts := NewOptions(). diff --git a/src/metrics/matcher/options.go b/src/metrics/matcher/options.go index 2c90aea499..208e102cf9 100644 --- a/src/metrics/matcher/options.go +++ b/src/metrics/matcher/options.go @@ -135,22 +135,29 @@ type Options interface { // OnRuleSetUpdatedFn returns the function to be called when a ruleset is updated. OnRuleSetUpdatedFn() OnRuleSetUpdatedFn + + // SetRequireNamespaceWatchOnInit sets the flag to ensure matcher is initialized with a loaded namespace watch. + SetRequireNamespaceWatchOnInit(value bool) Options + + // RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch. + RequireNamespaceWatchOnInit() bool } type options struct { - clockOpts clock.Options - instrumentOpts instrument.Options - ruleSetOpts rules.Options - initWatchTimeout time.Duration - kvStore kv.Store - namespacesKey string - ruleSetKeyFn RuleSetKeyFn - namespaceTag []byte - defaultNamespace []byte - matchRangePast time.Duration - onNamespaceAddedFn OnNamespaceAddedFn - onNamespaceRemovedFn OnNamespaceRemovedFn - onRuleSetUpdatedFn OnRuleSetUpdatedFn + clockOpts clock.Options + instrumentOpts instrument.Options + ruleSetOpts rules.Options + initWatchTimeout time.Duration + kvStore kv.Store + namespacesKey string + ruleSetKeyFn RuleSetKeyFn + namespaceTag []byte + defaultNamespace []byte + matchRangePast time.Duration + onNamespaceAddedFn OnNamespaceAddedFn + onNamespaceRemovedFn OnNamespaceRemovedFn + onRuleSetUpdatedFn OnRuleSetUpdatedFn + requireNamespaceWatchOnInit bool } // NewOptions creates a new set of options. @@ -299,6 +306,18 @@ func (o *options) OnRuleSetUpdatedFn() OnRuleSetUpdatedFn { return o.onRuleSetUpdatedFn } +// SetRequireNamespaceWatchOnInit sets the flag to ensure matcher is initialized with a loaded namespace watch. +func (o *options) SetRequireNamespaceWatchOnInit(value bool) Options { + opts := *o + opts.requireNamespaceWatchOnInit = value + return &opts +} + +// RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch. +func (o *options) RequireNamespaceWatchOnInit() bool { + return o.requireNamespaceWatchOnInit +} + func defaultRuleSetKeyFn(namespace []byte) string { return fmt.Sprintf(defaultRuleSetKeyFormat, namespace) }