Skip to content

Commit

Permalink
A better error handling and test the case
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien authored and olegbespalov committed Jul 20, 2023
1 parent 739739a commit e2f4dc6
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 36 deletions.
6 changes: 5 additions & 1 deletion output/cloud/expv2/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ func (f *metricsFlusher) flushBatches(batches []metricSetBuilder) error {
default:
}
if err := f.push(chunk[i]); err != nil {
errs <- err
select {
case errs <- err:
case <-stop:
return
}
}
}
}(batches[offset:end])
Expand Down
73 changes: 67 additions & 6 deletions output/cloud/expv2/flush_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package expv2

import (
"errors"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -39,6 +40,7 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) {
require.Len(t, msb.MetricSet.Metrics, 1)
assert.Len(t, msb.MetricSet.Metrics[0].TimeSeries, 1)
}

func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -89,11 +91,11 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) {
t.Parallel()

testCases := []struct {
series int
expFlushCalls int
series int
expPushCalls int
}{
{series: 5, expFlushCalls: 2},
{series: 2, expFlushCalls: 1},
{series: 5, expPushCalls: 2},
{series: 2, expPushCalls: 1},
}

r := metrics.NewRegistry()
Expand Down Expand Up @@ -131,7 +133,7 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) {

err := mf.flush()
require.NoError(t, err)
assert.Equal(t, tc.expFlushCalls, pm.timesCalled())
assert.Equal(t, tc.expPushCalls, pm.timesCalled())
}
}

Expand Down Expand Up @@ -214,7 +216,10 @@ func TestFlushWithReservedLabels(t *testing.T) {
}

type pusherMock struct {
hook func(*pbcloud.MetricSet)
// hook is called when the push method is called.
hook func(*pbcloud.MetricSet)
// errFn if this defined, it is called instead
errFn func() error
pushCalled int64
}

Expand All @@ -228,5 +233,61 @@ func (pm *pusherMock) push(ms *pbcloud.MetricSet) error {
}

atomic.AddInt64(&pm.pushCalled, 1)

if pm.errFn != nil {
return pm.errFn()
}

return nil
}

func TestMetricsFlusherErrorCase(t *testing.T) {
t.Parallel()

r := metrics.NewRegistry()
m1 := r.MustNewMetric("metric1", metrics.Counter)

logger, _ := testutils.NewLoggerWithHook(t)

bq := &bucketQ{}
pm := &pusherMock{
errFn: func() error {
return errors.New("some error")
},
}
mf := metricsFlusher{
bq: bq,
client: pm,
logger: logger,
discardedLabels: make(map[string]struct{}),
maxSeriesInBatch: 3,
batchPushConcurrency: 2,
}

series := 7

bq.buckets = make([]timeBucket, 0, series)
for i := 0; i < series; i++ {
ts := metrics.TimeSeries{
Metric: m1,
Tags: r.RootTagSet().With("key1", "val"+strconv.Itoa(i)),
}
bq.Push([]timeBucket{
{
Time: int64(i) + 1,
Sinks: map[metrics.TimeSeries]metricValue{
ts: &counter{Sum: float64(1)},
},
},
})
}
require.Len(t, bq.buckets, series)

err := mf.flush()
require.Error(t, err)
// since the push happens concurrently the number of the calls could vary,
// but at least one call should happen and it should be less than the
// batchPushConcurrency
assert.LessOrEqual(t, pm.timesCalled(), mf.batchPushConcurrency)
assert.GreaterOrEqual(t, pm.timesCalled(), 1)
}
42 changes: 17 additions & 25 deletions output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"net/http"
"runtime"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -115,8 +114,7 @@ func (o *Output) Start() error {
maxSeriesInBatch: int(o.config.MaxTimeSeriesInBatch.Int64),
// TODO: when the migration from v1 is over
// change the default of cloudapi.MetricPushConcurrency to use GOMAXPROCS(0)
// batchPushConcurrency: int(o.config.MetricPushConcurrency.Int64),
batchPushConcurrency: runtime.GOMAXPROCS(0),
batchPushConcurrency: int(o.config.MetricPushConcurrency.Int64),
}

o.runFlushWorkers()
Expand Down Expand Up @@ -185,31 +183,25 @@ func (o *Output) StopWithTestError(_ error) error {
func (o *Output) runFlushWorkers() {
t := time.NewTicker(o.config.MetricPushInterval.TimeDuration())

// TODO: drop it when we are sure of the new proposed architecture
// workers := o.config.MetricPushConcurrency.Int64
// Details: https://github.com/grafana/k6/issues/3192
workers := 1
o.wg.Add(1)

for i := 0; i < workers; i++ {
o.wg.Add(1)
go func() {
defer func() {
t.Stop()
o.wg.Done()
}()
go func() {
defer func() {
t.Stop()
o.wg.Done()
}()

for {
select {
case <-t.C:
o.flushMetrics()
case <-o.stop:
return
case <-o.abort:
return
}
for {
select {
case <-t.C:
o.flushMetrics()
case <-o.stop:
return
case <-o.abort:
return
}
}()
}
}
}()
}

// AddMetricSamples receives the samples streaming.
Expand Down
6 changes: 2 additions & 4 deletions output/cloud/expv2/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestOutputStopWithTestError(t *testing.T) {
require.NoError(t, o.StopWithTestError(errors.New("an error")))
}

func TestOutputFlushMetricsConcurrently(t *testing.T) {
func TestOutputFlushTicks(t *testing.T) {
t.Parallel()

done := make(chan struct{})
Expand All @@ -321,12 +321,10 @@ func TestOutputFlushMetricsConcurrently(t *testing.T) {
close(done)
return
}
<-done
}

o := Output{logger: testutils.NewLogger(t)}
o.config.MetricPushConcurrency = null.IntFrom(2)
o.config.MetricPushInterval = types.NullDurationFrom(1) // loop
o.config.MetricPushInterval = types.NullDurationFrom(100 * time.Millisecond) // loop
o.flushing = flusherFunc(flusherMock)
o.runFlushWorkers()

Expand Down

0 comments on commit e2f4dc6

Please sign in to comment.