Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StepIter to blocks #778

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open

Add StepIter to blocks #778

wants to merge 26 commits into from

Conversation

benraskin92
Copy link
Collaborator

No description provided.

@codecov
Copy link

codecov bot commented Jul 2, 2018

Codecov Report

Merging #778 into master will decrease coverage by 0.14%.
The diff coverage is 67.21%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #778      +/-   ##
==========================================
- Coverage   78.08%   77.94%   -0.15%     
==========================================
  Files         364      367       +3     
  Lines       31058    31238     +180     
==========================================
+ Hits        24253    24348      +95     
- Misses       5185     5250      +65     
- Partials     1620     1640      +20
Flag Coverage Δ
#coordinator 60.11% <67.21%> (+0.06%) ⬆️
#dbnode 81.66% <ø> (-0.07%) ⬇️
#m3ninx 72.7% <ø> (ø) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f94b19a...6f11dc6. Read the comment docs.

if err != nil {
return block.Result{}, err
}

res, err := storage.FetchResultToBlockResult(fetchResult, query)
iterAlloc := func(r io.Reader) encoding.ReaderIterator {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull this out into an init{} block since we only need a single iterAlloc.

You may also want to populate this with iterator pools from m3db like this too (should be able to use encoding.IteratorPools() to get the pools out):

                encodingOpts := encoding.NewOptions().
		SetEncoderPool(encoderPool).
		SetReaderIteratorPool(iteratorPool).
		SetBytesPool(bytesPool).
		SetSegmentReaderPool(segmentReaderPool)

iterAlloc := func(r io.Reader) encoding.ReaderIterator {
return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions())
}
sliceOfSeriesBlocks, err := m3db.ConvertM3DBSeriesIterators(seriesIters, iterAlloc)
if err != nil {
return block.Result{}, err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: var emptyBlock = block.Result{} outside the function, then return emptyBlock instead of block.Result{}

return block.Result{}, err
}

var res block.Result
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

return block.Result {
  Blocks: multiSeriesBlocks,
}, nil


