Skip to content

Commit

Permalink
[dbnode] Tile iterators for wide aggregations (#2646)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Sep 21, 2020
1 parent 9ea5682 commit a66fb7d
Show file tree
Hide file tree
Showing 13 changed files with 1,336 additions and 2 deletions.
95 changes: 95 additions & 0 deletions src/dbnode/encoding/tile/frame_annotations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tile

import (
"bytes"

"github.com/m3db/m3/src/dbnode/ts"
)

type annotationRecorder struct {
count int
a ts.Annotation
as []ts.Annotation
}

var _ SeriesFrameAnnotations = (*annotationRecorder)(nil)

func newAnnotationRecorder() *annotationRecorder {
return &annotationRecorder{}
}

func (a *annotationRecorder) SingleValue() (ts.Annotation, bool) {
return a.a, a.count > 0 && len(a.as) == 0
}

func (a *annotationRecorder) Values() []ts.Annotation {
if len(a.as) == 0 {
if a.as == nil {
a.as = make([]ts.Annotation, 0, a.count)
}

for i := 0; i < a.count; i++ {
a.as = append(a.as, a.a)
}
}

return a.as
}

func (a *annotationRecorder) record(annotation ts.Annotation) {
a.count++
if a.count == 1 {
a.a = annotation
return
}

// NB: annotation has already changed in this dataset.
if len(a.as) > 0 {
a.as = append(a.as, annotation)
return
}

// NB: same annotation as previously recorded; skip.
if bytes.Equal(a.a, annotation) {
return
}

if a.as == nil {
a.as = make([]ts.Annotation, 0, a.count)
}

for i := 0; i < a.count-1; i++ {
a.as = append(a.as, a.a)
}

a.as = append(a.as, annotation)
}

func (a *annotationRecorder) reset() {
a.count = 0
for i := range a.as {
a.as[i] = nil
}

a.as = a.as[:0]
}
108 changes: 108 additions & 0 deletions src/dbnode/encoding/tile/frame_annotations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tile

import (
"testing"

"github.com/m3db/m3/src/dbnode/ts"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func a(s string) ts.Annotation { return ts.Annotation(s) }

func annotationEqual(t *testing.T, expected string, actual ts.Annotation) {
assert.Equal(t, expected, string(actual))
}

func annotationsEqual(t *testing.T, expected []string, actual []ts.Annotation) {
require.Equal(t, len(expected), len(actual))
for i, ex := range expected {
annotationEqual(t, ex, actual[i])
}
}

func TestSeriesFrameAnnotationsSingle(t *testing.T) {
rec := newAnnotationRecorder()

_, ok := rec.SingleValue()
assert.False(t, ok)

rec.record(a("foo"))
v, ok := rec.SingleValue()
assert.True(t, ok)
annotationEqual(t, "foo", v)

rec.record(a("foo"))
v, ok = rec.SingleValue()
assert.True(t, ok)
annotationEqual(t, "foo", v)

vals := rec.Values()
annotationsEqual(t, []string{"foo", "foo"}, vals)

rec.reset()
_, ok = rec.SingleValue()
assert.False(t, ok)
}

func TestSeriesFrameAnnotationsMultiple(t *testing.T) {
rec := newAnnotationRecorder()
rec.record(a("foo"))
rec.record(a("foo"))
rec.record(a("bar"))

_, ok := rec.SingleValue()
assert.False(t, ok)

vals := rec.Values()
annotationsEqual(t, []string{"foo", "foo", "bar"}, vals)

rec.reset()
_, ok = rec.SingleValue()
assert.False(t, ok)

vals = rec.Values()
require.Equal(t, 0, len(vals))
}

func TestSeriesFrameAnnotationsMultipleChanges(t *testing.T) {
rec := newAnnotationRecorder()
rec.record(a("foo"))
rec.record(a("bar"))
rec.record(a("baz"))
rec.record(a("foo"))

_, ok := rec.SingleValue()
assert.False(t, ok)

vals := rec.Values()
annotationsEqual(t, []string{"foo", "bar", "baz", "foo"}, vals)

rec.reset()
_, ok = rec.SingleValue()
assert.False(t, ok)

vals = rec.Values()
require.Equal(t, 0, len(vals))
}
89 changes: 89 additions & 0 deletions src/dbnode/encoding/tile/frame_units.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tile

import (
xtime "github.com/m3db/m3/src/x/time"
)

type unitRecorder struct {
count int
u xtime.Unit
us []xtime.Unit
}

var _ SeriesFrameUnits = (*unitRecorder)(nil)

func newUnitRecorder() *unitRecorder {
return &unitRecorder{}
}

func (u *unitRecorder) SingleValue() (xtime.Unit, bool) {
return u.u, u.count > 0 && len(u.us) == 0
}

func (u *unitRecorder) Values() []xtime.Unit {
if len(u.us) == 0 {
if u.us == nil {
u.us = make([]xtime.Unit, 0, u.count)
}

for i := 0; i < u.count; i++ {
u.us = append(u.us, u.u)
}
}

return u.us
}

func (u *unitRecorder) record(unit xtime.Unit) {
u.count++
if u.count == 1 {
u.u = unit
return
}

// NB: unit has already changed in this dataset.
if len(u.us) > 0 {
u.us = append(u.us, unit)
return
}

// NB: same unit as previously recorded; skip.
if u.u == unit {
return
}

if u.us == nil {
u.us = make([]xtime.Unit, 0, u.count)
}

for i := 0; i < u.count-1; i++ {
u.us = append(u.us, u.u)
}

u.us = append(u.us, unit)
}

func (u *unitRecorder) reset() {
u.count = 0
u.us = u.us[:0]
}
95 changes: 95 additions & 0 deletions src/dbnode/encoding/tile/frame_units_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tile

import (
"testing"

xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSeriesFrameUnitsSingle(t *testing.T) {
rec := newUnitRecorder()

_, ok := rec.SingleValue()
assert.False(t, ok)

rec.record(xtime.Second)
v, ok := rec.SingleValue()
assert.True(t, ok)
assert.Equal(t, xtime.Second, v)

rec.record(xtime.Second)
v, ok = rec.SingleValue()
assert.True(t, ok)
assert.Equal(t, xtime.Second, v)

vals := rec.Values()
assert.Equal(t, []xtime.Unit{xtime.Second, xtime.Second}, vals)

rec.reset()
_, ok = rec.SingleValue()
assert.False(t, ok)
}

func TestSeriesFrameUnitsMultiple(t *testing.T) {
rec := newUnitRecorder()
rec.record(xtime.Second)
rec.record(xtime.Second)
rec.record(xtime.Day)
rec.record(xtime.Second)

_, ok := rec.SingleValue()
assert.False(t, ok)

vals := rec.Values()
assert.Equal(t, []xtime.Unit{xtime.Second, xtime.Second, xtime.Day, xtime.Second}, vals)

rec.reset()
_, ok = rec.SingleValue()
assert.False(t, ok)

vals = rec.Values()
require.Equal(t, 0, len(vals))
}

func TestSeriesFrameUnitsMultipleChanges(t *testing.T) {
rec := newUnitRecorder()
rec.record(xtime.Second)
rec.record(xtime.Day)
rec.record(xtime.Nanosecond)

_, ok := rec.SingleValue()
assert.False(t, ok)

vals := rec.Values()
assert.Equal(t, []xtime.Unit{xtime.Second, xtime.Day, xtime.Nanosecond}, vals)

rec.reset()
_, ok = rec.SingleValue()
assert.False(t, ok)

vals = rec.Values()
require.Equal(t, 0, len(vals))
}
Loading

0 comments on commit a66fb7d

Please sign in to comment.