Skip to content

Commit

Permalink
[matcher] [coordinator] Add RequireNamespaceWatchOnInit option (#3468)
Browse files Browse the repository at this point in the history
* [matcher] [coordinator] Add RequireNamespaceWatchOnInit option

This option on the matcher will require that namespace
values and corresponding rulesets are loaded
in the matcher as part of startup.

This is useful in ensuring dynamic rules are loaded
before we start ingesting metrics.

* Set RequireNamespaceWatchOnInit in downsampler config
  • Loading branch information
wesleyk authored May 6, 2021
1 parent df9bb93 commit b770ecf
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 47 deletions.
6 changes: 5 additions & 1 deletion src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ type MatcherConfiguration struct {
// NamespaceTag defines the namespace tag to use to select rules
// namespace to evaluate against. Default is "__m3_namespace__".
NamespaceTag string `yaml:"namespaceTag"`
// RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch.
// This only makes sense to use if the corresponding namespace / ruleset values are properly seeded.
RequireNamespaceWatchOnInit bool `yaml:"requireNamespaceWatchOnInit"`
}

// MatcherCacheConfiguration is the configuration for the rule matcher cache.
Expand Down Expand Up @@ -710,7 +713,8 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
SetInstrumentOptions(instrumentOpts).
SetRuleSetOptions(ruleSetOpts).
SetKVStore(o.RulesKVStore).
SetNamespaceTag([]byte(namespaceTag))
SetNamespaceTag([]byte(namespaceTag)).
SetRequireNamespaceWatchOnInit(cfg.Matcher.RequireNamespaceWatchOnInit)

// NB(r): If rules are being explicitly set in config then we are
// going to use an in memory KV store for rules and explicitly set them up.
Expand Down
57 changes: 41 additions & 16 deletions src/metrics/matcher/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/rules"
"github.com/m3db/m3/src/x/clock"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/watch"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -112,27 +113,29 @@ type namespaces struct {
onNamespaceAddedFn OnNamespaceAddedFn
onNamespaceRemovedFn OnNamespaceRemovedFn

proto *rulepb.Namespaces
rules *namespaceRuleSetsMap
metrics namespacesMetrics
proto *rulepb.Namespaces
rules *namespaceRuleSetsMap
metrics namespacesMetrics
requireNamespaceWatchOnInit bool
}

// NewNamespaces creates a new namespaces object.
func NewNamespaces(key string, opts Options) Namespaces {
instrumentOpts := opts.InstrumentOptions()
n := &namespaces{
key: key,
store: opts.KVStore(),
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
log: instrumentOpts.Logger(),
ruleSetKeyFn: opts.RuleSetKeyFn(),
matchRangePast: opts.MatchRangePast(),
onNamespaceAddedFn: opts.OnNamespaceAddedFn(),
onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(),
proto: &rulepb.Namespaces{},
rules: newNamespaceRuleSetsMap(namespaceRuleSetsMapOptions{}),
metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()),
key: key,
store: opts.KVStore(),
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
log: instrumentOpts.Logger(),
ruleSetKeyFn: opts.RuleSetKeyFn(),
matchRangePast: opts.MatchRangePast(),
onNamespaceAddedFn: opts.OnNamespaceAddedFn(),
onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(),
proto: &rulepb.Namespaces{},
rules: newNamespaceRuleSetsMap(namespaceRuleSetsMapOptions{}),
metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()),
requireNamespaceWatchOnInit: opts.RequireNamespaceWatchOnInit(),
}
valueOpts := runtime.NewOptions().
SetInstrumentOptions(instrumentOpts).
Expand Down Expand Up @@ -160,10 +163,15 @@ func (n *namespaces) Open() error {
// to be more resilient to error conditions preventing process
// from starting up.
n.metrics.initWatchErrors.Inc(1)
if n.requireNamespaceWatchOnInit {
return err
}

n.opts.InstrumentOptions().Logger().With(
zap.String("key", n.key),
zap.Error(err),
).Error("error initializing namespaces values, retrying in the background")

return nil
}

Expand Down Expand Up @@ -255,7 +263,12 @@ func (n *namespaces) process(value interface{}) error {
n.Lock()
defer n.Unlock()

var watchWg sync.WaitGroup
var (
watchWg sync.WaitGroup
multiErr xerrors.MultiError
errLock sync.Mutex
)

for _, entry := range incoming.Iter() {
namespace, elem := entry.Key(), rules.Namespace(entry.Value())
nsName, snapshots := elem.Name(), elem.Snapshots()
Expand Down Expand Up @@ -302,6 +315,13 @@ func (n *namespaces) process(value interface{}) error {
n.log.Error("failed to watch ruleset updates",
zap.String("ruleSetKey", ruleSet.Key()),
zap.Error(err))

// Track errors if we explicitly want to ensure watches succeed.
if n.requireNamespaceWatchOnInit {
errLock.Lock()
multiErr = multiErr.Add(err)
errLock.Unlock()
}
}
}()
}
Expand All @@ -312,6 +332,11 @@ func (n *namespaces) process(value interface{}) error {
}

watchWg.Wait()

if !multiErr.Empty() {
return multiErr.FinalError()
}

for _, entry := range n.rules.Iter() {
namespace, ruleSet := entry.Key(), entry.Value()
_, exists := incoming.Get(namespace)
Expand Down
96 changes: 79 additions & 17 deletions src/metrics/matcher/namespaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ func TestNamespacesWatchAndClose(t *testing.T) {
store, _, nss, _ := testNamespaces()
proto := &rulepb.Namespaces{
Namespaces: []*rulepb.Namespace{
&rulepb.Namespace{
{
Name: "fooNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: true,
},
Expand All @@ -64,6 +64,68 @@ func TestNamespacesWatchAndClose(t *testing.T) {
nss.Close()
}

func TestNamespacesWatchSoftErr(t *testing.T) {
_, _, nss, _ := testNamespaces()
// No value set, so this will soft error
require.NoError(t, nss.Open())
}

func TestNamespacesWatchRulesetSoftErr(t *testing.T) {
store, _, nss, _ := testNamespaces()
proto := &rulepb.Namespaces{
Namespaces: []*rulepb.Namespace{
{
Name: "fooNs",
Snapshots: []*rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: true,
},
},
},
},
}
_, err := store.SetIfNotExists(testNamespacesKey, proto)
require.NoError(t, err)

// This should also soft error even though the underlying ruleset does not exist
require.NoError(t, nss.Open())
}

func TestNamespacesWatchHardErr(t *testing.T) {
_, _, _, opts := testNamespaces()
opts = opts.SetRequireNamespaceWatchOnInit(true)
nss := NewNamespaces(testNamespacesKey, opts).(*namespaces)
// This should hard error with RequireNamespaceWatchOnInit enabled
require.Error(t, nss.Open())
}

func TestNamespacesWatchRulesetHardErr(t *testing.T) {
store, _, _, opts := testNamespaces()
opts = opts.SetRequireNamespaceWatchOnInit(true)
nss := NewNamespaces(testNamespacesKey, opts).(*namespaces)

proto := &rulepb.Namespaces{
Namespaces: []*rulepb.Namespace{
{
Name: "fooNs",
Snapshots: []*rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: true,
},
},
},
},
}
_, err := store.SetIfNotExists(testNamespacesKey, proto)
require.NoError(t, err)

// This should also hard error with RequireNamespaceWatchOnInit enabled,
// because the underlying ruleset does not exist
require.Error(t, nss.Open())
}

func TestToNamespacesNilValue(t *testing.T) {
_, _, nss, _ := testNamespaces()
_, err := nss.toNamespaces(nil)
Expand All @@ -80,10 +142,10 @@ func TestToNamespacesSuccess(t *testing.T) {
store, _, nss, _ := testNamespaces()
proto := &rulepb.Namespaces{
Namespaces: []*rulepb.Namespace{
&rulepb.Namespace{
{
Name: "fooNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: true,
},
Expand Down Expand Up @@ -128,55 +190,55 @@ func TestNamespacesProcess(t *testing.T) {

update := &rulepb.Namespaces{
Namespaces: []*rulepb.Namespace{
&rulepb.Namespace{
{
Name: "fooNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: false,
},
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 2,
Tombstoned: false,
},
},
},
&rulepb.Namespace{
{
Name: "barNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: false,
},
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 2,
Tombstoned: true,
},
},
},
&rulepb.Namespace{
{
Name: "bazNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: false,
},
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 2,
Tombstoned: false,
},
},
},
&rulepb.Namespace{
{
Name: "catNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 3,
Tombstoned: true,
},
},
},
&rulepb.Namespace{
{
Name: "mehNs",
Snapshots: nil,
},
Expand Down Expand Up @@ -215,7 +277,7 @@ func TestNamespacesProcess(t *testing.T) {
}
}

func testNamespaces() (kv.Store, cache.Cache, *namespaces, Options) {
func testNamespaces() (kv.TxnStore, cache.Cache, *namespaces, Options) {
store := mem.NewStore()
cache := newMemCache()
opts := NewOptions().
Expand Down
45 changes: 32 additions & 13 deletions src/metrics/matcher/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,22 +135,29 @@ type Options interface {

// OnRuleSetUpdatedFn returns the function to be called when a ruleset is updated.
OnRuleSetUpdatedFn() OnRuleSetUpdatedFn

// SetRequireNamespaceWatchOnInit sets the flag to ensure matcher is initialized with a loaded namespace watch.
SetRequireNamespaceWatchOnInit(value bool) Options

// RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch.
RequireNamespaceWatchOnInit() bool
}

type options struct {
clockOpts clock.Options
instrumentOpts instrument.Options
ruleSetOpts rules.Options
initWatchTimeout time.Duration
kvStore kv.Store
namespacesKey string
ruleSetKeyFn RuleSetKeyFn
namespaceTag []byte
defaultNamespace []byte
matchRangePast time.Duration
onNamespaceAddedFn OnNamespaceAddedFn
onNamespaceRemovedFn OnNamespaceRemovedFn
onRuleSetUpdatedFn OnRuleSetUpdatedFn
clockOpts clock.Options
instrumentOpts instrument.Options
ruleSetOpts rules.Options
initWatchTimeout time.Duration
kvStore kv.Store
namespacesKey string
ruleSetKeyFn RuleSetKeyFn
namespaceTag []byte
defaultNamespace []byte
matchRangePast time.Duration
onNamespaceAddedFn OnNamespaceAddedFn
onNamespaceRemovedFn OnNamespaceRemovedFn
onRuleSetUpdatedFn OnRuleSetUpdatedFn
requireNamespaceWatchOnInit bool
}

// NewOptions creates a new set of options.
Expand Down Expand Up @@ -299,6 +306,18 @@ func (o *options) OnRuleSetUpdatedFn() OnRuleSetUpdatedFn {
return o.onRuleSetUpdatedFn
}

// SetRequireNamespaceWatchOnInit sets the flag to ensure matcher is initialized with a loaded namespace watch.
func (o *options) SetRequireNamespaceWatchOnInit(value bool) Options {
opts := *o
opts.requireNamespaceWatchOnInit = value
return &opts
}

// RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch.
func (o *options) RequireNamespaceWatchOnInit() bool {
return o.requireNamespaceWatchOnInit
}

func defaultRuleSetKeyFn(namespace []byte) string {
return fmt.Sprintf(defaultRuleSetKeyFormat, namespace)
}

0 comments on commit b770ecf

Please sign in to comment.