Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

output/cloudv2: Optimized metric sinks #3085

Merged
merged 8 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 11 additions & 17 deletions output/cloud/expv2/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type timeBucket struct {
// TODO: for performance reasons, use directly a unix time here
// so we can avoid time->unix->time
Time time.Time
Sinks map[metrics.TimeSeries]metrics.Sink
Sinks map[metrics.TimeSeries]metricValue
}

// bucketQ is a queue for buffering the aggregated metrics
Expand Down Expand Up @@ -56,15 +56,12 @@ type collector struct {
aggregationPeriod time.Duration
waitPeriod time.Duration

// We should no longer have to handle metrics that have times long in the past.
// So, instead of a map, we can probably use a simple slice (or even an array!)
// as a ring buffer to store the aggregation buckets.
// This should save us a some time, since it would make the lookups and
// WaitPeriod checks basically O(1).
// And even if for some reason there are occasional metrics with past times
// that don't fit in the chosen ring buffer size, we could just send them
// along to the buffer unaggregated.
timeBuckets map[int64]map[metrics.TimeSeries]metrics.Sink
// we should no longer have to handle metrics that have times long in the past. So instead of a
// map, we can probably use a simple slice (or even an array!) as a ring buffer to store the
// aggregation buckets. This should save us a some time, since it would make the lookups and WaitPeriod
// checks basically O(1). And even if for some reason there are occasional metrics with past times that
// don't fit in the chosen ring buffer size, we could just send them along to the buffer unaggregated
timeBuckets map[int64]map[metrics.TimeSeries]metricValue
}

func newCollector(aggrPeriod, waitPeriod time.Duration) (*collector, error) {
Expand All @@ -80,7 +77,7 @@ func newCollector(aggrPeriod, waitPeriod time.Duration) (*collector, error) {
return &collector{
bq: bucketQ{},
nowFunc: time.Now,
timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink),
timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue),
aggregationPeriod: aggrPeriod,
waitPeriod: waitPeriod,
}, nil
Expand Down Expand Up @@ -111,21 +108,18 @@ func (c *collector) collectSample(s metrics.Sample) {
// Get or create a time bucket
bucket, ok := c.timeBuckets[bucketID]
if !ok {
bucket = make(map[metrics.TimeSeries]metrics.Sink)
bucket = make(map[metrics.TimeSeries]metricValue)
c.timeBuckets[bucketID] = bucket
}

// Get or create the bucket's sinks map per time series
sink, ok := bucket[s.TimeSeries]
if !ok {
sink = newSink(s.Metric.Type)
sink = newMetricValue(s.Metric.Type)
bucket[s.TimeSeries] = sink
}

// TODO: we may consider to just pass
// the single value instead of the entire
// sample and save some memory
sink.Add(s)
sink.Add(s.Value)
}

