diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go index 888e133acf36..7e19480de60f 100644 --- a/output/cloud/expv2/flush.go +++ b/output/cloud/expv2/flush.go @@ -112,7 +112,11 @@ func (f *metricsFlusher) flushBatches(batches []metricSetBuilder) error { default: } if err := f.push(chunk[i]); err != nil { - errs <- err + select { + case errs <- err: + case <-stop: + return + } } } }(batches[offset:end]) diff --git a/output/cloud/expv2/flush_test.go b/output/cloud/expv2/flush_test.go index cdfab5d990a2..4c56a578d90f 100644 --- a/output/cloud/expv2/flush_test.go +++ b/output/cloud/expv2/flush_test.go @@ -1,6 +1,7 @@ package expv2 import ( + "errors" "strconv" "sync" "sync/atomic" @@ -39,6 +40,7 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) { require.Len(t, msb.MetricSet.Metrics, 1) assert.Len(t, msb.MetricSet.Metrics[0].TimeSeries, 1) } + func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) { t.Parallel() @@ -89,11 +91,11 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) { t.Parallel() testCases := []struct { - series int - expFlushCalls int + series int + expPushCalls int }{ - {series: 5, expFlushCalls: 2}, - {series: 2, expFlushCalls: 1}, + {series: 5, expPushCalls: 2}, + {series: 2, expPushCalls: 1}, } r := metrics.NewRegistry() @@ -131,7 +133,7 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) { err := mf.flush() require.NoError(t, err) - assert.Equal(t, tc.expFlushCalls, pm.timesCalled()) + assert.Equal(t, tc.expPushCalls, pm.timesCalled()) } } @@ -214,7 +216,10 @@ func TestFlushWithReservedLabels(t *testing.T) { } type pusherMock struct { - hook func(*pbcloud.MetricSet) + // hook is called when the push method is called. + hook func(*pbcloud.MetricSet) + // errFn if this defined, it is called instead + errFn func() error pushCalled int64 } @@ -228,5 +233,60 @@ func (pm *pusherMock) push(ms *pbcloud.MetricSet) error { } atomic.AddInt64(&pm.pushCalled, 1) + + if pm.errFn != nil { + return pm.errFn() + } + return nil } + +func TestMetricsFlusherErrorCase(t *testing.T) { + t.Parallel() + + r := metrics.NewRegistry() + m1 := r.MustNewMetric("metric1", metrics.Counter) + + logger, _ := testutils.NewLoggerWithHook(t) + + bq := &bucketQ{} + pm := &pusherMock{ + errFn: func() error { + return errors.New("some error") + }, + } + mf := metricsFlusher{ + bq: bq, + client: pm, + logger: logger, + discardedLabels: make(map[string]struct{}), + maxSeriesInBatch: 3, + batchPushConcurrency: 5, + } + + series := 5 + expPushCalls := 2 + + bq.buckets = make([]timeBucket, 0, series) + for i := 0; i < series; i++ { + ts := metrics.TimeSeries{ + Metric: m1, + Tags: r.RootTagSet().With("key1", "val"+strconv.Itoa(i)), + } + bq.Push([]timeBucket{ + { + Time: int64(i) + 1, + Sinks: map[metrics.TimeSeries]metricValue{ + ts: &counter{Sum: float64(1)}, + }, + }, + }) + } + require.Len(t, bq.buckets, series) + + err := mf.flush() + require.Error(t, err) + // since the push happens concurrently the number of the calls can be less + // than expected + assert.LessOrEqual(t, expPushCalls, pm.timesCalled()) +} diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index ec1e5a79a96f..d6befb602279 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "net/http" - "runtime" "strconv" "sync" "time" @@ -115,8 +114,7 @@ func (o *Output) Start() error { maxSeriesInBatch: int(o.config.MaxTimeSeriesInBatch.Int64), // TODO: when the migration from v1 is over // change the default of cloudapi.MetricPushConcurrency to use GOMAXPROCS(0) - // batchPushConcurrency: int(o.config.MetricPushConcurrency.Int64), - batchPushConcurrency: runtime.GOMAXPROCS(0), + batchPushConcurrency: int(o.config.MetricPushConcurrency.Int64), } o.runFlushWorkers() @@ -185,31 +183,25 @@ func (o *Output) StopWithTestError(_ error) error { func (o *Output) runFlushWorkers() { t := time.NewTicker(o.config.MetricPushInterval.TimeDuration()) - // TODO: drop it when we are sure of the new proposed architecture - // workers := o.config.MetricPushConcurrency.Int64 - // Details: https://github.com/grafana/k6/issues/3192 - workers := 1 + o.wg.Add(1) - for i := 0; i < workers; i++ { - o.wg.Add(1) - go func() { - defer func() { - t.Stop() - o.wg.Done() - }() + go func() { + defer func() { + t.Stop() + o.wg.Done() + }() - for { - select { - case <-t.C: - o.flushMetrics() - case <-o.stop: - return - case <-o.abort: - return - } + for { + select { + case <-t.C: + o.flushMetrics() + case <-o.stop: + return + case <-o.abort: + return } - }() - } + } + }() } // AddMetricSamples receives the samples streaming. diff --git a/output/cloud/expv2/output_test.go b/output/cloud/expv2/output_test.go index 2ad87add7dd1..ba14570e75ef 100644 --- a/output/cloud/expv2/output_test.go +++ b/output/cloud/expv2/output_test.go @@ -305,7 +305,7 @@ func TestOutputStopWithTestError(t *testing.T) { require.NoError(t, o.StopWithTestError(errors.New("an error"))) } -func TestOutputFlushMetricsConcurrently(t *testing.T) { +func TestOutputFlushTicks(t *testing.T) { t.Parallel() done := make(chan struct{}) @@ -321,12 +321,10 @@ func TestOutputFlushMetricsConcurrently(t *testing.T) { close(done) return } - <-done } o := Output{logger: testutils.NewLogger(t)} - o.config.MetricPushConcurrency = null.IntFrom(2) - o.config.MetricPushInterval = types.NullDurationFrom(1) // loop + o.config.MetricPushInterval = types.NullDurationFrom(100 * time.Millisecond) // loop o.flushing = flusherFunc(flusherMock) o.runFlushWorkers()