-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[coordinator] Enable running M3 coordinator without Etcd #3814
Changes from 11 commits
5eadad3
ad1e8d5
ecce04a
878c427
59bfd14
e881014
20786e3
26a55eb
ec2caa9
f668cd1
6ee8397
55093a4
39ccce7
5bfd9fa
2be0162
f92b489
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -31,6 +31,7 @@ import ( | |||||
|
||||||
"github.com/m3db/m3/src/aggregator/client" | ||||||
clusterclient "github.com/m3db/m3/src/cluster/client" | ||||||
"github.com/m3db/m3/src/cluster/kv" | ||||||
"github.com/m3db/m3/src/cluster/kv/mem" | ||||||
dbclient "github.com/m3db/m3/src/dbnode/client" | ||||||
"github.com/m3db/m3/src/metrics/aggregation" | ||||||
|
@@ -2841,6 +2842,51 @@ func TestDownsamplerWithOverrideNamespace(t *testing.T) { | |||||
testDownsamplerAggregation(t, testDownsampler) | ||||||
} | ||||||
|
||||||
func TestSafeguardInProcessDownsampler(t *testing.T) { | ||||||
ctrl := gomock.NewController(t) | ||||||
defer ctrl.Finish() | ||||||
store := kv.NewMockStore(ctrl) | ||||||
store.EXPECT().SetIfNotExists(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes() | ||||||
_ = newTestDownsampler(t, testDownsamplerOptions{ | ||||||
remoteClientMock: nil, | ||||||
expectConstructError: "other store then in memory can yield unexpected side effects", | ||||||
kvStore: store, | ||||||
}) | ||||||
} | ||||||
|
||||||
func TestDownsamplerNamespacesEtcdInit(t *testing.T) { | ||||||
t.Run("does not reset namespaces key", func(t *testing.T) { | ||||||
store := mem.NewStore() | ||||||
initialNamespaces := rulepb.Namespaces{Namespaces: []*rulepb.Namespace{{Name: "testNamespace"}}} | ||||||
_, err := store.Set(matcher.NewOptions().NamespacesKey(), &initialNamespaces) | ||||||
require.NoError(t, err) | ||||||
|
||||||
_ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store}) | ||||||
|
||||||
assert.Equal(t, initialNamespaces, readNamespacesKey(t, store), 1) | ||||||
}) | ||||||
|
||||||
t.Run("initializes namespace key", func(t *testing.T) { | ||||||
store := mem.NewStore() | ||||||
|
||||||
_ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store}) | ||||||
|
||||||
ns := readNamespacesKey(t, store) | ||||||
require.NotNil(t, ns) | ||||||
assert.Len(t, ns.Namespaces, 0) | ||||||
}) | ||||||
|
||||||
t.Run("do not initialize namespaces when RequireNamespaceWatchOnInit is true", func(t *testing.T) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit for consistency:
Suggested change
|
||||||
store := mem.NewStore() | ||||||
|
||||||
matcherConfig := MatcherConfiguration{RequireNamespaceWatchOnInit: true} | ||||||
_ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store, matcherConfig: matcherConfig}) | ||||||
|
||||||
_, err := store.Get(matcher.NewOptions().NamespacesKey()) | ||||||
require.Error(t, err) | ||||||
}) | ||||||
} | ||||||
|
||||||
func originalStagedMetadata(t *testing.T, testDownsampler testDownsampler) []metricpb.StagedMetadatas { | ||||||
ds, ok := testDownsampler.downsampler.(*downsampler) | ||||||
require.True(t, ok) | ||||||
|
@@ -3426,8 +3472,11 @@ type testDownsamplerOptions struct { | |||||
matcherConfig MatcherConfiguration | ||||||
|
||||||
// Test ingest and expectations overrides | ||||||
ingest *testDownsamplerOptionsIngest | ||||||
expect *testDownsamplerOptionsExpect | ||||||
ingest *testDownsamplerOptionsIngest | ||||||
expect *testDownsamplerOptionsExpect | ||||||
expectConstructError string | ||||||
|
||||||
kvStore kv.Store | ||||||
} | ||||||
|
||||||
type testDownsamplerOptionsIngest struct { | ||||||
|
@@ -3510,9 +3559,15 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl | |||||
cfg.Matcher = opts.matcherConfig | ||||||
cfg.UntimedRollups = opts.untimedRollups | ||||||
|
||||||
clusterClient := clusterclient.NewMockClient(gomock.NewController(t)) | ||||||
kvStore := opts.kvStore | ||||||
if kvStore == nil { | ||||||
kvStore = mem.NewStore() | ||||||
} | ||||||
clusterClient.EXPECT().KV().Return(kvStore, nil).AnyTimes() | ||||||
instance, err := cfg.NewDownsampler(DownsamplerOptions{ | ||||||
Storage: storage, | ||||||
ClusterClient: clusterclient.NewMockClient(gomock.NewController(t)), | ||||||
ClusterClient: clusterClient, | ||||||
RulesKVStore: rulesKVStore, | ||||||
ClusterNamespacesWatcher: m3.NewClusterNamespacesWatcher(), | ||||||
ClockOptions: clockOpts, | ||||||
|
@@ -3525,6 +3580,11 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl | |||||
RWOptions: xio.NewOptions(), | ||||||
TagOptions: models.NewTagOptions(), | ||||||
}) | ||||||
if opts.expectConstructError != "" { | ||||||
require.Error(t, err) | ||||||
assert.Contains(t, err.Error(), opts.expectConstructError) | ||||||
return testDownsampler{} | ||||||
} | ||||||
require.NoError(t, err) | ||||||
|
||||||
if len(opts.autoMappingRules) > 0 { | ||||||
|
@@ -3617,3 +3677,13 @@ func findWrites( | |||||
func testUpdateMetadata() rules.UpdateMetadata { | ||||||
return rules.NewRuleSetUpdateHelper(0).NewUpdateMetadata(time.Now().UnixNano(), "test") | ||||||
} | ||||||
|
||||||
func readNamespacesKey(t *testing.T, store kv.Store) rulepb.Namespaces { | ||||||
v, err := store.Get(matcher.NewOptions().NamespacesKey()) | ||||||
require.NoError(t, err) | ||||||
var ns rulepb.Namespaces | ||||||
err = v.Unmarshal(&ns) | ||||||
require.NoError(t, err) | ||||||
require.NotNil(t, ns) | ||||||
return ns | ||||||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -737,7 +737,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { | |||||||||||||||
kvTxnMemStore := mem.NewStore() | ||||||||||||||||
|
||||||||||||||||
// Initialize the namespaces | ||||||||||||||||
_, err := kvTxnMemStore.Set(matcherOpts.NamespacesKey(), &rulepb.Namespaces{}) | ||||||||||||||||
err := initStoreNamespaces(kvTxnMemStore, matcherOpts.NamespacesKey()) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return agg{}, err | ||||||||||||||||
} | ||||||||||||||||
|
@@ -805,6 +805,21 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { | |||||||||||||||
matcherCacheCapacity = *v | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
kvStore, err := o.ClusterClient.KV() | ||||||||||||||||
if err != nil { | ||||||||||||||||
return agg{}, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// NB(antanas): matcher registers watcher on namespaces key. Making sure it is set, otherwise watcher times out. | ||||||||||||||||
// With RequireNamespaceWatchOnInit being true we expect namespaces to be set upfront | ||||||||||||||||
// so we do not initialize them here at all because it might potentially hide human error. | ||||||||||||||||
if !matcherOpts.RequireNamespaceWatchOnInit() { | ||||||||||||||||
err = initStoreNamespaces(kvStore, matcherOpts.NamespacesKey()) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return agg{}, err | ||||||||||||||||
} | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit:
Suggested change
|
||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
matcher, err := o.newAggregatorMatcher(matcherOpts, matcherCacheCapacity) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return agg{}, err | ||||||||||||||||
|
@@ -838,13 +853,18 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { | |||||||||||||||
}, nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// NB(antanas): to protect against running with real Etcd and overriding existing placements. | ||||||||||||||||
if !mem.IsMem(kvStore) { | ||||||||||||||||
return agg{}, errors.New("running in process downsampler with other store " + | ||||||||||||||||
"then in memory can yield unexpected side effects") | ||||||||||||||||
} | ||||||||||||||||
localKVStore := kvStore | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is good protection, great 👍 |
||||||||||||||||
|
||||||||||||||||
serviceID := services.NewServiceID(). | ||||||||||||||||
SetEnvironment("production"). | ||||||||||||||||
SetName("downsampler"). | ||||||||||||||||
SetZone("embedded") | ||||||||||||||||
|
||||||||||||||||
localKVStore := mem.NewStore() | ||||||||||||||||
|
||||||||||||||||
placementManager, err := o.newAggregatorPlacementManager(serviceID, localKVStore) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return agg{}, err | ||||||||||||||||
|
@@ -978,6 +998,14 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { | |||||||||||||||
}, nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func initStoreNamespaces(store kv.Store, nsKey string) error { | ||||||||||||||||
_, err := store.SetIfNotExists(nsKey, &rulepb.Namespaces{}) | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be a subject to a race condition "set if not exist" when multiple writes might be happening to the store? It's probably a corner case and might be an issue only when Etcd is in uninitialized state. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You'll get back a |
||||||||||||||||
if errors.Is(err, kv.ErrAlreadyExists) { | ||||||||||||||||
return nil | ||||||||||||||||
} | ||||||||||||||||
return err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
type aggPools struct { | ||||||||||||||||
tagEncoderPool serialize.TagEncoderPool | ||||||||||||||||
tagDecoderPool serialize.TagDecoderPool | ||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,7 +36,6 @@ import ( | |
"github.com/m3db/m3/src/cmd/services/m3query/config" | ||
"github.com/m3db/m3/src/dbnode/client" | ||
"github.com/m3db/m3/src/metrics/generated/proto/metricpb" | ||
"github.com/m3db/m3/src/metrics/generated/proto/rulepb" | ||
"github.com/m3db/m3/src/metrics/policy" | ||
"github.com/m3db/m3/src/msg/generated/proto/msgpb" | ||
m3msgproto "github.com/m3db/m3/src/msg/protocol/proto" | ||
|
@@ -538,14 +537,15 @@ func runServer(t *testing.T, opts runServerOpts) (string, closeFn) { | |
doneCh = make(chan struct{}) | ||
listenerCh = make(chan net.Listener, 1) | ||
clusterClient = clusterclient.NewMockClient(opts.ctrl) | ||
clusterClientCh = make(chan clusterclient.Client, 1) | ||
clusterClientCh chan clusterclient.Client | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the purpose of this channel? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a away to pass in a mocked instance for cases when Etcd is required. Probably not a best thing to do but thats how it was before. With my change I just added case when no Etcd is needed I pass this channel as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, didn't notice it was there before the change, thought you had introduced it. |
||
) | ||
|
||
store := mem.NewStore() | ||
_, err := store.Set("/namespaces", &rulepb.Namespaces{}) | ||
require.NoError(t, err) | ||
clusterClient.EXPECT().KV().Return(store, nil).MaxTimes(1) | ||
clusterClientCh <- clusterClient | ||
if len(opts.cfg.Clusters) > 0 || opts.cfg.ClusterManagement.Etcd != nil { | ||
clusterClientCh = make(chan clusterclient.Client, 1) | ||
store := mem.NewStore() | ||
clusterClient.EXPECT().KV().Return(store, nil).MaxTimes(2) | ||
clusterClientCh <- clusterClient | ||
} | ||
|
||
go func() { | ||
r := Run(RunOptions{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be cleaner to have separate test methods instead of all those
t.Run
? Or was your goal to have nice test descriptions? I wishtesting.T
simply had some kind ofSetName
for this.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to group those namespace init related tests. This test file is 3k lines long. So different test methods can just get lost.