// StepIter creates a new step iterator for a given MultiSeriesBlock
func (m MultiSeriesBlock) StepIter() block.StepIter {
return &multiSeriesBlockStepIter{seriesIters: newConsolidatedSeriesBlockIters(m.Blocks),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would be easier to read with a newline between { and seriesIters

for i, block := range blocks {
consolidatedNSBlockIters := make([]*consolidatedNSBlockIter, len(blocks[0].ConsolidatedNSBlocks))
for j, jblock := range block.ConsolidatedNSBlocks {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit newline

for i, expectedSlice := range expectedResults {
for j, exp := range expectedSlice {
if math.IsNaN(exp) {
require.True(t, math.IsNaN(actualResults[i][j]))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: these can be asserts

@@ -33,7 +36,13 @@ var (
// SeriesBlockToMultiSeriesBlocks converts M3DB blocks to multi series blocks
func SeriesBlockToMultiSeriesBlocks(multiNamespaceSeriesList []MultiNamespaceSeries, seriesIteratorsPool encoding.MutableSeriesIteratorsPool) (MultiSeriesBlocks, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: split this with newlines, it's pretty long

commonTags = make(map[string]string)
firstSeriesTags = make(map[string]string)
)

for multiNamespaceSeriesIdx, multiNamespaceSeries := range multiNamespaceSeriesList {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: multiNamespaceSeriesIdx is a bit verbose for an index we're only using to check if we're on the first run through the loop

@@ -44,13 +53,34 @@ func SeriesBlockToMultiSeriesBlocks(multiNamespaceSeriesList []MultiNamespaceSer
// MultiSeriesBlocks list with the proper size
if multiNamespaceSeriesIdx == 0 {
multiSeriesBlocks = make(MultiSeriesBlocks, len(consolidatedSeriesBlocks))
commonTags, err = storage.FromIdentTagIteratorToTags(consolidatedSeriesBlocks[0].ConsolidatedNSBlocks[0].SeriesIterators.Iters()[0].Tags())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unsafe. Can't you just get tags out of the mutliNamespaceSeries?

Bounds: block.Bounds{
Start: seriesBlock.start,
End: seriesBlock.end,
StepSize: time.Minute, // get actual step size!!
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo

if err != nil {
return block.Result{}, err
}

iterAlloc := func(r io.Reader) encoding.ReaderIterator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do this when you create the storage and keep the function reference ?

iterAlloc := func(r io.Reader) encoding.ReaderIterator {
return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions())
}
sliceOfSeriesBlocks, err := m3db.ConvertM3DBSeriesIterators(seriesIters, iterAlloc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call it M3DBIteratorsToSeriesBlocks ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also the variable can be seriesBlockList

}

var (
res block.Result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

res := block.Result{
Blocks: make([]block.Block, len(multiSeriesBlocks))
}


iter, err := test.BuildTestSeriesIterator()
require.NoError(t, err)
iterators := encoding.NewSeriesIterators([]encoding.SeriesIterator{iter}, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add a test with multiple iterators ?

// // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// // THE SOFTWARE.

package m3db
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider adding a package src/coordinator/ts/m3db/block/ ? A lot of your files can be moved inside it

@@ -88,19 +88,24 @@ type ConsolidatedSeriesBlock struct {
consolidationFunc ConsolidationFunc // nolint
}

func (c ConsolidatedSeriesBlock) beyondBounds(multiSeriesBlock MultiSeriesBlock) bool {
if c.Metadata.Bounds.Start != multiSeriesBlock.Metadata.Bounds.Start || c.Metadata.Bounds.End != multiSeriesBlock.Metadata.Bounds.End {
// func (c ConsolidatedSeriesBlock) equalBounds(multiSeriesBlock MultiSeriesBlock) bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this?

return true
}
fmt.Println("seriesIndex: ", c.seriesIndex, "indexTime: ", c.indexTime, "bounds: ", c.bounds)
// curr, _, _ := c.consolidatedNSBlockSeriesIters[c.seriesIndex].Current()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete?

lastDP, _, _ = c.consolidatedNSBlockSeriesIters[c.seriesIndex].Current()
c.lastDP = lastDP
}
// if c.indexTime.Add(c.bounds.StepSize).Before(lastDP.Timestamp) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete?

func (c *consolidatedNSBlockIter) Next() bool {
if len(c.consolidatesNSBlockSeriesIters) == 0 {
c.indexTime = c.indexTime.Add(c.bounds.StepSize)
fmt.Println("index time: ", c.indexTime)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete println

}
c.indexTime = c.indexTime.Add(c.bounds.StepSize)

fmt.Println("last dp: ", lastDP)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete println

}
return true
}
// func (c ConsolidatedNSBlock) beyondBounds(consolidatedSeriesBlock ConsolidatedSeriesBlock) bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete

multiSeriesBlocks = make(MultiSeriesBlocks, len(consolidatedSeriesBlocks))
commonTags, err = storage.FromIdentTagIteratorToTags(consolidatedSeriesBlocks[0].ConsolidatedNSBlocks[0].SeriesIterators.Iters()[0].Tags())
commonTags, err = storage.FromIdentTagIteratorToTags(multiNamespaceSeries[0].Tags)
// commonTags, err = storage.FromIdentTagIteratorToTags(consolidatedSeriesBlocks[0].ConsolidatedNSBlocks[0].SeriesIterators.Iters()[0].Tags())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete

}
if c.currentTime.Add(c.bounds.StepSize).Before(lastDP.Timestamp) || c.lastVal {
return math.NaN()
fmt.Println("last dp current: ", c.lastDP)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete println


return false
}

func (c *consolidatedNSBlockIter) Current() float64 {
lastDP := c.lastDP
if !c.indexTime.After(c.lastDP.Timestamp) && c.indexTime.Add(c.bounds.StepSize).After(c.lastDP.Timestamp) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Might read better if you pull out c.indexTime and c.lastDP.Timestamp as variables first

commonTags, err = storage.FromIdentTagIteratorToTags(consolidatedSeriesBlocks[0].ConsolidatedNSBlocks[0].SeriesIterators.Iters()[0].Tags())
commonTags, err = storage.FromIdentTagIteratorToTags(multiNamespaceSeries[0].Tags)
// commonTags, err = storage.FromIdentTagIteratorToTags(consolidatedSeriesBlocks[0].ConsolidatedNSBlocks[0].SeriesIterators.Iters()[0].Tags())
fmt.Println("tags: ", commonTags)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete println

@benraskin92
Copy link
Collaborator Author

I'll get rid of all of the println's and commented out code

func (s *localStorage) fetchRaw(
query index.Query,
opts index.QueryOptions,
) (encoding.SeriesIterators, bool, error) { // nolint: unparam
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: nolint usually goes above the function declaration

multiSeriesBlocks[consolidatedSeriesBlockIdx].Start = consolidatedSeriesBlock.Start
multiSeriesBlocks[consolidatedSeriesBlockIdx].End = consolidatedSeriesBlock.End
if seriesIdx == 0 {
multiSeriesBlocks[consolidatedSeriesBlockIdx].Metadata.Bounds.StepSize = consolidatedSeriesBlock.Metadata.Bounds.StepSize
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you make these names shorter? It's quite difficult to read at the moment

Also, can pull out multiSeriesBlocks[consolidatedSeriesBlockIdx].Metadata.Bounds into a var?

query index.Query,
opts index.QueryOptions,
) (encoding.SeriesIterators, bool, error) { // nolint: unparam
iters, exhaustive, err := s.session.FetchTagged(s.namespace, query, opts)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just return s.session.FetchTagged(...) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, wondering why you need this function since you're just calling another function inside

res, err := storage.FetchResultToBlockResult(fetchResult, query)
// NB/todo(braskin): because we are only support querying one namespace now, we can just create
// a multiNamespaceList with one element. However, once we support querying multiple namespaces,
// we will need to append each namespace sliceOfSeriesBlocks to the multiNamespaceList
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

old comment ? Might need to change the names

@@ -123,6 +125,25 @@ func TestLocalRead(t *testing.T) {
assert.Equal(t, tags, results.SeriesList[0].Tags)
}

func TestLocalFetchBlocks(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also test values ?

@@ -18,11 +18,14 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just call it convertor.go ? Its already in the block package

@@ -31,45 +34,81 @@ var (
)

// SeriesBlockToMultiSeriesBlocks converts M3DB blocks to multi series blocks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add comments to highlight the flow ?
something like:

m3db block -> consolidated series block (per namespace) -> ....

return consolidatedSeriesBlockIters
}

func newConsolidatedNSBlockIter(nsBlock ConsolidatedNSBlock) *consolidatedNSBlockIter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment about what you're iterating ?


func (c *consolidatedNSBlockIter) Next() bool {
c.indexTime = c.indexTime.Add(c.bounds.StepSize)
lastDP := c.lastDP
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initialize it right before you use it

return true
}

func (c *consolidatedNSBlockIter) next() bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call it nextIterator or something ?

}

func (c *consolidatedNSBlockIter) Current() float64 {
if !c.indexTime.After(c.lastDP.Timestamp) && c.indexTime.Add(c.bounds.StepSize).After(c.lastDP.Timestamp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract lastDP ??

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also can you add some comments about what you're checking

return results
}

func TestStepIteration(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably need more tests, also should write down conditions you are testing. This test is pretty opaque

@benraskin92 benraskin92 changed the base branch from master to lazyEval July 11, 2018 22:37
Copy link
Contributor

@nikunjgit nikunjgit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a bit confusing right now jumping back and forth between iterators. My suggestion is:

block/multiSeries.go
block/consolidatedSeries.go
block/nsSeries.go

You can also split their tests in a similar way.

@@ -104,6 +104,12 @@ type StepIter interface {
Meta() Metadata
}

// ValueIterator iterates through a generic iterator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better comment ?


if !consolidatedSeriesBlock.beyondBounds(multiSeriesBlocks[consolidatedSeriesBlockIdx]) {
if !equalBounds(consolidatedSeriesBlock.Metadata.Bounds, multiSeriesBlocks[consolidatedSeriesBlockIdx].Metadata.Bounds) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

define Equals method on the bounds ?


func (m MultiSeriesBlocks) commonTags() map[string]string {
commonTags := make(map[string]string)
for tag, val := range m[0].Blocks[0].Metadata.Tags {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

length checks ?

}

func (m MultiSeriesBlocks) commonTags() map[string]string {
commonTags := make(map[string]string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be models.Tags ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, might be cleaner to define a Clone() method on models.Tags which returns a copy ?

// NB(braskin): all series are the same across MultiSeriesBlocks so only need
// to look at one
for _, series := range m[0].Blocks {
seriesTags := series.Metadata.Tags
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skip the first one ?

}

lastDP := c.lastDP
for c.indexTime.After(lastDP.Timestamp) && c.nextIterator() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment

}

for i, seriesBlock := range blocks {
consolidatedNSBlockIters := make([]block.ValueIterator, len(blocks[0].ConsolidatedNSBlocks))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move out len(blocks[0].ConsolidatedNSBlocks)

func (m *multiSeriesBlockStepIter) SeriesMeta() []block.SeriesMeta {
metas := make([]block.SeriesMeta, len(m.blocks))
for i, s := range m.blocks {
// todo(braskin): add name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metas[i].Name = s.Metadata.Tags[models.MetricName] ?

if len(m.blocks) == 0 {
return 0
}
return m.blocks[0].Metadata.Bounds.Steps()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: this is inclusive of the end

}

bounds := m.meta.Bounds
t := bounds.Start.Add(time.Duration(m.index) * bounds.StepSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use bounds.TimeForIndex(m.index)

@@ -104,6 +104,12 @@ type StepIter interface {
Meta() Metadata
}

// ValueIterator iterates through a generic iterator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit ... is a generic float value iterator

return emptyBlock, err
}

// NB/todo(braskin): because we are only support querying one namespace now, we can just create
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth rebasing on master now that #785 is merged

assert.NoError(t, err)
require.NotNil(t, results)
require.Len(t, results.Blocks, 2)
require.Equal(t, models.Tags{"foo": "bar", "baz": "qux"}, results.Blocks[0].Meta().Tags)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth testing results.Blocks[1].Meta().Tags ?

@@ -18,11 +18,14 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should be converter.go

@@ -31,45 +34,84 @@ var (
)

// SeriesBlockToMultiSeriesBlocks converts M3DB blocks to multi series blocks
func SeriesBlockToMultiSeriesBlocks(multiNamespaceSeriesList []MultiNamespaceSeries, seriesIteratorsPool encoding.MutableSeriesIteratorsPool) (MultiSeriesBlocks, error) {
func SeriesBlockToMultiSeriesBlocks(multiNamespaceSeriesList []MultiNamespaceSeries, seriesIteratorsPool encoding.MutableSeriesIteratorsPool, stepSize time.Duration) (MultiSeriesBlocks, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit1: Split args over newlines
nit2: can we use shorter names? Easy to get lost with lines being this long

End: end,
StepSize: stepSize,
},
indexTime: start.Add(-1 * stepSize),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit start.Sub(stepSize)

},
start: now,
end: now.Add(600 * time.Second),
stepSize: 60 * time.Second,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit time.Minute?

start, end time.Time
stepSize time.Duration
dps [][]ts.Datapoint
expectedResults, actualResults []float64
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would prefer to have actualResults be a variable built in the test, rather than added to the struct itself

type multiSeriesBlockStepIterTestCase struct {
dps [][]float64
expectedResults [][]float64
actualResults [][]float64
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here re actualResults


type mockBlockStepIterTestCase struct {
dps [][]float64
actualResults, expectedResults [][]float64
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here re: actualResults

@@ -50,6 +50,14 @@ type Bounds struct {
StepSize time.Duration
}

// Equals determines whether two bounds (start and end) are equal
Copy link
Contributor

@nikunjgit nikunjgit Jul 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix comment: whether the bound (start and end) is same as the current bound ? Also, do you need to check the step size as well ?

// Clone returns a clone of the Tags
func (t Tags) Clone() Tags {
tags := make(Tags)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove line ?

@@ -18,11 +18,15 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

converter.go ?

multiSeriesBlocks[consolidatedSeriesBlockIdx].Start = consolidatedSeriesBlock.Start
multiSeriesBlocks[consolidatedSeriesBlockIdx].End = consolidatedSeriesBlock.End
if seriesIdx == 0 {
multiSeriesBlocks[consolidatedSeriesBlockIdx].Metadata.Bounds.StepSize = consolidatedSeriesBlock.Metadata.Bounds.StepSize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract multiSeriesBlocks[consolidatedSeriesBlockIdx] into a variable ?

@@ -79,11 +123,12 @@ func newConsolidatedSeriesBlocks(multiNamespaceSeries MultiNamespaceSeries, seri
for consolidatedNSBlockIdx, consolidatedNSBlock := range consolidatedNSBlocks {
// we only want to set the start and end times once
if seriesBlocksIdx == 0 {
consolidatedSeriesBlocks[consolidatedNSBlockIdx].Start = consolidatedNSBlock.Start
consolidatedSeriesBlocks[consolidatedNSBlockIdx].End = consolidatedNSBlock.End
consolidatedSeriesBlocks[consolidatedNSBlockIdx].Metadata.Bounds.StepSize = consolidatedNSBlock.Bounds.StepSize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract consolidatedSeriesBlocks[consolidatedNSBlockIdx] into variable ?

}

bounds := m.meta.Bounds
t, err := bounds.TimeForIndex(m.index)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do this before creating values ?

// Current returns the float64 value for that step
func (c *consolidatedNSBlockIter) Current() float64 {
lastDP := c.lastDP
// NB(braskin): if the last datapoint is after the current step, but before the (current step+1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move comment down ?

@@ -50,6 +50,14 @@ type Bounds struct {
StepSize time.Duration
}

// Equals determines whether two bounds (start and end) are equal
func (b Bounds) Equals(compareBound Bounds) bool {
if !b.Start.Equal(compareBound.Start) || !b.End.Equal(compareBound.End) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: cleaner to return b.Start.Equal(compareBound.Start) && b.End.Equal(compareBound.End); to Nikunj's point, if we don't care about stepsize here we can rename it from Equalsto EqualEndpoints or something

type ConsolidatedSeriesBlock struct {
Metadata block.Metadata
ConsolidatedNSBlocks []ConsolidatedNSBlock
consolidationFunc ConsolidationFunc // nolint
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:// nolint usually goes on top of struct

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if it's just for one field in the struct?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it doesn't really matter since it shouldn't be // nolint much longer :p


// ConsolidatedSeriesBlocks contain all of the consolidated blocks for
// a single timeseries across namespaces.
// Each ConsolidatedBlockIterator will have the same size
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: misplaced comment?

}

// todo(braskin): until we have consolidation
return values[0]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still a bit dangerous; no guarantee there will be one value

@@ -96,7 +96,7 @@ func TestConsolidatedNSBlockIter(t *testing.T) {
end: now.Add(1200 * time.Second),
stepSize: 60 * time.Second,
expectedResults: []float64{1, 2, nan, nan, nan, 3, nan, nan, nan, nan,
nan, nan, nan, nan, nan, nan, nan, 5, nan, 6},
nan, nan, nan, nan, nan, nan, nan, 5, nan, 6, nan},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did the expected change? Was there a logic issue previously?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah logic changed to be inclusive of the last step.

if err != nil {
return math.NaN()
}
if !indexTime.After(lastDP.Timestamp) && indexTime.Add(c.bounds.StepSize).After(lastDP.Timestamp) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline

Copy link
Collaborator

@arnikola arnikola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the .ci change intentional btw?

},
}

for _, test := range testCases {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little different from the recommended way of writing table tests which gets you a few nice things like splitting each of your test cases into a separate test run, allowing them to fail independently

Your choice if you want to convert it to that style or not here

@@ -31,45 +35,88 @@ var (
)

// SeriesBlockToMultiSeriesBlocks converts M3DB blocks to multi series blocks
func SeriesBlockToMultiSeriesBlocks(multiNamespaceSeriesList []MultiNamespaceSeries, seriesIteratorsPool encoding.MutableSeriesIteratorsPool) (MultiSeriesBlocks, error) {
func SeriesBlockToMultiSeriesBlocks(
multiNamespaceSeriesList []MultiNamespaceSeries, seriesIteratorsPool encoding.MutableSeriesIteratorsPool, stepSize time.Duration) (MultiSeriesBlocks, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you add newlines between these params to reduce the line length?

multiSeriesBlocks[consolidatedSeriesBlockIdx].Start = consolidatedSeriesBlock.Start
multiSeriesBlocks[consolidatedSeriesBlockIdx].End = consolidatedSeriesBlock.End
if seriesIdx == 0 {
blockBounds := multiSeriesBlocks[consolidatedSeriesBlockIdx].Metadata.Bounds
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you do this instead? �

blockBounds := multiSeriesBlocks[consolidatedSeriesBlockIdx].Metadata.Bounds
multiSeriesBlocks[consolidatedSeriesBlockIdx].Metadata.Bounds = blockBounds
�```

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that works because the underlaying structs aren't pointers.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it matter if we're just making a copy though?

// newConsolidatedSeriesBlocks creates consolidated blocks by timeseries across namespaces
func newConsolidatedSeriesBlocks(multiNamespaceSeries MultiNamespaceSeries, seriesIteratorsPool encoding.MutableSeriesIteratorsPool) (ConsolidatedSeriesBlocks, error) {
func newConsolidatedSeriesBlocks(
multiNamespaceSeries MultiNamespaceSeries, seriesIteratorsPool encoding.MutableSeriesIteratorsPool, stepSize time.Duration) (ConsolidatedSeriesBlocks, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you add newlines to the arguement list?

type MultiNamespaceSeries []SeriesBlocks

// ID enforces the same ID across namespaces
func (n MultiNamespaceSeries) ID() ident.ID { return n[0].ID }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially dangerous if len(n) == 0

nsBlockIter := newConsolidatedNSBlockIter(nsBlock)
consolidatedNSBlockIters[j] = nsBlockIter
}
seriesBlockIters[i] = &consolidatedSeriesBlockIter{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline

seriesIterator.EXPECT().Next().Return(false)

for _, dp := range dps {
seriesIterator.EXPECT().Current().Return(ts.Datapoint{Timestamp: dp.Timestamp, Value: dp.Value}, xtime.Millisecond, nil).Times(1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: might be a bit easier to read if you generate the point separately
dp := ts.Datapoint{Timestamp: dp.Timestamp, Value: dp.Value}

@@ -96,16 +146,20 @@ func newConsolidatedSeriesBlocks(multiNamespaceSeries MultiNamespaceSeries, seri

// newConsolidatedNSBlocks creates a slice of consolidated blocks per namespace for a single timeseries
// nolint: unparam
func newConsolidatedNSBlocks(seriesBlocks SeriesBlocks, seriesIteratorsPool encoding.MutableSeriesIteratorsPool) []ConsolidatedNSBlock {
func newConsolidatedNSBlocks(
seriesBlocks SeriesBlocks, seriesIteratorsPool encoding.MutableSeriesIteratorsPool, stepSize time.Duration) []ConsolidatedNSBlock {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newlines in args list

multiSeriesBlocks[consolidatedSeriesBlockIdx].Metadata.Bounds = blockBounds
}

dupedTags := consolidatedSeriesBlock.ConsolidatedNSBlocks[0].SeriesIterators.Iters()[0].Tags().Duplicate()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comments here, or add a method on consolidatedNSBlocks that gives tags? There's a lot of indexing here without any clear guarantees that it's safe to do

return models.Tags{}
}

commonTags := m[0].Blocks[0].Metadata.Tags.Clone()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the clone necessary?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

@@ -50,6 +50,17 @@ type Bounds struct {
StepSize time.Duration
}

// Equals determines whether the bound (start and end) are equal
func (b Bounds) Equals(compareBound Bounds) bool {
equalTimes := b.Start.Equal(compareBound.Start) && b.End.Equal(compareBound.End)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be simplified:

if !equalTimes {
return false
}

return b.StepSize == compareBound.StepSize

return session.FetchTagged(namespaceID, query, opts)
}

func (s *localStorage) FetchBlocks(ctx context.Context, query *storage.FetchQuery, options *storage.FetchOptions) (block.Result, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a todo here to merge this with Fetch()

// SeriesIter creates a new series iterator for a given MultiSeriesBlock
func (m MultiSeriesBlock) SeriesIter() (block.SeriesIter, error) {
// todo(braskin): implement SeriesIter()
return nil, errors.New("SeriesIter not implemented")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we discussed, you need to implement this so that you can test e2e

type ConsolidatedBlocks []ConsolidatedBlock

func (c *consolidatedBlockStepIter) Current() float64 {
values := make([]float64, 0, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make([]float64, 0, len(c.nsBlockSeriesIters))

values = append(values, dp)
}

if len(values) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just return values[0], I don't think you need to check since Current() is only called when Next() returns true. You should never have to return math.NaN()

}

// Current returns a slice of values for the current series
func (c *consolidatedBlockSeriesIter) Current() []float64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually at conslidatedBlock level, doesn't look like you have a concept of series and steps. I think you should just have one iterator : consolidatedBlockIter which should return a single float64 on current. The concept of Step and Series probably comes at a higher level

// NSBlock is a single block for a given timeseries and namespace
// which contains all of the necessary SeriesIterators so that consolidation can
// happen across namespaces
type NSBlock struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above, at this level you don't have a concept of step and series since you just are representing a single series. Having a single iter with Current returning float64 ?

@nikunjgit
Copy link
Contributor

Are you still working on it or close it out ?

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Benjamin Raskin seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants