From 6520b3a0c3154939dfe3153b103930c80c644fee Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Sat, 29 Sep 2018 18:03:42 -0400 Subject: [PATCH 01/10] Dont clone IDs in client session if IsNoFinalize() --- src/dbnode/client/session.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index 462affb737..cf77b82f91 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -972,8 +972,8 @@ func (s *session) writeAttemptWithRLock( // use in the various queues. Tracking per writeAttempt isn't sufficient as // we may enqueue multiple writeStates concurrently depending on retries // and consistency level checks. - nsID := s.pools.id.Clone(namespace) - tsID := s.pools.id.Clone(id) + nsID := s.maybeClone(namespace) + tsID := s.maybeClone(id) var tagEncoder serialize.TagEncoder if wType == taggedWriteAttemptType { tagEncoder = s.pools.tagEncoder.Get() @@ -1191,7 +1191,7 @@ func (s *session) fetchTaggedAttemptWithRLock( ) (*fetchState, error) { // NB(prateek): we have to clone the namespace, as we cannot guarantee the lifecycle // of the hostQueues responding is less than the lifecycle of the current method. - nsClone := s.pools.id.Clone(ns) + nsClone := s.maybeClone(ns) // FOLLOWUP(prateek): currently both `index.Query` and the returned request depend on // native, un-pooled types; so we do not Clone() either. We will start doing so @@ -1261,7 +1261,7 @@ func (s *session) fetchIDsAttempt( // NB(prateek): need to make a copy of inputNamespace and inputIDs to control // their life-cycle within this function. - namespace := s.pools.id.Clone(inputNamespace) + namespace := s.maybeClone(inputNamespace) // First, we duplicate the iterator (only the struct referencing the underlying slice, // not the slice itself). Need this to be able to iterate the original iterator // multiple times in case of retries. @@ -1315,7 +1315,7 @@ func (s *session) fetchIDsAttempt( for idx := 0; ids.Next(); idx++ { var ( idx = idx // capture loop variable - tsID = s.pools.id.Clone(ids.Current()) + tsID = s.maybeClone(ids.Current()) wgIsDone int32 // NB(xichen): resultsAccessors and idAccessors get initialized to number of replicas + 1 @@ -1367,8 +1367,8 @@ func (s *session) fetchIDsAttempt( // to have control over the lifecycle of ID. We cannot allow seriesIterator // to control the lifecycle of the original ident.ID, as it might still be in use // due to a pending request in queue. 
- seriesID := s.pools.id.Clone(tsID) - namespaceID := s.pools.id.Clone(namespace) + seriesID := s.maybeClone(tsID) + namespaceID := s.maybeClone(namespace) iter.Reset(encoding.SeriesIteratorOptions{ ID: seriesID, Namespace: namespaceID, @@ -3024,6 +3024,13 @@ func (s *session) verifyFetchedBlock(block *rpc.Block) error { return nil } +func (s *session) maybeClone(id ident.ID) ident.ID { + if id.IsNoFinalize() { + return id + } + return s.pools.id.Clone(id) +} + type reason int const ( From 32a4de25010c029fe9c761549f59272857e4e9fb Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Sat, 29 Sep 2018 18:08:12 -0400 Subject: [PATCH 02/10] Rename maybeClone to cloneFinalizable --- src/dbnode/client/session.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index cf77b82f91..fa25c7ff0c 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -972,8 +972,8 @@ func (s *session) writeAttemptWithRLock( // use in the various queues. Tracking per writeAttempt isn't sufficient as // we may enqueue multiple writeStates concurrently depending on retries // and consistency level checks. - nsID := s.maybeClone(namespace) - tsID := s.maybeClone(id) + nsID := s.cloneFinalizable(namespace) + tsID := s.cloneFinalizable(id) var tagEncoder serialize.TagEncoder if wType == taggedWriteAttemptType { tagEncoder = s.pools.tagEncoder.Get() @@ -1191,7 +1191,7 @@ func (s *session) fetchTaggedAttemptWithRLock( ) (*fetchState, error) { // NB(prateek): we have to clone the namespace, as we cannot guarantee the lifecycle // of the hostQueues responding is less than the lifecycle of the current method. - nsClone := s.maybeClone(ns) + nsClone := s.cloneFinalizable(ns) // FOLLOWUP(prateek): currently both `index.Query` and the returned request depend on // native, un-pooled types; so we do not Clone() either. We will start doing so @@ -1261,7 +1261,7 @@ func (s *session) fetchIDsAttempt( // NB(prateek): need to make a copy of inputNamespace and inputIDs to control // their life-cycle within this function. - namespace := s.maybeClone(inputNamespace) + namespace := s.cloneFinalizable(inputNamespace) // First, we duplicate the iterator (only the struct referencing the underlying slice, // not the slice itself). Need this to be able to iterate the original iterator // multiple times in case of retries. @@ -1315,7 +1315,7 @@ func (s *session) fetchIDsAttempt( for idx := 0; ids.Next(); idx++ { var ( idx = idx // capture loop variable - tsID = s.maybeClone(ids.Current()) + tsID = s.cloneFinalizable(ids.Current()) wgIsDone int32 // NB(xichen): resultsAccessors and idAccessors get initialized to number of replicas + 1 @@ -1367,8 +1367,8 @@ func (s *session) fetchIDsAttempt( // to have control over the lifecycle of ID. We cannot allow seriesIterator // to control the lifecycle of the original ident.ID, as it might still be in use // due to a pending request in queue. 
- seriesID := s.maybeClone(tsID) - namespaceID := s.maybeClone(namespace) + seriesID := s.cloneFinalizable(tsID) + namespaceID := s.cloneFinalizable(namespace) iter.Reset(encoding.SeriesIteratorOptions{ ID: seriesID, Namespace: namespaceID, @@ -3024,7 +3024,7 @@ func (s *session) verifyFetchedBlock(block *rpc.Block) error { return nil } -func (s *session) maybeClone(id ident.ID) ident.ID { +func (s *session) cloneFinalizable(id ident.ID) ident.ID { if id.IsNoFinalize() { return id } From 63c20330cab637902c98eeda1b14248858dd071d Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Sat, 29 Sep 2018 18:11:21 -0400 Subject: [PATCH 03/10] Only add cloneFinalizable to write path --- src/dbnode/client/session.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index fa25c7ff0c..f2f084b6da 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -1191,7 +1191,7 @@ func (s *session) fetchTaggedAttemptWithRLock( ) (*fetchState, error) { // NB(prateek): we have to clone the namespace, as we cannot guarantee the lifecycle // of the hostQueues responding is less than the lifecycle of the current method. - nsClone := s.cloneFinalizable(ns) + nsClone := s.pools.id.Clone(ns) // FOLLOWUP(prateek): currently both `index.Query` and the returned request depend on // native, un-pooled types; so we do not Clone() either. We will start doing so @@ -1261,7 +1261,7 @@ func (s *session) fetchIDsAttempt( // NB(prateek): need to make a copy of inputNamespace and inputIDs to control // their life-cycle within this function. - namespace := s.cloneFinalizable(inputNamespace) + namespace := s.pools.id.Clone(inputNamespace) // First, we duplicate the iterator (only the struct referencing the underlying slice, // not the slice itself). Need this to be able to iterate the original iterator // multiple times in case of retries. @@ -1315,7 +1315,7 @@ func (s *session) fetchIDsAttempt( for idx := 0; ids.Next(); idx++ { var ( idx = idx // capture loop variable - tsID = s.cloneFinalizable(ids.Current()) + tsID = s.pools.id.Clone(ids.Current()) wgIsDone int32 // NB(xichen): resultsAccessors and idAccessors get initialized to number of replicas + 1 @@ -1367,8 +1367,8 @@ func (s *session) fetchIDsAttempt( // to have control over the lifecycle of ID. We cannot allow seriesIterator // to control the lifecycle of the original ident.ID, as it might still be in use // due to a pending request in queue. 
- seriesID := s.cloneFinalizable(tsID) - namespaceID := s.cloneFinalizable(namespace) + seriesID := s.pools.id.Clone(tsID) + namespaceID := s.pools.id.Clone(namespace) iter.Reset(encoding.SeriesIteratorOptions{ ID: seriesID, Namespace: namespaceID, From f0817bc8eb75e834e8cbbded5e27cedbd1180a5f Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Sat, 29 Sep 2018 18:57:23 -0400 Subject: [PATCH 04/10] add test --- src/dbnode/client/session_write_test.go | 43 +++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/src/dbnode/client/session_write_test.go b/src/dbnode/client/session_write_test.go index 675d0a65f9..ccc962cb60 100644 --- a/src/dbnode/client/session_write_test.go +++ b/src/dbnode/client/session_write_test.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/m3db/m3/src/dbnode/topology" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" + xtest "github.com/m3db/m3/src/x/test" xerrors "github.com/m3db/m3x/errors" "github.com/m3db/m3x/ident" xretry "github.com/m3db/m3x/retry" @@ -101,6 +102,48 @@ func TestSessionWrite(t *testing.T) { assert.NoError(t, session.Close()) } +func TestSessionWriteDoesNotCloneNoFinalize(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + session := newDefaultTestSession(t).(*session) + + w := newWriteStub() + var completionFn completionFn + enqueueWg := mockHostQueues(ctrl, session, sessionTestReplicas, []testEnqueueFn{func(idx int, op op) { + completionFn = op.CompletionFn() + write, ok := op.(*writeOperation) + assert.True(t, ok) + assert.True(t, + xtest.ByteSlicesBackedBySameData( + w.id.Bytes(), + write.request.ID)) + assert.Equal(t, w.value, write.request.Datapoint.Value) + assert.Equal(t, w.t.Unix(), write.request.Datapoint.Timestamp) + assert.Equal(t, rpc.TimeType_UNIX_SECONDS, write.request.Datapoint.TimestampTimeType) + assert.NotNil(t, write.completionFn) + }}) + + assert.NoError(t, session.Open()) + + // Begin write + var resultErr error + var writeWg sync.WaitGroup + writeWg.Add(1) + go func() { + resultErr = session.Write(w.ns, w.id, w.t, w.value, w.unit, w.annotation) + writeWg.Done() + }() + + // Callback + enqueueWg.Wait() + for i := 0; i < session.state.topoMap.Replicas(); i++ { + completionFn(session.state.topoMap.Hosts()[0], nil) + } + + assert.NoError(t, session.Close()) +} + func TestSessionWriteBadUnitErr(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() From b5d3e6f29adaf42d00403bb3bc1c2f5836e45a6a Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Sat, 29 Sep 2018 18:58:52 -0400 Subject: [PATCH 05/10] Update deps --- glide.lock | 6 +++--- glide.yaml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/glide.lock b/glide.lock index b31b41624d..075ba16358 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 5cf709f4cd0810365b19227ef1ae322dc36893f68f87b80d2bbb34f44e5b8c01 -updated: 2018-09-29T17:54:45.564757-04:00 +hash: 819e53ce790007654311d180da9a52c45e28be074eefd608adf2e6cbfc67d544 +updated: 2018-09-29T18:58:44.951247-04:00 imports: - name: github.com/apache/thrift version: c2fb1c4e8c931d22617bebb0bf388cb4d5e6fcff @@ -312,7 +312,7 @@ imports: - protocol/proto - topic - name: github.com/m3db/m3x - version: 3ab0cd67ff3c6e63a40893bad1ec4d993b5422c2 + version: 1bcb37ae8e67118c1d99ac31098a683a3a46a68f vcs: git subpackages: - checked diff --git a/glide.yaml b/glide.yaml index 14a59b06e4..992b5d619c 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,7 +1,7 @@ package: github.com/m3db/m3 import: - package: 
github.com/m3db/m3x - version: 3ab0cd67ff3c6e63a40893bad1ec4d993b5422c2 + version: 1bcb37ae8e67118c1d99ac31098a683a3a46a68f vcs: git subpackages: - checked From e5977ae10bb330bbacb8e7906199ae8680117554 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Sat, 29 Sep 2018 19:06:02 -0400 Subject: [PATCH 06/10] fix tests --- .../client/session_write_tagged_test.go | 57 +++++++++++++++++++ src/dbnode/client/session_write_test.go | 19 ++++--- 2 files changed, 67 insertions(+), 9 deletions(-) diff --git a/src/dbnode/client/session_write_tagged_test.go b/src/dbnode/client/session_write_tagged_test.go index 5970ffe7ab..c38755074b 100644 --- a/src/dbnode/client/session_write_tagged_test.go +++ b/src/dbnode/client/session_write_tagged_test.go @@ -33,6 +33,7 @@ import ( "github.com/m3db/m3/src/dbnode/serialize" "github.com/m3db/m3/src/dbnode/topology" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" + xm3test "github.com/m3db/m3/src/x/test" "github.com/m3db/m3x/checked" xerrors "github.com/m3db/m3x/errors" "github.com/m3db/m3x/ident" @@ -157,6 +158,62 @@ func TestSessionWriteTagged(t *testing.T) { assert.NoError(t, session.Close()) } +func TestSessionWriteTaggedDoesNotCloneNoFinalize(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{t}) + defer ctrl.Finish() + + w := newWriteTaggedStub() + session := newDefaultTestSession(t).(*session) + mockEncoder := serialize.NewMockTagEncoder(ctrl) + mockEncoderPool := serialize.NewMockTagEncoderPool(ctrl) + session.pools.tagEncoder = mockEncoderPool + + gomock.InOrder( + mockEncoderPool.EXPECT().Get().Return(mockEncoder), + mockEncoder.EXPECT().Encode(gomock.Any()).Return(nil), + mockEncoder.EXPECT().Data().Return(testEncodeTags(w.tags), true), + mockEncoder.EXPECT().Finalize(), + ) + + var completionFn completionFn + enqueueWg := mockHostQueues(ctrl, session, sessionTestReplicas, []testEnqueueFn{func(idx int, op op) { + completionFn = op.CompletionFn() + write, ok := op.(*writeTaggedOperation) + require.True(t, ok) + require.True(t, + xm3test.ByteSlicesBackedBySameData( + w.ns.Bytes(), + write.namespace.Bytes())) + require.True(t, + xm3test.ByteSlicesBackedBySameData( + w.id.Bytes(), + write.request.ID)) + }}) + + require.NoError(t, session.Open()) + // Begin write + var resultErr error + var writeWg sync.WaitGroup + writeWg.Add(1) + go func() { + resultErr = session.WriteTagged(w.ns, w.id, ident.NewTagsIterator(w.tags), + w.t, w.value, w.unit, w.annotation) + writeWg.Done() + }() + + // Callback + enqueueWg.Wait() + for i := 0; i < session.state.topoMap.Replicas(); i++ { + completionFn(session.state.topoMap.Hosts()[0], nil) + } + + // Wait for write to complete + writeWg.Wait() + require.NoError(t, resultErr) + + require.NoError(t, session.Close()) +} + func TestSessionWriteTaggedBadUnitErr(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/src/dbnode/client/session_write_test.go b/src/dbnode/client/session_write_test.go index ccc962cb60..5537655528 100644 --- a/src/dbnode/client/session_write_test.go +++ b/src/dbnode/client/session_write_test.go @@ -107,24 +107,23 @@ func TestSessionWriteDoesNotCloneNoFinalize(t *testing.T) { defer ctrl.Finish() session := newDefaultTestSession(t).(*session) - w := newWriteStub() var completionFn completionFn enqueueWg := mockHostQueues(ctrl, session, sessionTestReplicas, []testEnqueueFn{func(idx int, op op) { completionFn = op.CompletionFn() write, ok := op.(*writeOperation) - assert.True(t, ok) - assert.True(t, + require.True(t, ok) + require.True(t, + 
xtest.ByteSlicesBackedBySameData( + w.ns.Bytes(), + write.namespace.Bytes())) + require.True(t, xtest.ByteSlicesBackedBySameData( w.id.Bytes(), write.request.ID)) - assert.Equal(t, w.value, write.request.Datapoint.Value) - assert.Equal(t, w.t.Unix(), write.request.Datapoint.Timestamp) - assert.Equal(t, rpc.TimeType_UNIX_SECONDS, write.request.Datapoint.TimestampTimeType) - assert.NotNil(t, write.completionFn) }}) - assert.NoError(t, session.Open()) + require.NoError(t, session.Open()) // Begin write var resultErr error @@ -141,7 +140,9 @@ func TestSessionWriteDoesNotCloneNoFinalize(t *testing.T) { completionFn(session.state.topoMap.Hosts()[0], nil) } - assert.NoError(t, session.Close()) + writeWg.Wait() + require.NoError(t, resultErr) + require.NoError(t, session.Close()) } func TestSessionWriteBadUnitErr(t *testing.T) { From 2de87dbc8081374b5195568ef2577aee67dfad85 Mon Sep 17 00:00:00 2001 From: Prateek Rungta Date: Sat, 29 Sep 2018 20:40:23 -0400 Subject: [PATCH 07/10] persist manager fixes --- glide.lock | 2 +- glide.yaml | 2 +- src/dbnode/persist/fs/persist_manager_test.go | 25 ++++++++-------- src/x/test/ident_cmp_matcher.go | 30 +++++++++++++++++++ 4 files changed, 45 insertions(+), 14 deletions(-) create mode 100644 src/x/test/ident_cmp_matcher.go diff --git a/glide.lock b/glide.lock index 075ba16358..6a744faa22 100644 --- a/glide.lock +++ b/glide.lock @@ -312,7 +312,7 @@ imports: - protocol/proto - topic - name: github.com/m3db/m3x - version: 1bcb37ae8e67118c1d99ac31098a683a3a46a68f + version: 659975cd3b5b69f112f1b74244b4d8b440888cf4 vcs: git subpackages: - checked diff --git a/glide.yaml b/glide.yaml index 992b5d619c..6a748a561b 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,7 +1,7 @@ package: github.com/m3db/m3 import: - package: github.com/m3db/m3x - version: 1bcb37ae8e67118c1d99ac31098a683a3a46a68f + version: 659975cd3b5b69f112f1b74244b4d8b440888cf4 vcs: git subpackages: - checked diff --git a/src/dbnode/persist/fs/persist_manager_test.go b/src/dbnode/persist/fs/persist_manager_test.go index dcaf1cc10c..ae0b345203 100644 --- a/src/dbnode/persist/fs/persist_manager_test.go +++ b/src/dbnode/persist/fs/persist_manager_test.go @@ -33,6 +33,7 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment" m3ninxfs "github.com/m3db/m3/src/m3ninx/index/segment/fst" m3ninxpersist "github.com/m3db/m3/src/m3ninx/persist" + m3test "github.com/m3db/m3/src/x/test" "github.com/m3db/m3x/checked" "github.com/m3db/m3x/ident" xtest "github.com/m3db/m3x/test" @@ -92,7 +93,7 @@ func TestPersistenceManagerPrepareDataFileExistsWithDelete(t *testing.T) { BlockStart: blockStart, }, BlockSize: testBlockSize, - }) + }, m3test.IdentTransformer) writer.EXPECT().Open(writerOpts).Return(nil) shardDir := createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard) @@ -142,7 +143,7 @@ func TestPersistenceManagerPrepareOpenError(t *testing.T) { BlockStart: blockStart, }, BlockSize: testBlockSize, - }) + }, m3test.IdentTransformer) writer.EXPECT().Open(writerOpts).Return(expectedErr) flush, err := pm.StartDataPersist() @@ -179,7 +180,7 @@ func TestPersistenceManagerPrepareSuccess(t *testing.T) { BlockStart: blockStart, }, BlockSize: testBlockSize, - }) + }, m3test.IdentTransformer) writer.EXPECT().Open(writerOpts).Return(nil) var ( @@ -277,8 +278,8 @@ func TestPersistenceManagerPrepareIndexFileExists(t *testing.T) { Namespace: testNs1ID, VolumeIndex: 1, }, - }, - )).Return(nil) + }, m3test.IdentTransformer), + ).Return(nil) prepared, err := flush.PrepareIndex(prepareOpts) require.NoError(t, err) require.NotNil(t, 
prepared.Persist) @@ -303,7 +304,7 @@ func TestPersistenceManagerPrepareIndexOpenError(t *testing.T) { BlockStart: blockStart, }, BlockSize: testBlockSize, - }) + }, m3test.IdentTransformer) writer.EXPECT().Open(writerOpts).Return(expectedErr) flush, err := pm.StartIndexPersist() @@ -340,7 +341,7 @@ func TestPersistenceManagerPrepareIndexSuccess(t *testing.T) { }, BlockSize: testBlockSize, } - writer.EXPECT().Open(xtest.CmpMatcher(writerOpts)).Return(nil) + writer.EXPECT().Open(xtest.CmpMatcher(writerOpts, m3test.IdentTransformer)).Return(nil) flush, err := pm.StartIndexPersist() require.NoError(t, err) @@ -369,7 +370,7 @@ func TestPersistenceManagerPrepareIndexSuccess(t *testing.T) { reader.EXPECT().Open(xtest.CmpMatcher(IndexReaderOpenOptions{ Identifier: writerOpts.Identifier, - })).Return(IndexReaderOpenResult{}, nil) + }, m3test.IdentTransformer)).Return(IndexReaderOpenResult{}, nil) file := NewMockIndexSegmentFile(ctrl) gomock.InOrder( @@ -408,7 +409,7 @@ func TestPersistenceManagerNoRateLimit(t *testing.T) { BlockStart: blockStart, }, BlockSize: testBlockSize, - }) + }, m3test.IdentTransformer) writer.EXPECT().Open(writerOpts).Return(nil) var ( @@ -487,7 +488,7 @@ func TestPersistenceManagerWithRateLimit(t *testing.T) { BlockStart: blockStart, }, BlockSize: testBlockSize, - }) + }, m3test.IdentTransformer) writer.EXPECT().Open(writerOpts).Return(nil).Times(iter) writer.EXPECT().WriteAll(id, ident.Tags{}, pm.dataPM.segmentHolder, checksum).Return(nil).AnyTimes() writer.EXPECT().Close().Times(iter) @@ -577,7 +578,7 @@ func TestPersistenceManagerNamespaceSwitch(t *testing.T) { BlockStart: blockStart, }, BlockSize: testBlockSize, - }) + }, m3test.IdentTransformer) writer.EXPECT().Open(writerOpts).Return(nil) prepareOpts := persist.DataPrepareOptions{ NamespaceMetadata: testNs1Metadata(t), @@ -596,7 +597,7 @@ func TestPersistenceManagerNamespaceSwitch(t *testing.T) { BlockStart: blockStart, }, BlockSize: testBlockSize, - }) + }, m3test.IdentTransformer) writer.EXPECT().Open(writerOpts).Return(nil) prepareOpts = persist.DataPrepareOptions{ NamespaceMetadata: testNs2Metadata(t), diff --git a/src/x/test/ident_cmp_matcher.go b/src/x/test/ident_cmp_matcher.go new file mode 100644 index 0000000000..c17e50bc14 --- /dev/null +++ b/src/x/test/ident_cmp_matcher.go @@ -0,0 +1,30 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package xtest + +import ( + "github.com/google/go-cmp/cmp" + "github.com/m3db/m3x/ident" +) + +// IdentTransform transforms any ident.ID into ident.BytesID to make it easier for comparison. +var IdentTransformer = cmp.Transformer("", + func(id ident.ID) ident.BytesID { return ident.BytesID(id.Bytes()) }) From bcef0af4529e9a11b845cb8b6504a021be26ab40 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Sat, 29 Sep 2018 21:09:44 -0400 Subject: [PATCH 08/10] Comment out local storage --- src/query/ts/m3db/storage/local.go | 216 ++++++++++++------------ src/query/ts/m3db/storage/local_test.go | 176 +++++++++---------- 2 files changed, 196 insertions(+), 196 deletions(-) diff --git a/src/query/ts/m3db/storage/local.go b/src/query/ts/m3db/storage/local.go index cf0ac4fab9..7c46a47431 100644 --- a/src/query/ts/m3db/storage/local.go +++ b/src/query/ts/m3db/storage/local.go @@ -20,111 +20,111 @@ package storage -import ( - "context" - "io" - - "github.com/m3db/m3/src/dbnode/encoding" - "github.com/m3db/m3/src/dbnode/encoding/m3tsz" - "github.com/m3db/m3/src/dbnode/storage/index" - "github.com/m3db/m3/src/query/storage" - "github.com/m3db/m3/src/query/storage/m3" - m3block "github.com/m3db/m3/src/query/ts/m3db" - "github.com/m3db/m3x/ident" - "github.com/m3db/m3x/pool" -) - -type localStorage struct { - clusters m3.Clusters - workerPool pool.ObjectPool -} - -var ( - iterAlloc = func(r io.Reader) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) - } - - emptySeriesMap map[ident.ID][]m3block.SeriesBlocks -) - -// nolint: deadcode -func newStorage(clusters m3.Clusters, workerPool pool.ObjectPool) *localStorage { - return &localStorage{clusters: clusters, workerPool: workerPool} -} - -// nolint: unparam -func (s *localStorage) fetchRaw( - namespace m3.ClusterNamespace, - query index.Query, - opts index.QueryOptions, -) (encoding.SeriesIterators, bool, error) { - namespaceID := namespace.NamespaceID() - session := namespace.Session() - return session.FetchTagged(namespaceID, query, opts) -} - -// todo(braskin): merge this with Fetch() -func (s *localStorage) fetchBlocks( - _ context.Context, - query *storage.FetchQuery, - options *storage.FetchOptions, -) (map[ident.ID][]m3block.SeriesBlocks, error) { - - m3query, err := storage.FetchQueryToM3Query(query) - if err != nil { - return emptySeriesMap, err - } - - opts := storage.FetchOptionsToM3Options(options, query) - - // todo(braskin): figure out how to deal with multiple namespaces - namespaces := s.clusters.ClusterNamespaces() - // todo(braskin): figure out what to do with second return argument - seriesIters, _, err := s.fetchRaw(namespaces[0], m3query, opts) - if err != nil { - return emptySeriesMap, err - } - - seriesBlockList, err := m3block.ConvertM3DBSeriesIterators(seriesIters, iterAlloc) - if err != nil { - return emptySeriesMap, err - } - - // NB/todo(braskin): because we are only support querying one namespace now, we can just create - // a multiNamespaceSeriesList with one element. 
However, once we support querying multiple namespaces, - // we will need to append each namespace seriesBlockList to the multiNamespaceSeriesList - namespaceSeriesList := m3block.NamespaceSeriesList{ - Namespace: namespaces[0].NamespaceID().String(), - SeriesList: seriesBlockList, - } - - multiNamespaceSeriesList := []m3block.NamespaceSeriesList{namespaceSeriesList} - multiNamespaceSeries := fromNamespaceListToSeriesList(multiNamespaceSeriesList) - return multiNamespaceSeries, nil -} - -func (s *localStorage) Close() error { - return nil -} - -func fromNamespaceListToSeriesList(nsList []m3block.NamespaceSeriesList) map[ident.ID][]m3block.SeriesBlocks { - seriesList := make(map[ident.ID][]m3block.SeriesBlocks) - - for _, ns := range nsList { - for _, series := range ns.SeriesList { - if _, ok := seriesList[series.ID]; !ok { - seriesList[series.ID] = []m3block.SeriesBlocks{ - { - Blocks: series.Blocks, - ID: series.ID, - Tags: series.Tags, - Namespace: series.Namespace, - }, - } - } else { - seriesList[series.ID] = append(seriesList[series.ID], series) - } - } - } - return seriesList -} +// import ( +// "context" +// "io" + +// "github.com/m3db/m3/src/dbnode/encoding" +// "github.com/m3db/m3/src/dbnode/encoding/m3tsz" +// "github.com/m3db/m3/src/dbnode/storage/index" +// "github.com/m3db/m3/src/query/storage" +// "github.com/m3db/m3/src/query/storage/m3" +// m3block "github.com/m3db/m3/src/query/ts/m3db" +// "github.com/m3db/m3x/ident" +// "github.com/m3db/m3x/pool" +// ) + +// type localStorage struct { +// clusters m3.Clusters +// workerPool pool.ObjectPool +// } + +// var ( +// iterAlloc = func(r io.Reader) encoding.ReaderIterator { +// return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) +// } + +// emptySeriesMap map[ident.ID][]m3block.SeriesBlocks +// ) + +// // nolint: deadcode +// func newStorage(clusters m3.Clusters, workerPool pool.ObjectPool) *localStorage { +// return &localStorage{clusters: clusters, workerPool: workerPool} +// } + +// // nolint: unparam +// func (s *localStorage) fetchRaw( +// namespace m3.ClusterNamespace, +// query index.Query, +// opts index.QueryOptions, +// ) (encoding.SeriesIterators, bool, error) { +// namespaceID := namespace.NamespaceID() +// session := namespace.Session() +// return session.FetchTagged(namespaceID, query, opts) +// } + +// // todo(braskin): merge this with Fetch() +// func (s *localStorage) fetchBlocks( +// _ context.Context, +// query *storage.FetchQuery, +// options *storage.FetchOptions, +// ) (map[ident.ID][]m3block.SeriesBlocks, error) { + +// m3query, err := storage.FetchQueryToM3Query(query) +// if err != nil { +// return emptySeriesMap, err +// } + +// opts := storage.FetchOptionsToM3Options(options, query) + +// // todo(braskin): figure out how to deal with multiple namespaces +// namespaces := s.clusters.ClusterNamespaces() +// // todo(braskin): figure out what to do with second return argument +// seriesIters, _, err := s.fetchRaw(namespaces[0], m3query, opts) +// if err != nil { +// return emptySeriesMap, err +// } + +// seriesBlockList, err := m3block.ConvertM3DBSeriesIterators(seriesIters, iterAlloc) +// if err != nil { +// return emptySeriesMap, err +// } + +// // NB/todo(braskin): because we are only support querying one namespace now, we can just create +// // a multiNamespaceSeriesList with one element. 
However, once we support querying multiple namespaces, +// // we will need to append each namespace seriesBlockList to the multiNamespaceSeriesList +// namespaceSeriesList := m3block.NamespaceSeriesList{ +// Namespace: namespaces[0].NamespaceID().String(), +// SeriesList: seriesBlockList, +// } + +// multiNamespaceSeriesList := []m3block.NamespaceSeriesList{namespaceSeriesList} +// multiNamespaceSeries := fromNamespaceListToSeriesList(multiNamespaceSeriesList) +// return multiNamespaceSeries, nil +// } + +// func (s *localStorage) Close() error { +// return nil +// } + +// func fromNamespaceListToSeriesList(nsList []m3block.NamespaceSeriesList) map[ident.ID][]m3block.SeriesBlocks { +// seriesList := make(map[ident.ID][]m3block.SeriesBlocks) + +// for _, ns := range nsList { +// for _, series := range ns.SeriesList { +// if _, ok := seriesList[series.ID]; !ok { +// seriesList[series.ID] = []m3block.SeriesBlocks{ +// { +// Blocks: series.Blocks, +// ID: series.ID, +// Tags: series.Tags, +// Namespace: series.Namespace, +// }, +// } +// } else { +// seriesList[series.ID] = append(seriesList[series.ID], series) +// } +// } +// } +// return seriesList +// } diff --git a/src/query/ts/m3db/storage/local_test.go b/src/query/ts/m3db/storage/local_test.go index 45666d74ef..27ffc6d8b7 100644 --- a/src/query/ts/m3db/storage/local_test.go +++ b/src/query/ts/m3db/storage/local_test.go @@ -20,99 +20,99 @@ package storage -import ( - "context" - "testing" - "time" +// import ( +// "context" +// "testing" +// "time" - "github.com/m3db/m3/src/dbnode/client" - "github.com/m3db/m3/src/dbnode/encoding" - "github.com/m3db/m3/src/query/models" - "github.com/m3db/m3/src/query/storage" - "github.com/m3db/m3/src/query/storage/m3" - "github.com/m3db/m3/src/query/test" - "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3x/ident" +// "github.com/m3db/m3/src/dbnode/client" +// "github.com/m3db/m3/src/dbnode/encoding" +// "github.com/m3db/m3/src/query/models" +// "github.com/m3db/m3/src/query/storage" +// "github.com/m3db/m3/src/query/storage/m3" +// "github.com/m3db/m3/src/query/test" +// "github.com/m3db/m3/src/query/util/logging" +// "github.com/m3db/m3x/ident" - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) +// "github.com/golang/mock/gomock" +// "github.com/stretchr/testify/assert" +// "github.com/stretchr/testify/require" +// ) -var testRetention = 30 * 24 * time.Hour +// var testRetention = 30 * 24 * time.Hour -type testSessions struct { - unaggregated1MonthRetention *client.MockSession - aggregated1MonthRetention1MinuteResolution *client.MockSession -} +// type testSessions struct { +// unaggregated1MonthRetention *client.MockSession +// aggregated1MonthRetention1MinuteResolution *client.MockSession +// } -func setup( - t *testing.T, - ctrl *gomock.Controller, -) (*localStorage, testSessions) { - logging.InitWithCores(nil) - logger := logging.WithContext(context.TODO()) - defer logger.Sync() - unaggregated1MonthRetention := client.NewMockSession(ctrl) - aggregated1MonthRetention1MinuteResolution := client.NewMockSession(ctrl) - clusters, err := m3.NewClusters(m3.UnaggregatedClusterNamespaceDefinition{ - NamespaceID: ident.StringID("metrics_unaggregated"), - Session: unaggregated1MonthRetention, - Retention: testRetention, - }, m3.AggregatedClusterNamespaceDefinition{ - NamespaceID: ident.StringID("metrics_aggregated"), - Session: aggregated1MonthRetention1MinuteResolution, - Retention: testRetention, - Resolution: time.Minute, - 
}) - require.NoError(t, err) - storage := newStorage(clusters, nil) - return storage, testSessions{ - unaggregated1MonthRetention: unaggregated1MonthRetention, - aggregated1MonthRetention1MinuteResolution: aggregated1MonthRetention1MinuteResolution, - } -} +// func setup( +// t *testing.T, +// ctrl *gomock.Controller, +// ) (*localStorage, testSessions) { +// logging.InitWithCores(nil) +// logger := logging.WithContext(context.TODO()) +// defer logger.Sync() +// unaggregated1MonthRetention := client.NewMockSession(ctrl) +// aggregated1MonthRetention1MinuteResolution := client.NewMockSession(ctrl) +// clusters, err := m3.NewClusters(m3.UnaggregatedClusterNamespaceDefinition{ +// NamespaceID: ident.StringID("metrics_unaggregated"), +// Session: unaggregated1MonthRetention, +// Retention: testRetention, +// }, m3.AggregatedClusterNamespaceDefinition{ +// NamespaceID: ident.StringID("metrics_aggregated"), +// Session: aggregated1MonthRetention1MinuteResolution, +// Retention: testRetention, +// Resolution: time.Minute, +// }) +// require.NoError(t, err) +// storage := newStorage(clusters, nil) +// return storage, testSessions{ +// unaggregated1MonthRetention: unaggregated1MonthRetention, +// aggregated1MonthRetention1MinuteResolution: aggregated1MonthRetention1MinuteResolution, +// } +// } -func newFetchReq() *storage.FetchQuery { - matchers := models.Matchers{ - { - Type: models.MatchEqual, - Name: "foo", - Value: "bar", - }, - { - Type: models.MatchEqual, - Name: "biz", - Value: "baz", - }, - } - return &storage.FetchQuery{ - TagMatchers: matchers, - Start: time.Now().Add(-10 * time.Minute), - End: time.Now(), - } -} +// func newFetchReq() *storage.FetchQuery { +// matchers := models.Matchers{ +// { +// Type: models.MatchEqual, +// Name: "foo", +// Value: "bar", +// }, +// { +// Type: models.MatchEqual, +// Name: "biz", +// Value: "baz", +// }, +// } +// return &storage.FetchQuery{ +// TagMatchers: matchers, +// Start: time.Now().Add(-10 * time.Minute), +// End: time.Now(), +// } +// } -func TestLocalRead(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - store, sessions := setup(t, ctrl) - iter, err := test.BuildTestSeriesIterator() - require.NoError(t, err) - iterators := encoding.NewSeriesIterators([]encoding.SeriesIterator{iter}, nil) - sessions.unaggregated1MonthRetention.EXPECT().FetchTagged(gomock.Any(), gomock.Any(), gomock.Any()). - Return(iterators, true, nil) - searchReq := newFetchReq() - results, err := store.fetchBlocks(context.TODO(), searchReq, &storage.FetchOptions{Limit: 100}) - assert.NoError(t, err) +// func TestLocalRead(t *testing.T) { +// ctrl := gomock.NewController(t) +// defer ctrl.Finish() +// store, sessions := setup(t, ctrl) +// iter, err := test.BuildTestSeriesIterator() +// require.NoError(t, err) +// iterators := encoding.NewSeriesIterators([]encoding.SeriesIterator{iter}, nil) +// sessions.unaggregated1MonthRetention.EXPECT().FetchTagged(gomock.Any(), gomock.Any(), gomock.Any()). 
+// Return(iterators, true, nil) +// searchReq := newFetchReq() +// results, err := store.fetchBlocks(context.TODO(), searchReq, &storage.FetchOptions{Limit: 100}) +// assert.NoError(t, err) - for id, seriesBlocks := range results { - assert.Equal(t, "id", id.String()) - for _, blocks := range seriesBlocks { - assert.Equal(t, "namespace", blocks.Namespace.String()) - blockTags, err := storage.FromIdentTagIteratorToTags(blocks.Tags) - require.NoError(t, err) - assert.Equal(t, models.Tags{{"baz", "qux"}, {"foo", "bar"}}, blockTags) - } - } -} +// for id, seriesBlocks := range results { +// assert.Equal(t, "id", id.String()) +// for _, blocks := range seriesBlocks { +// assert.Equal(t, "namespace", blocks.Namespace.String()) +// blockTags, err := storage.FromIdentTagIteratorToTags(blocks.Tags) +// require.NoError(t, err) +// assert.Equal(t, models.Tags{{"baz", "qux"}, {"foo", "bar"}}, blockTags) +// } +// } +// } From d56ff4565d1bd0cd2ed866520b9b8145c4599688 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Sat, 29 Sep 2018 21:16:24 -0400 Subject: [PATCH 09/10] Order imports --- src/x/test/ident_cmp_matcher.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/x/test/ident_cmp_matcher.go b/src/x/test/ident_cmp_matcher.go index c17e50bc14..979e50cf8d 100644 --- a/src/x/test/ident_cmp_matcher.go +++ b/src/x/test/ident_cmp_matcher.go @@ -21,10 +21,11 @@ package xtest import ( - "github.com/google/go-cmp/cmp" "github.com/m3db/m3x/ident" + + "github.com/google/go-cmp/cmp" ) -// IdentTransform transforms any ident.ID into ident.BytesID to make it easier for comparison. +// IdentTransformer transforms any ident.ID into ident.BytesID to make it easier for comparison. var IdentTransformer = cmp.Transformer("", func(id ident.ID) ident.BytesID { return ident.BytesID(id.Bytes()) }) From 649c103fbc15c78711373932627a0bdc22bf8dd2 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Sat, 29 Sep 2018 21:50:19 -0400 Subject: [PATCH 10/10] convert map[ident.ID] -> map[string] --- src/query/ts/m3db/storage/local.go | 218 ++++++++++++------------ src/query/ts/m3db/storage/local_test.go | 176 +++++++++---------- 2 files changed, 198 insertions(+), 196 deletions(-) diff --git a/src/query/ts/m3db/storage/local.go b/src/query/ts/m3db/storage/local.go index 7c46a47431..3ae63d214c 100644 --- a/src/query/ts/m3db/storage/local.go +++ b/src/query/ts/m3db/storage/local.go @@ -20,111 +20,113 @@ package storage -// import ( -// "context" -// "io" - -// "github.com/m3db/m3/src/dbnode/encoding" -// "github.com/m3db/m3/src/dbnode/encoding/m3tsz" -// "github.com/m3db/m3/src/dbnode/storage/index" -// "github.com/m3db/m3/src/query/storage" -// "github.com/m3db/m3/src/query/storage/m3" -// m3block "github.com/m3db/m3/src/query/ts/m3db" -// "github.com/m3db/m3x/ident" -// "github.com/m3db/m3x/pool" -// ) - -// type localStorage struct { -// clusters m3.Clusters -// workerPool pool.ObjectPool -// } - -// var ( -// iterAlloc = func(r io.Reader) encoding.ReaderIterator { -// return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) -// } - -// emptySeriesMap map[ident.ID][]m3block.SeriesBlocks -// ) - -// // nolint: deadcode -// func newStorage(clusters m3.Clusters, workerPool pool.ObjectPool) *localStorage { -// return &localStorage{clusters: clusters, workerPool: workerPool} -// } - -// // nolint: unparam -// func (s *localStorage) fetchRaw( -// namespace m3.ClusterNamespace, -// query index.Query, -// opts index.QueryOptions, -// ) (encoding.SeriesIterators, bool, error) { 
-// namespaceID := namespace.NamespaceID() -// session := namespace.Session() -// return session.FetchTagged(namespaceID, query, opts) -// } - -// // todo(braskin): merge this with Fetch() -// func (s *localStorage) fetchBlocks( -// _ context.Context, -// query *storage.FetchQuery, -// options *storage.FetchOptions, -// ) (map[ident.ID][]m3block.SeriesBlocks, error) { - -// m3query, err := storage.FetchQueryToM3Query(query) -// if err != nil { -// return emptySeriesMap, err -// } - -// opts := storage.FetchOptionsToM3Options(options, query) - -// // todo(braskin): figure out how to deal with multiple namespaces -// namespaces := s.clusters.ClusterNamespaces() -// // todo(braskin): figure out what to do with second return argument -// seriesIters, _, err := s.fetchRaw(namespaces[0], m3query, opts) -// if err != nil { -// return emptySeriesMap, err -// } - -// seriesBlockList, err := m3block.ConvertM3DBSeriesIterators(seriesIters, iterAlloc) -// if err != nil { -// return emptySeriesMap, err -// } - -// // NB/todo(braskin): because we are only support querying one namespace now, we can just create -// // a multiNamespaceSeriesList with one element. However, once we support querying multiple namespaces, -// // we will need to append each namespace seriesBlockList to the multiNamespaceSeriesList -// namespaceSeriesList := m3block.NamespaceSeriesList{ -// Namespace: namespaces[0].NamespaceID().String(), -// SeriesList: seriesBlockList, -// } - -// multiNamespaceSeriesList := []m3block.NamespaceSeriesList{namespaceSeriesList} -// multiNamespaceSeries := fromNamespaceListToSeriesList(multiNamespaceSeriesList) -// return multiNamespaceSeries, nil -// } - -// func (s *localStorage) Close() error { -// return nil -// } - -// func fromNamespaceListToSeriesList(nsList []m3block.NamespaceSeriesList) map[ident.ID][]m3block.SeriesBlocks { -// seriesList := make(map[ident.ID][]m3block.SeriesBlocks) - -// for _, ns := range nsList { -// for _, series := range ns.SeriesList { -// if _, ok := seriesList[series.ID]; !ok { -// seriesList[series.ID] = []m3block.SeriesBlocks{ -// { -// Blocks: series.Blocks, -// ID: series.ID, -// Tags: series.Tags, -// Namespace: series.Namespace, -// }, -// } -// } else { -// seriesList[series.ID] = append(seriesList[series.ID], series) -// } -// } -// } -// return seriesList -// } +import ( + "context" + "io" + + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/encoding/m3tsz" + "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/storage/m3" + m3block "github.com/m3db/m3/src/query/ts/m3db" + "github.com/m3db/m3x/pool" +) + +type localStorage struct { + clusters m3.Clusters + workerPool pool.ObjectPool +} + +var ( + iterAlloc = func(r io.Reader) encoding.ReaderIterator { + return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) + } + + emptySeriesMap map[string][]m3block.SeriesBlocks +) + +// nolint: deadcode +func newStorage(clusters m3.Clusters, workerPool pool.ObjectPool) *localStorage { + return &localStorage{clusters: clusters, workerPool: workerPool} +} + +// nolint: unparam +func (s *localStorage) fetchRaw( + namespace m3.ClusterNamespace, + query index.Query, + opts index.QueryOptions, +) (encoding.SeriesIterators, bool, error) { + namespaceID := namespace.NamespaceID() + session := namespace.Session() + return session.FetchTagged(namespaceID, query, opts) +} + +// todo(braskin): merge this with Fetch() and use genny to generate +// a map 
that can handle ident.ID as a key so we don't need to +// allocate a string for each entry. +func (s *localStorage) fetchBlocks( + _ context.Context, + query *storage.FetchQuery, + options *storage.FetchOptions, +) (map[string][]m3block.SeriesBlocks, error) { + + m3query, err := storage.FetchQueryToM3Query(query) + if err != nil { + return emptySeriesMap, err + } + + opts := storage.FetchOptionsToM3Options(options, query) + + // todo(braskin): figure out how to deal with multiple namespaces + namespaces := s.clusters.ClusterNamespaces() + // todo(braskin): figure out what to do with second return argument + seriesIters, _, err := s.fetchRaw(namespaces[0], m3query, opts) + if err != nil { + return emptySeriesMap, err + } + + seriesBlockList, err := m3block.ConvertM3DBSeriesIterators(seriesIters, iterAlloc) + if err != nil { + return emptySeriesMap, err + } + + // NB/todo(braskin): because we are only support querying one namespace now, we can just create + // a multiNamespaceSeriesList with one element. However, once we support querying multiple namespaces, + // we will need to append each namespace seriesBlockList to the multiNamespaceSeriesList + namespaceSeriesList := m3block.NamespaceSeriesList{ + Namespace: namespaces[0].NamespaceID().String(), + SeriesList: seriesBlockList, + } + + multiNamespaceSeriesList := []m3block.NamespaceSeriesList{namespaceSeriesList} + multiNamespaceSeries := fromNamespaceListToSeriesList(multiNamespaceSeriesList) + return multiNamespaceSeries, nil +} + +func (s *localStorage) Close() error { + return nil +} + +func fromNamespaceListToSeriesList(nsList []m3block.NamespaceSeriesList) map[string][]m3block.SeriesBlocks { + seriesList := make(map[string][]m3block.SeriesBlocks) + + for _, ns := range nsList { + for _, series := range ns.SeriesList { + idStr := series.ID.String() + if _, ok := seriesList[idStr]; !ok { + seriesList[idStr] = []m3block.SeriesBlocks{ + { + Blocks: series.Blocks, + ID: series.ID, + Tags: series.Tags, + Namespace: series.Namespace, + }, + } + } else { + seriesList[idStr] = append(seriesList[idStr], series) + } + } + } + return seriesList +} diff --git a/src/query/ts/m3db/storage/local_test.go b/src/query/ts/m3db/storage/local_test.go index 27ffc6d8b7..edfd690f7b 100644 --- a/src/query/ts/m3db/storage/local_test.go +++ b/src/query/ts/m3db/storage/local_test.go @@ -20,99 +20,99 @@ package storage -// import ( -// "context" -// "testing" -// "time" +import ( + "context" + "testing" + "time" -// "github.com/m3db/m3/src/dbnode/client" -// "github.com/m3db/m3/src/dbnode/encoding" -// "github.com/m3db/m3/src/query/models" -// "github.com/m3db/m3/src/query/storage" -// "github.com/m3db/m3/src/query/storage/m3" -// "github.com/m3db/m3/src/query/test" -// "github.com/m3db/m3/src/query/util/logging" -// "github.com/m3db/m3x/ident" + "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/storage/m3" + "github.com/m3db/m3/src/query/test" + "github.com/m3db/m3/src/query/util/logging" + "github.com/m3db/m3x/ident" -// "github.com/golang/mock/gomock" -// "github.com/stretchr/testify/assert" -// "github.com/stretchr/testify/require" -// ) + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) -// var testRetention = 30 * 24 * time.Hour +var testRetention = 30 * 24 * time.Hour -// type testSessions struct { -// unaggregated1MonthRetention *client.MockSession 
-// aggregated1MonthRetention1MinuteResolution *client.MockSession -// } +type testSessions struct { + unaggregated1MonthRetention *client.MockSession + aggregated1MonthRetention1MinuteResolution *client.MockSession +} -// func setup( -// t *testing.T, -// ctrl *gomock.Controller, -// ) (*localStorage, testSessions) { -// logging.InitWithCores(nil) -// logger := logging.WithContext(context.TODO()) -// defer logger.Sync() -// unaggregated1MonthRetention := client.NewMockSession(ctrl) -// aggregated1MonthRetention1MinuteResolution := client.NewMockSession(ctrl) -// clusters, err := m3.NewClusters(m3.UnaggregatedClusterNamespaceDefinition{ -// NamespaceID: ident.StringID("metrics_unaggregated"), -// Session: unaggregated1MonthRetention, -// Retention: testRetention, -// }, m3.AggregatedClusterNamespaceDefinition{ -// NamespaceID: ident.StringID("metrics_aggregated"), -// Session: aggregated1MonthRetention1MinuteResolution, -// Retention: testRetention, -// Resolution: time.Minute, -// }) -// require.NoError(t, err) -// storage := newStorage(clusters, nil) -// return storage, testSessions{ -// unaggregated1MonthRetention: unaggregated1MonthRetention, -// aggregated1MonthRetention1MinuteResolution: aggregated1MonthRetention1MinuteResolution, -// } -// } +func setup( + t *testing.T, + ctrl *gomock.Controller, +) (*localStorage, testSessions) { + logging.InitWithCores(nil) + logger := logging.WithContext(context.TODO()) + defer logger.Sync() + unaggregated1MonthRetention := client.NewMockSession(ctrl) + aggregated1MonthRetention1MinuteResolution := client.NewMockSession(ctrl) + clusters, err := m3.NewClusters(m3.UnaggregatedClusterNamespaceDefinition{ + NamespaceID: ident.StringID("metrics_unaggregated"), + Session: unaggregated1MonthRetention, + Retention: testRetention, + }, m3.AggregatedClusterNamespaceDefinition{ + NamespaceID: ident.StringID("metrics_aggregated"), + Session: aggregated1MonthRetention1MinuteResolution, + Retention: testRetention, + Resolution: time.Minute, + }) + require.NoError(t, err) + storage := newStorage(clusters, nil) + return storage, testSessions{ + unaggregated1MonthRetention: unaggregated1MonthRetention, + aggregated1MonthRetention1MinuteResolution: aggregated1MonthRetention1MinuteResolution, + } +} -// func newFetchReq() *storage.FetchQuery { -// matchers := models.Matchers{ -// { -// Type: models.MatchEqual, -// Name: "foo", -// Value: "bar", -// }, -// { -// Type: models.MatchEqual, -// Name: "biz", -// Value: "baz", -// }, -// } -// return &storage.FetchQuery{ -// TagMatchers: matchers, -// Start: time.Now().Add(-10 * time.Minute), -// End: time.Now(), -// } -// } +func newFetchReq() *storage.FetchQuery { + matchers := models.Matchers{ + { + Type: models.MatchEqual, + Name: "foo", + Value: "bar", + }, + { + Type: models.MatchEqual, + Name: "biz", + Value: "baz", + }, + } + return &storage.FetchQuery{ + TagMatchers: matchers, + Start: time.Now().Add(-10 * time.Minute), + End: time.Now(), + } +} -// func TestLocalRead(t *testing.T) { -// ctrl := gomock.NewController(t) -// defer ctrl.Finish() -// store, sessions := setup(t, ctrl) -// iter, err := test.BuildTestSeriesIterator() -// require.NoError(t, err) -// iterators := encoding.NewSeriesIterators([]encoding.SeriesIterator{iter}, nil) -// sessions.unaggregated1MonthRetention.EXPECT().FetchTagged(gomock.Any(), gomock.Any(), gomock.Any()). 
-// Return(iterators, true, nil) -// searchReq := newFetchReq() -// results, err := store.fetchBlocks(context.TODO(), searchReq, &storage.FetchOptions{Limit: 100}) -// assert.NoError(t, err) +func TestLocalRead(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + store, sessions := setup(t, ctrl) + iter, err := test.BuildTestSeriesIterator() + require.NoError(t, err) + iterators := encoding.NewSeriesIterators([]encoding.SeriesIterator{iter}, nil) + sessions.unaggregated1MonthRetention.EXPECT().FetchTagged(gomock.Any(), gomock.Any(), gomock.Any()). + Return(iterators, true, nil) + searchReq := newFetchReq() + results, err := store.fetchBlocks(context.TODO(), searchReq, &storage.FetchOptions{Limit: 100}) + assert.NoError(t, err) -// for id, seriesBlocks := range results { -// assert.Equal(t, "id", id.String()) -// for _, blocks := range seriesBlocks { -// assert.Equal(t, "namespace", blocks.Namespace.String()) -// blockTags, err := storage.FromIdentTagIteratorToTags(blocks.Tags) -// require.NoError(t, err) -// assert.Equal(t, models.Tags{{"baz", "qux"}, {"foo", "bar"}}, blockTags) -// } -// } -// } + for id, seriesBlocks := range results { + assert.Equal(t, "id", id) + for _, blocks := range seriesBlocks { + assert.Equal(t, "namespace", blocks.Namespace.String()) + blockTags, err := storage.FromIdentTagIteratorToTags(blocks.Tags) + require.NoError(t, err) + assert.Equal(t, models.Tags{{"baz", "qux"}, {"foo", "bar"}}, blockTags) + } + } +}
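
For reference after the series, a minimal self-contained sketch of the clone-avoidance idea that patches 01-03 introduce in session.go: IDs flagged IsNoFinalize() are ones the caller promises will never be finalized or returned to a pool, so the session can retain a reference instead of paying for a pooled clone. This is a hedged illustration, not the real m3x API: the ID interface and bytesID type below are simplified stand-ins modeling only the two methods the helper needs, and the copy branch allocates directly where the real helper uses s.pools.id.Clone.

package main

import "fmt"

// ID is a simplified stand-in for ident.ID (assumption: only the two
// methods the helper needs are modeled here).
type ID interface {
	Bytes() []byte
	IsNoFinalize() bool
}

type bytesID struct {
	data       []byte
	noFinalize bool
}

func (b *bytesID) Bytes() []byte      { return b.data }
func (b *bytesID) IsNoFinalize() bool { return b.noFinalize }

// cloneFinalizable mirrors the session helper: skip the copy when the
// caller has promised the ID is never finalized, otherwise copy the bytes
// (the real code takes the clone from an ID pool instead of allocating).
func cloneFinalizable(id ID) ID {
	if id.IsNoFinalize() {
		return id
	}
	return &bytesID{data: append([]byte(nil), id.Bytes()...)}
}

func main() {
	pinned := &bytesID{data: []byte("series-a"), noFinalize: true}
	pooled := &bytesID{data: []byte("series-b")}

	fmt.Println(cloneFinalizable(pinned) == ID(pinned)) // true: reference retained, no copy
	fmt.Println(cloneFinalizable(pooled) == ID(pooled)) // false: defensive copy taken
}

The tests added in patches 04-06 check the same property with xtest.ByteSlicesBackedBySameData, asserting that the ID bytes reaching the enqueued write operation share backing data with the caller's ID rather than a clone.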