Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Optional value decrease tolerance in M3TSZ decoder #3876

Merged
merged 3 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/dbnode/encoding/encoding_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion src/dbnode/encoding/m3tsz/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,14 @@ func (it *readerIterator) Next() bool {
if !it.intOptimized || it.isFloat {
it.curr.Value = math.Float64frombits(it.floatIter.PrevFloatBits)
} else {
it.curr.Value = convertFromIntFloat(it.intVal, it.mult)
prevValue := it.curr.Value
currValue := convertFromIntFloat(it.intVal, it.mult)
decreaseTolerance, toleranceUntil := it.opts.ValueDecreaseTolerance()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You may want to allow toleranceUntil to be zero and if so then always apply? Just thinking instead of needing to set it to MaxTime or something if you want it on for good.

e.g.
(toleranceUntil.IsZero() || it.curr.TimestampNanos.Before(toleranceUntil))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be set to something like 2099-12-31 for such use cases. Don't want to add any more conditionals there.

if decreaseTolerance > 0 && it.curr.TimestampNanos.Before(toleranceUntil) &&
!first && currValue < prevValue && currValue > prevValue*(1-decreaseTolerance) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there some advantage of using prevValue*(1-decreaseTolerance) and not simply delta? Like if current value is smaller then prev value by 0.00001 we say it's the same value?

With multiplication depending on how large the value is different can be bigger. Maybe that's the intention?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolute delta would not work when your actual values themselves are smaller than that delta.

currValue = prevValue
}
it.curr.Value = currValue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: to me a more readable approach would be:

decreaseTolerance, toleranceUntil := it.opts.ValueDecreaseTolerance()
if decreaseTolerance > 0 {
  // do new logic
} else {
  it.curr.Value = currValue
}

I guess it's not idiomatic Go code, but it clearly separates 2 code paths - when tolerance is set and when it is not.

}

return it.hasNext()
Expand Down
92 changes: 92 additions & 0 deletions src/dbnode/encoding/m3tsz/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/context"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -410,3 +412,93 @@ func TestReaderIteratorDecodingRegression(t *testing.T) {

require.NoError(t, it.Err())
}

func TestReaderIteratorDecodingDecreaseTolerance(t *testing.T) {
now := xtime.Now().Truncate(time.Hour)
tests := []struct {
name string
given []float64
tolerance float64
until xtime.UnixNano
want []float64
}{
{
name: "no tolerance",
given: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99},
tolerance: 0,
until: 0,
want: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99},
},
{
name: "low tolerance",
given: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99},
tolerance: 0.00000001,
until: now.Add(time.Hour),
want: []float64{187.80131100000006, 187.80131100000006, 187.80131100000006, 187.80131100000006, 200, 199.99},
},
{
name: "high tolerance",
given: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99},
tolerance: 0.0001,
until: now.Add(time.Hour),
want: []float64{187.80131100000006, 187.80131100000006, 187.80131100000006, 187.80131100000006, 200, 200},
},
{
name: "tolerance expired",
given: []float64{200, 199.99, 200, 199.99, 200, 199.99},
tolerance: 0.0001,
until: now,
want: []float64{200, 199.99, 200, 199.99, 200, 199.99},
},
{
name: "tolerance expires in the middle",
given: []float64{200, 199.99, 200, 199.99, 200, 199.99},
tolerance: 0.0001,
until: now.Add(3 * time.Minute),
want: []float64{200, 200, 200, 199.99, 200, 199.99},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testReaderIteratorDecodingDecreaseTolerance(t, now, tt.given, tt.want, tt.tolerance, tt.until)
})
}
}

func testReaderIteratorDecodingDecreaseTolerance(
t *testing.T,
now xtime.UnixNano,
input []float64,
expectedOutput []float64,
decreaseTolerance float64,
toleranceUntil xtime.UnixNano,
) {
ctx := context.NewBackground()
defer ctx.Close()

enc := NewEncoder(testStartTime, nil, true, nil)
for _, v := range input {
dp := ts.Datapoint{TimestampNanos: now, Value: v}
err := enc.Encode(dp, xtime.Second, nil)
require.NoError(t, err)
now = now.Add(time.Minute)
}

stream, ok := enc.Stream(ctx)
require.True(t, ok)

opts := encoding.NewOptions().
SetValueDecreaseTolerance(decreaseTolerance).
SetValueDecreaseToleranceUntil(toleranceUntil)
dec := NewDecoder(true, opts)
it := dec.Decode(stream)
defer it.Close()

for i, expected := range expectedOutput {
require.True(t, it.Next())
dp, _, _ := it.Current()
assert.Equal(t, expected, dp.Value, "datapoint #%d", i)
}
require.NoError(t, it.Err())
}
19 changes: 19 additions & 0 deletions src/dbnode/encoding/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type options struct {
iStreamReaderSizeM3TSZ int
iStreamReaderSizeProto int
metrics Metrics

valueDecreaseTolerance float64
valueDecreaseToleranceUntil xtime.UnixNano
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do you think maybe wrapping these under struct would be better? And make it optional. So that if its nil then decrease tolerance logic is not applied? Otherwise if its specified then it must not be 0 for both fields.

}

func newOptions() Options {
Expand Down Expand Up @@ -191,3 +194,19 @@ func (o *options) SetMetrics(value Metrics) Options {
func (o *options) Metrics() Metrics {
return o.metrics
}

func (o *options) SetValueDecreaseTolerance(value float64) Options {
opts := *o
opts.valueDecreaseTolerance = value
return &opts
}

func (o *options) SetValueDecreaseToleranceUntil(value xtime.UnixNano) Options {
opts := *o
opts.valueDecreaseToleranceUntil = value
return &opts
}

func (o *options) ValueDecreaseTolerance() (float64, xtime.UnixNano) {
return o.valueDecreaseTolerance, o.valueDecreaseToleranceUntil
}
13 changes: 11 additions & 2 deletions src/dbnode/encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ type Options interface {
// for proto encoding iteration.
SetIStreamReaderSizeProto(value int) Options

// SetIStreamReaderSizeProto returns the IStream bufio reader size
// IStreamReaderSizeProto returns the IStream bufio reader size
// for proto encoding iteration.
IStreamReaderSizeProto() int

Expand All @@ -171,6 +171,15 @@ type Options interface {

// Metrics returns the encoding metrics.
Metrics() Metrics

// SetValueDecreaseTolerance sets relative tolerance against decoded time series value decrease.
SetValueDecreaseTolerance(value float64) Options

// SetValueDecreaseToleranceUntil sets the timestamp (exclusive) until which the tolerance applies.
SetValueDecreaseToleranceUntil(value xtime.UnixNano) Options
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to have a timestamp until the tolerance applies? Does this mean that it should be set on each iteration?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be set to the date when #3872 is deployed.


// ValueDecreaseTolerance returns relative tolerance against decoded time series value decrease.
ValueDecreaseTolerance() (float64, xtime.UnixNano)
}

// Iterator is the generic interface for iterating over encoded data.
Expand Down Expand Up @@ -210,7 +219,7 @@ type MultiReaderIterator interface {
Reset(readers []xio.SegmentReader, start xtime.UnixNano,
blockSize time.Duration, schema namespace.SchemaDescr)

// Reset resets the iterator to read from a slice of slice readers
// ResetSliceOfSlices resets the iterator to read from a slice of slice readers
// with a new schema (for schema aware iterators).
ResetSliceOfSlices(
readers xio.ReaderSliceOfSlicesIterator,
Expand Down