func (c *collector) expiredBuckets() []timeBucket {
Expand Down
40 changes: 21 additions & 19 deletions output/cloud/expv2/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestCollectorCollectSample(t *testing.T) {
c := collector{
aggregationPeriod: 3 * time.Second,
waitPeriod: 1 * time.Second,
timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink),
timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue),
nowFunc: func() time.Time {
return time.Unix(31, 0)
},
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestCollectorCollectSampleAggregateNumbers(t *testing.T) {
c := collector{
aggregationPeriod: 3 * time.Second,
waitPeriod: 1 * time.Second,
timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink),
timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue),
nowFunc: func() time.Time {
return time.Unix(31, 0)
},
Expand Down Expand Up @@ -83,9 +83,9 @@ func TestCollectorCollectSampleAggregateNumbers(t *testing.T) {
assert.Contains(t, c.timeBuckets, int64(3))
assert.Contains(t, c.timeBuckets, int64(4))

sink, ok := c.timeBuckets[4][ts].(*metrics.CounterSink)
sink, ok := c.timeBuckets[4][ts].(*counter)
require.True(t, ok)
assert.Equal(t, 7.0, sink.Value)
assert.Equal(t, 7.0, sink.Sum)
}

func TestDropExpiringDelay(t *testing.T) {
Expand All @@ -105,7 +105,7 @@ func TestCollectorExpiredBucketsNoExipired(t *testing.T) {
nowFunc: func() time.Time {
return time.Unix(10, 0)
},
timeBuckets: map[int64]map[metrics.TimeSeries]metrics.Sink{
timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{
6: {},
},
}
Expand Down Expand Up @@ -134,21 +134,23 @@ func TestCollectorExpiredBuckets(t *testing.T) {
nowFunc: func() time.Time {
return time.Unix(10, 0)
},
timeBuckets: map[int64]map[metrics.TimeSeries]metrics.Sink{
timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{
3: {
ts1: &metrics.CounterSink{Value: 10},
ts2: &metrics.CounterSink{Value: 4},
ts1: &counter{Sum: 10},
ts2: &counter{Sum: 4},
},
},
}
expired := c.expiredBuckets()
require.Len(t, expired, 1)

assert.NotZero(t, expired[0].Time)
assert.Equal(t, expired[0].Sinks, map[metrics.TimeSeries]metrics.Sink{
ts1: &metrics.CounterSink{Value: 10},
ts2: &metrics.CounterSink{Value: 4},
})

exp := map[metrics.TimeSeries]metricValue{
ts1: &counter{Sum: 10},
ts2: &counter{Sum: 4},
}
assert.Equal(t, exp, expired[0].Sinks)
}

func TestCollectorExpiredBucketsCutoff(t *testing.T) {
Expand All @@ -160,7 +162,7 @@ func TestCollectorExpiredBucketsCutoff(t *testing.T) {
nowFunc: func() time.Time {
return time.Unix(10, 0)
},
timeBuckets: map[int64]map[metrics.TimeSeries]metrics.Sink{
timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{
3: {},
6: {},
9: {},
Expand Down Expand Up @@ -249,9 +251,9 @@ func TestBucketQPopAll(t *testing.T) {
func TestBucketQPushPopConcurrency(t *testing.T) {
t.Parallel()
var (
counter = 0
bq = bucketQ{}
sink = metrics.NewSink(metrics.Counter)
count = 0
bq = bucketQ{}
sink = &counter{}

stop = time.After(100 * time.Millisecond)
pop = make(chan struct{}, 10)
Expand All @@ -278,17 +280,17 @@ func TestBucketQPushPopConcurrency(t *testing.T) {
close(done)
return
default:
counter++
count++
bq.Push([]timeBucket{
{
Time: now,
Sinks: map[metrics.TimeSeries]metrics.Sink{
Sinks: map[metrics.TimeSeries]metricValue{
{}: sink,
},
},
})

if counter%5 == 0 { // a fixed-arbitrary flush rate
if count%5 == 0 { // a fixed-arbitrary flush rate
pop <- struct{}{}
}
}
Expand Down
4 changes: 2 additions & 2 deletions output/cloud/expv2/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) {

tb := timeBucket{
Time: time.Unix(1, 0),
Sinks: map[metrics.TimeSeries]metrics.Sink{
timeSeries: &metrics.CounterSink{},
Sinks: map[metrics.TimeSeries]metricValue{
timeSeries: &counter{},
},
}
msb := newMetricSetBuilder("testrunid-123", 1)
Expand Down
18 changes: 2 additions & 16 deletions output/cloud/expv2/hdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"math/bits"
"time"

"go.k6.io/k6/metrics"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -80,11 +79,6 @@ type histogram struct {
Count uint32
}

// newHistogram creates an histogram of the provided values.
func newHistogram() histogram {
return histogram{}
}

// addToBucket increments the counter of the bucket of the provided value.
// If the value is lower or higher than the trackable limits
// then it is counted into specific buckets. All the stats are also updated accordingly.
Expand Down Expand Up @@ -265,14 +259,6 @@ func resolveBucketIndex(val float64) uint32 {
return (nkdiff << k) + (upscaled >> nkdiff)
}

func (h *histogram) IsEmpty() bool {
return h.Count == 0
}

func (h *histogram) Add(s metrics.Sample) {
h.addToBucket(s.Value)
}

func (h *histogram) Format(time.Duration) map[string]float64 {
panic("output/cloud/expv2/histogram.Format is not expected to be called")
func (h *histogram) Add(v float64) {
h.addToBucket(v)
}
32 changes: 15 additions & 17 deletions output/cloud/expv2/hdr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
t.Parallel()

// Zero as value
res := newHistogram()
res := histogram{}
res.addToBucket(0)
exp := histogram{
Buckets: []uint32{1},
Expand All @@ -58,7 +58,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
require.Equal(t, exp, res)

// Add a lower bucket index within slice capacity
res = newHistogram()
res = histogram{}
res.addToBucket(8)
res.addToBucket(5)

Expand All @@ -76,7 +76,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
require.Equal(t, exp, res)

// Add a higher bucket index within slice capacity
res = newHistogram()
res = histogram{}
res.addToBucket(100)
res.addToBucket(101)

Expand All @@ -94,7 +94,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
require.Equal(t, exp, res)

// Same case but reversed test check
res = newHistogram()
res = histogram{}
res.addToBucket(101)
res.addToBucket(100)

Expand All @@ -112,7 +112,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
assert.Equal(t, exp, res)

// One more complex case with lower index and more than two indexes
res = newHistogram()
res = histogram{}
res.addToBucket(8)
res.addToBucket(9)
res.addToBucket(10)
Expand All @@ -136,7 +136,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
func TestNewHistogramWithUntrackables(t *testing.T) {
t.Parallel()

res := newHistogram()
res := histogram{}
for _, v := range []float64{5, -3.14, 2 * 1e9, 1} {
res.addToBucket(v)
}
Expand All @@ -158,7 +158,7 @@ func TestNewHistogramWithUntrackables(t *testing.T) {
func TestNewHistogramWithMultipleValues(t *testing.T) {
t.Parallel()

res := newHistogram()
res := histogram{}
for _, v := range []float64{51.8, 103.6, 103.6, 103.6, 103.6} {
res.addToBucket(v)
}
Expand All @@ -181,7 +181,7 @@ func TestNewHistogramWithMultipleValues(t *testing.T) {
func TestNewHistogramWithNegativeNum(t *testing.T) {
t.Parallel()

res := newHistogram()
res := histogram{}
res.addToBucket(-2.42314)

exp := histogram{
Expand All @@ -199,7 +199,7 @@ func TestNewHistogramWithNegativeNum(t *testing.T) {

func TestNewHistogramWithMultipleNegativeNums(t *testing.T) {
t.Parallel()
res := newHistogram()
res := histogram{}
for _, v := range []float64{-0.001, -0.001, -0.001} {
res.addToBucket(v)
}
Expand All @@ -220,7 +220,7 @@ func TestNewHistogramWithMultipleNegativeNums(t *testing.T) {
func TestNewHistoramWithNoVals(t *testing.T) {
t.Parallel()

res := newHistogram()
res := histogram{}
exp := histogram{
Buckets: nil,
FirstNotZeroBucket: 0,
Expand Down Expand Up @@ -267,17 +267,14 @@ func TestHistogramAsProto(t *testing.T) {
cases := []struct {
name string
vals []float64
in histogram
exp *pbcloud.TrendHdrValue
}{
{
name: "empty histogram",
in: histogram{},
exp: &pbcloud.TrendHdrValue{},
},
{
name: "not trackable values",
in: newHistogram(),
vals: []float64{-0.23, 1<<30 + 1},
exp: &pbcloud.TrendHdrValue{
Count: 2,
Expand All @@ -287,11 +284,11 @@ func TestHistogramAsProto(t *testing.T) {
LowerCounterIndex: 0,
MinValue: -0.23,
MaxValue: 1<<30 + 1,
Sum: (1 << 30) + 1 - 0.23,
},
},
{
name: "normal values",
in: newHistogram(),
vals: []float64{2, 1.1, 3},
exp: &pbcloud.TrendHdrValue{
Count: 3,
Expand All @@ -301,18 +298,19 @@ func TestHistogramAsProto(t *testing.T) {
LowerCounterIndex: 2,
MinValue: 1.1,
MaxValue: 3,
Sum: 6.1,
},
},
}

for _, tc := range cases {
h := histogram{}
for _, v := range tc.vals {
tc.in.addToBucket(v)
h.addToBucket(v)
}
tc.exp.MinResolution = 1.0
tc.exp.SignificantDigits = 2
tc.exp.Time = &timestamppb.Timestamp{Seconds: 1}
tc.exp.Sum = tc.in.Sum
assert.Equal(t, tc.exp, histogramAsProto(&tc.in, time.Unix(1, 0)), tc.name)
assert.Equal(t, tc.exp, histogramAsProto(&h, time.Unix(1, 0)), tc.name)
}
}
4 changes: 3 additions & 1 deletion output/cloud/expv2/integration/testdata/metricset.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@
"time": "2023-05-01T02:00:00Z",
"last": 3.14,
"min": 3.14,
"max": 3.14
"max": 3.14,
"avg": 3.14,
"count": 1
}
]
}
Expand Down
Loading