From 77d3f921dd9f2014497d0386651cdac827320f88 Mon Sep 17 00:00:00 2001 From: codebien <2103732+codebien@users.noreply.github.com> Date: Thu, 13 Jul 2023 16:37:15 +0200 Subject: [PATCH 01/10] Set the flush operation as no-concurrent --- output/cloud/expv2/output.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index a03a8f0bc0c..38d5078489b 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -178,9 +178,13 @@ func (o *Output) StopWithTestError(_ error) error { } func (o *Output) runFlushWorkers() { + // workers := o.config.MetricPushConcurrency.Int64 + // Details: https://github.com/grafana/k6/issues/3192 + workers := 1 + t := time.NewTicker(o.config.MetricPushInterval.TimeDuration()) - for i := int64(0); i < o.config.MetricPushConcurrency.Int64; i++ { + for i := 0; i < workers; i++ { o.wg.Add(1) go func() { defer func() { From 739739af21a0a52490ab99d009c1c3c459fb70d6 Mon Sep 17 00:00:00 2001 From: codebien <2103732+codebien@users.noreply.github.com> Date: Thu, 13 Jul 2023 23:41:58 +0200 Subject: [PATCH 02/10] Concurrent flush for batches --- output/cloud/expv2/flush.go | 83 ++++++++++++++++++++++++++------ output/cloud/expv2/flush_test.go | 52 ++++++++++++-------- output/cloud/expv2/output.go | 10 +++- 3 files changed, 107 insertions(+), 38 deletions(-) diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go index 63cb3ab75db..888e133acf3 100644 --- a/output/cloud/expv2/flush.go +++ b/output/cloud/expv2/flush.go @@ -1,6 +1,8 @@ package expv2 import ( + "math" + "sync" "time" "github.com/sirupsen/logrus" @@ -21,6 +23,7 @@ type metricsFlusher struct { discardedLabels map[string]struct{} aggregationPeriodInSeconds uint32 maxSeriesInBatch int + batchPushConcurrency int } // flush flushes the queued buckets sending them to the remote Cloud service. @@ -43,9 +46,9 @@ func (f *metricsFlusher) flush() error { // the metricSetBuilder is used for doing it during the traverse of the buckets. var ( - seriesCount int - batchesCount int - start = time.Now() + start = time.Now() + batches []metricSetBuilder + seriesCount int ) defer func() { @@ -53,35 +56,83 @@ func (f *metricsFlusher) flush() error { WithField("t", time.Since(start)). WithField("series", seriesCount). WithField("buckets", len(buckets)). 
- WithField("batches", batchesCount).Debug("Flush the queued buckets") + WithField("batches", len(batches)).Debug("Flush the queued buckets") }() msb := newMetricSetBuilder(f.testRunID, f.aggregationPeriodInSeconds) for i := 0; i < len(buckets); i++ { for timeSeries, sink := range buckets[i].Sinks { msb.addTimeSeries(buckets[i].Time, timeSeries, sink) - if len(msb.seriesIndex) < f.maxSeriesInBatch { + if len(msb.seriesIndex) <= f.maxSeriesInBatch { continue } - // we hit the batch size, let's flush - batchesCount++ + // We hit the batch size, let's flush seriesCount += len(msb.seriesIndex) - if err := f.push(msb); err != nil { - return err - } + batches = append(batches, msb) + + // Reset the builder msb = newMetricSetBuilder(f.testRunID, f.aggregationPeriodInSeconds) } } - if len(msb.seriesIndex) < 1 { - return nil - } - // send the last (or the unique) MetricSet chunk to the remote service - batchesCount++ seriesCount += len(msb.seriesIndex) - return f.push(msb) + batches = append(batches, msb) + + return f.flushBatches(batches) +} + +func (f *metricsFlusher) flushBatches(batches []metricSetBuilder) error { + var ( + wg = sync.WaitGroup{} + errs = make(chan error) + done = make(chan struct{}) + stop = make(chan struct{}) + + workers = int(math.Min(float64(len(batches)), float64(f.batchPushConcurrency))) + chunkSize = len(batches) / workers + ) + + wg.Add(workers) + for workersIndex := 0; workersIndex < workers; workersIndex++ { + offset := (workersIndex * chunkSize) + end := offset + chunkSize + if workersIndex == workers-1 { + end = len(batches) + } + + go func(chunk []metricSetBuilder) { + defer wg.Done() + + for i := 0; i < len(chunk); i++ { + select { + case <-stop: + return + default: + } + if err := f.push(chunk[i]); err != nil { + errs <- err + } + } + }(batches[offset:end]) + } + + go func() { + defer close(done) + wg.Wait() + }() + + for { + select { + case err := <-errs: + close(stop) + <-done + return err + case <-done: + return nil + } + } } // push sends the metric set to the remote service. 
diff --git a/output/cloud/expv2/flush_test.go b/output/cloud/expv2/flush_test.go index 35847ef8dd8..cdfab5d990a 100644 --- a/output/cloud/expv2/flush_test.go +++ b/output/cloud/expv2/flush_test.go @@ -2,6 +2,8 @@ package expv2 import ( "strconv" + "sync" + "sync/atomic" "testing" "github.com/sirupsen/logrus" @@ -56,11 +58,12 @@ func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) { bq := &bucketQ{} pm := &pusherMock{} mf := metricsFlusher{ - bq: bq, - client: pm, - logger: logger, - discardedLabels: make(map[string]struct{}), - maxSeriesInBatch: 3, + bq: bq, + client: pm, + logger: logger, + discardedLabels: make(map[string]struct{}), + maxSeriesInBatch: 3, + batchPushConcurrency: 5, } bq.buckets = make([]timeBucket, 0, tc.series) @@ -78,7 +81,7 @@ func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) { bq.Push([]timeBucket{{Time: 1, Sinks: sinks}}) err := mf.flush() require.NoError(t, err) - assert.Equal(t, tc.expFlushCalls, pm.pushCalled) + assert.Equal(t, tc.expFlushCalls, pm.timesCalled()) } } @@ -101,11 +104,12 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) { bq := &bucketQ{} pm := &pusherMock{} mf := metricsFlusher{ - bq: bq, - client: pm, - logger: logger, - discardedLabels: make(map[string]struct{}), - maxSeriesInBatch: 3, + bq: bq, + client: pm, + logger: logger, + discardedLabels: make(map[string]struct{}), + maxSeriesInBatch: 3, + batchPushConcurrency: 5, } bq.buckets = make([]timeBucket, 0, tc.series) @@ -127,7 +131,7 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) { err := mf.flush() require.NoError(t, err) - assert.Equal(t, tc.expFlushCalls, pm.pushCalled) + assert.Equal(t, tc.expFlushCalls, pm.timesCalled()) } } @@ -136,21 +140,25 @@ func TestFlushWithReservedLabels(t *testing.T) { logger, hook := testutils.NewLoggerWithHook(t) + mutex := sync.Mutex{} collected := make([]*pbcloud.MetricSet, 0) bq := &bucketQ{} pm := &pusherMock{ hook: func(ms *pbcloud.MetricSet) { + mutex.Lock() collected = append(collected, ms) + mutex.Unlock() }, } mf := metricsFlusher{ - bq: bq, - client: pm, - maxSeriesInBatch: 2, - logger: logger, - discardedLabels: make(map[string]struct{}), + bq: bq, + client: pm, + maxSeriesInBatch: 2, + logger: logger, + discardedLabels: make(map[string]struct{}), + batchPushConcurrency: 5, } r := metrics.NewRegistry() @@ -186,7 +194,7 @@ func TestFlushWithReservedLabels(t *testing.T) { require.NoError(t, err) loglines := hook.Drain() - assert.Equal(t, 1, len(collected)) + require.Len(t, collected, 1) // check that warnings sown only once per label assert.Len(t, testutils.FilterEntries(loglines, logrus.WarnLevel, "Tag __name__ has been discarded since it is reserved for Cloud operations."), 1) @@ -207,7 +215,11 @@ func TestFlushWithReservedLabels(t *testing.T) { type pusherMock struct { hook func(*pbcloud.MetricSet) - pushCalled int + pushCalled int64 +} + +func (pm *pusherMock) timesCalled() int { + return int(atomic.LoadInt64(&pm.pushCalled)) } func (pm *pusherMock) push(ms *pbcloud.MetricSet) error { @@ -215,6 +227,6 @@ func (pm *pusherMock) push(ms *pbcloud.MetricSet) error { pm.hook(ms) } - pm.pushCalled++ + atomic.AddInt64(&pm.pushCalled, 1) return nil } diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index 38d5078489b..ec1e5a79a96 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net/http" + "runtime" "strconv" "sync" "time" @@ -112,6 +113,10 @@ func (o *Output) Start() error { discardedLabels: 
make(map[string]struct{}), aggregationPeriodInSeconds: uint32(o.config.AggregationPeriod.TimeDuration().Seconds()), maxSeriesInBatch: int(o.config.MaxTimeSeriesInBatch.Int64), + // TODO: when the migration from v1 is over + // change the default of cloudapi.MetricPushConcurrency to use GOMAXPROCS(0) + // batchPushConcurrency: int(o.config.MetricPushConcurrency.Int64), + batchPushConcurrency: runtime.GOMAXPROCS(0), } o.runFlushWorkers() @@ -178,12 +183,13 @@ func (o *Output) StopWithTestError(_ error) error { } func (o *Output) runFlushWorkers() { + t := time.NewTicker(o.config.MetricPushInterval.TimeDuration()) + + // TODO: drop it when we are sure of the new proposed architecture // workers := o.config.MetricPushConcurrency.Int64 // Details: https://github.com/grafana/k6/issues/3192 workers := 1 - t := time.NewTicker(o.config.MetricPushInterval.TimeDuration()) - for i := 0; i < workers; i++ { o.wg.Add(1) go func() { From 459cac977ff0f7cf0a6997558887d6bc5a307636 Mon Sep 17 00:00:00 2001 From: codebien <2103732+codebien@users.noreply.github.com> Date: Thu, 13 Jul 2023 23:49:23 +0200 Subject: [PATCH 03/10] A better error handling and test the case --- output/cloud/expv2/flush.go | 6 ++- output/cloud/expv2/flush_test.go | 74 ++++++++++++++++++++++++++++--- output/cloud/expv2/output.go | 42 +++++++----------- output/cloud/expv2/output_test.go | 6 +-- 4 files changed, 92 insertions(+), 36 deletions(-) diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go index 888e133acf3..7e19480de60 100644 --- a/output/cloud/expv2/flush.go +++ b/output/cloud/expv2/flush.go @@ -112,7 +112,11 @@ func (f *metricsFlusher) flushBatches(batches []metricSetBuilder) error { default: } if err := f.push(chunk[i]); err != nil { - errs <- err + select { + case errs <- err: + case <-stop: + return + } } } }(batches[offset:end]) diff --git a/output/cloud/expv2/flush_test.go b/output/cloud/expv2/flush_test.go index cdfab5d990a..f1f481a7843 100644 --- a/output/cloud/expv2/flush_test.go +++ b/output/cloud/expv2/flush_test.go @@ -1,6 +1,7 @@ package expv2 import ( + "errors" "strconv" "sync" "sync/atomic" @@ -39,6 +40,7 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) { require.Len(t, msb.MetricSet.Metrics, 1) assert.Len(t, msb.MetricSet.Metrics[0].TimeSeries, 1) } + func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) { t.Parallel() @@ -89,11 +91,11 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) { t.Parallel() testCases := []struct { - series int - expFlushCalls int + series int + expPushCalls int }{ - {series: 5, expFlushCalls: 2}, - {series: 2, expFlushCalls: 1}, + {series: 5, expPushCalls: 2}, + {series: 2, expPushCalls: 1}, } r := metrics.NewRegistry() @@ -131,7 +133,7 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) { err := mf.flush() require.NoError(t, err) - assert.Equal(t, tc.expFlushCalls, pm.timesCalled()) + assert.Equal(t, tc.expPushCalls, pm.timesCalled()) } } @@ -214,7 +216,11 @@ func TestFlushWithReservedLabels(t *testing.T) { } type pusherMock struct { - hook func(*pbcloud.MetricSet) + // hook is called when the push method is called. + hook func(*pbcloud.MetricSet) + // errFn if this defined, it is called at the end of push + // and result error is returned. 
+ errFn func() error pushCalled int64 } @@ -228,5 +234,61 @@ func (pm *pusherMock) push(ms *pbcloud.MetricSet) error { } atomic.AddInt64(&pm.pushCalled, 1) + + if pm.errFn != nil { + return pm.errFn() + } + return nil } + +func TestMetricsFlusherErrorCase(t *testing.T) { + t.Parallel() + + r := metrics.NewRegistry() + m1 := r.MustNewMetric("metric1", metrics.Counter) + + logger, _ := testutils.NewLoggerWithHook(t) + + bq := &bucketQ{} + pm := &pusherMock{ + errFn: func() error { + return errors.New("some error") + }, + } + mf := metricsFlusher{ + bq: bq, + client: pm, + logger: logger, + discardedLabels: make(map[string]struct{}), + maxSeriesInBatch: 3, + batchPushConcurrency: 2, + } + + series := 7 + + bq.buckets = make([]timeBucket, 0, series) + for i := 0; i < series; i++ { + ts := metrics.TimeSeries{ + Metric: m1, + Tags: r.RootTagSet().With("key1", "val"+strconv.Itoa(i)), + } + bq.Push([]timeBucket{ + { + Time: int64(i) + 1, + Sinks: map[metrics.TimeSeries]metricValue{ + ts: &counter{Sum: float64(1)}, + }, + }, + }) + } + require.Len(t, bq.buckets, series) + + err := mf.flush() + require.Error(t, err) + // since the push happens concurrently the number of the calls could vary, + // but at least one call should happen and it should be less than the + // batchPushConcurrency + assert.LessOrEqual(t, pm.timesCalled(), mf.batchPushConcurrency) + assert.GreaterOrEqual(t, pm.timesCalled(), 1) +} diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index ec1e5a79a96..d6befb60227 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "net/http" - "runtime" "strconv" "sync" "time" @@ -115,8 +114,7 @@ func (o *Output) Start() error { maxSeriesInBatch: int(o.config.MaxTimeSeriesInBatch.Int64), // TODO: when the migration from v1 is over // change the default of cloudapi.MetricPushConcurrency to use GOMAXPROCS(0) - // batchPushConcurrency: int(o.config.MetricPushConcurrency.Int64), - batchPushConcurrency: runtime.GOMAXPROCS(0), + batchPushConcurrency: int(o.config.MetricPushConcurrency.Int64), } o.runFlushWorkers() @@ -185,31 +183,25 @@ func (o *Output) StopWithTestError(_ error) error { func (o *Output) runFlushWorkers() { t := time.NewTicker(o.config.MetricPushInterval.TimeDuration()) - // TODO: drop it when we are sure of the new proposed architecture - // workers := o.config.MetricPushConcurrency.Int64 - // Details: https://github.com/grafana/k6/issues/3192 - workers := 1 + o.wg.Add(1) - for i := 0; i < workers; i++ { - o.wg.Add(1) - go func() { - defer func() { - t.Stop() - o.wg.Done() - }() + go func() { + defer func() { + t.Stop() + o.wg.Done() + }() - for { - select { - case <-t.C: - o.flushMetrics() - case <-o.stop: - return - case <-o.abort: - return - } + for { + select { + case <-t.C: + o.flushMetrics() + case <-o.stop: + return + case <-o.abort: + return } - }() - } + } + }() } // AddMetricSamples receives the samples streaming. 
diff --git a/output/cloud/expv2/output_test.go b/output/cloud/expv2/output_test.go index 2ad87add7dd..ba14570e75e 100644 --- a/output/cloud/expv2/output_test.go +++ b/output/cloud/expv2/output_test.go @@ -305,7 +305,7 @@ func TestOutputStopWithTestError(t *testing.T) { require.NoError(t, o.StopWithTestError(errors.New("an error"))) } -func TestOutputFlushMetricsConcurrently(t *testing.T) { +func TestOutputFlushTicks(t *testing.T) { t.Parallel() done := make(chan struct{}) @@ -321,12 +321,10 @@ func TestOutputFlushMetricsConcurrently(t *testing.T) { close(done) return } - <-done } o := Output{logger: testutils.NewLogger(t)} - o.config.MetricPushConcurrency = null.IntFrom(2) - o.config.MetricPushInterval = types.NullDurationFrom(1) // loop + o.config.MetricPushInterval = types.NullDurationFrom(100 * time.Millisecond) // loop o.flushing = flusherFunc(flusherMock) o.runFlushWorkers() From 806ab25c7fc1c20ba2e48b775009dbeb158d79c5 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Thu, 20 Jul 2023 14:53:49 +0200 Subject: [PATCH 04/10] feat: simplifying and move discarded labels reporting --- output/cloud/expv2/flush.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go index 7e19480de60..d98810dd9f2 100644 --- a/output/cloud/expv2/flush.go +++ b/output/cloud/expv2/flush.go @@ -47,7 +47,7 @@ func (f *metricsFlusher) flush() error { var ( start = time.Now() - batches []metricSetBuilder + batches []*pbcloud.MetricSet seriesCount int ) @@ -69,7 +69,8 @@ func (f *metricsFlusher) flush() error { // We hit the batch size, let's flush seriesCount += len(msb.seriesIndex) - batches = append(batches, msb) + batches = append(batches, msb.MetricSet) + f.reportDiscardedLabels(msb.discardedLabels) // Reset the builder msb = newMetricSetBuilder(f.testRunID, f.aggregationPeriodInSeconds) @@ -78,12 +79,13 @@ func (f *metricsFlusher) flush() error { // send the last (or the unique) MetricSet chunk to the remote service seriesCount += len(msb.seriesIndex) - batches = append(batches, msb) + batches = append(batches, msb.MetricSet) + f.reportDiscardedLabels(msb.discardedLabels) return f.flushBatches(batches) } -func (f *metricsFlusher) flushBatches(batches []metricSetBuilder) error { +func (f *metricsFlusher) flushBatches(batches []*pbcloud.MetricSet) error { var ( wg = sync.WaitGroup{} errs = make(chan error) @@ -102,7 +104,7 @@ func (f *metricsFlusher) flushBatches(batches []metricSetBuilder) error { end = len(batches) } - go func(chunk []metricSetBuilder) { + go func(chunk []*pbcloud.MetricSet) { defer wg.Done() for i := 0; i < len(chunk); i++ { @@ -111,7 +113,7 @@ func (f *metricsFlusher) flushBatches(batches []metricSetBuilder) error { return default: } - if err := f.push(chunk[i]); err != nil { + if err := f.client.push(chunk[i]); err != nil { select { case errs <- err: case <-stop: @@ -139,10 +141,8 @@ func (f *metricsFlusher) flushBatches(batches []metricSetBuilder) error { } } -// push sends the metric set to the remote service. -// it also checks if the labels are discarded and logs a warning if so. 
-func (f *metricsFlusher) push(msb metricSetBuilder) error { - for key := range msb.discardedLabels { +func (f *metricsFlusher) reportDiscardedLabels(discardedLabels map[string]struct{}) { + for key := range discardedLabels { if _, ok := f.discardedLabels[key]; ok { continue } @@ -150,8 +150,6 @@ func (f *metricsFlusher) push(msb metricSetBuilder) error { f.discardedLabels[key] = struct{}{} f.logger.Warnf("Tag %s has been discarded since it is reserved for Cloud operations.", key) } - - return f.client.push(msb.MetricSet) } type metricSetBuilder struct { From e4731fc1a5801a842f6dafff641b2a590f13b776 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Thu, 20 Jul 2023 14:56:41 +0200 Subject: [PATCH 05/10] chore: rename runFlushWorkers --- output/cloud/expv2/output.go | 4 ++-- output/cloud/expv2/output_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index d6befb60227..55a9332b042 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -117,7 +117,7 @@ func (o *Output) Start() error { batchPushConcurrency: int(o.config.MetricPushConcurrency.Int64), } - o.runFlushWorkers() + o.runPeriodicFlush() o.periodicInvoke(o.config.AggregationPeriod.TimeDuration(), o.collectSamples) if insightsOutput.Enabled(o.config) { @@ -180,7 +180,7 @@ func (o *Output) StopWithTestError(_ error) error { return nil } -func (o *Output) runFlushWorkers() { +func (o *Output) runPeriodicFlush() { t := time.NewTicker(o.config.MetricPushInterval.TimeDuration()) o.wg.Add(1) diff --git a/output/cloud/expv2/output_test.go b/output/cloud/expv2/output_test.go index ba14570e75e..ec6441bf3cb 100644 --- a/output/cloud/expv2/output_test.go +++ b/output/cloud/expv2/output_test.go @@ -326,7 +326,7 @@ func TestOutputFlushTicks(t *testing.T) { o := Output{logger: testutils.NewLogger(t)} o.config.MetricPushInterval = types.NullDurationFrom(100 * time.Millisecond) // loop o.flushing = flusherFunc(flusherMock) - o.runFlushWorkers() + o.runPeriodicFlush() select { case <-time.After(5 * time.Second): @@ -352,7 +352,7 @@ func TestOutputFlushWorkersStop(t *testing.T) { } o.flushing = flusherFunc(flusherMock) - o.runFlushWorkers() + o.runPeriodicFlush() // it asserts that all flushers exit done := make(chan struct{}) @@ -383,7 +383,7 @@ func TestOutputFlushWorkersAbort(t *testing.T) { } o.flushing = flusherFunc(flusherMock) - o.runFlushWorkers() + o.runPeriodicFlush() // it asserts that all flushers exit done := make(chan struct{}) From a3267b1438923e8a1d530ace274aa8a09a704ab7 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Mon, 24 Jul 2023 14:33:21 +0300 Subject: [PATCH 06/10] cloudv2: add min helper --- output/cloud/expv2/flush.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go index d98810dd9f2..08311fbfe8e 100644 --- a/output/cloud/expv2/flush.go +++ b/output/cloud/expv2/flush.go @@ -1,7 +1,6 @@ package expv2 import ( - "math" "sync" "time" @@ -86,13 +85,20 @@ func (f *metricsFlusher) flush() error { } func (f *metricsFlusher) flushBatches(batches []*pbcloud.MetricSet) error { + // TODO remove after go 1.21 becomes the minimum supported version - it has `min` in it + min := func(a, b int) int { + if a < b { + return a + } + return b + } var ( wg = sync.WaitGroup{} errs = make(chan error) done = make(chan struct{}) stop = make(chan struct{}) - workers = int(math.Min(float64(len(batches)), float64(f.batchPushConcurrency))) + workers = 
min(len(batches), f.batchPushConcurrency) chunkSize = len(batches) / workers ) From 0f9dbe229aabdc4c7cc25a0306479e3f541a19e9 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Mon, 24 Jul 2023 14:48:46 +0300 Subject: [PATCH 07/10] cloudv2: use channels to split batches --- output/cloud/expv2/flush.go | 47 ++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go index 08311fbfe8e..0d5c382ecf0 100644 --- a/output/cloud/expv2/flush.go +++ b/output/cloud/expv2/flush.go @@ -92,34 +92,29 @@ func (f *metricsFlusher) flushBatches(batches []*pbcloud.MetricSet) error { } return b } + var ( wg = sync.WaitGroup{} errs = make(chan error) done = make(chan struct{}) stop = make(chan struct{}) + feed = make(chan *pbcloud.MetricSet) - workers = min(len(batches), f.batchPushConcurrency) - chunkSize = len(batches) / workers + workers = min(len(batches), f.batchPushConcurrency) ) wg.Add(workers) for workersIndex := 0; workersIndex < workers; workersIndex++ { - offset := (workersIndex * chunkSize) - end := offset + chunkSize - if workersIndex == workers-1 { - end = len(batches) - } - - go func(chunk []*pbcloud.MetricSet) { + go func() { defer wg.Done() - for i := 0; i < len(chunk); i++ { + for chunk := range feed { select { case <-stop: return default: } - if err := f.client.push(chunk[i]); err != nil { + if err := f.client.push(chunk); err != nil { select { case errs <- err: case <-stop: @@ -127,24 +122,34 @@ func (f *metricsFlusher) flushBatches(batches []*pbcloud.MetricSet) error { } } } - }(batches[offset:end]) + }() } - go func() { - defer close(done) - wg.Wait() - }() - - for { + for i := 0; i < len(batches); i++ { select { case err := <-errs: close(stop) - <-done + close(feed) return err - case <-done: - return nil + case feed <- batches[i]: } } + + close(feed) + + go func() { + defer close(done) + wg.Wait() + }() + + select { + case err := <-errs: + close(stop) + <-done + return err + case <-done: + return nil + } } func (f *metricsFlusher) reportDiscardedLabels(discardedLabels map[string]struct{}) { From a19e027cceda6e45a26b80a932c69cb3559caf76 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Mon, 24 Jul 2023 14:58:50 +0300 Subject: [PATCH 08/10] cloudv2: refactor --- output/cloud/expv2/flush.go | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go index 0d5c382ecf0..4ff5dabd6e7 100644 --- a/output/cloud/expv2/flush.go +++ b/output/cloud/expv2/flush.go @@ -94,42 +94,36 @@ func (f *metricsFlusher) flushBatches(batches []*pbcloud.MetricSet) error { } var ( - wg = sync.WaitGroup{} - errs = make(chan error) - done = make(chan struct{}) - stop = make(chan struct{}) - feed = make(chan *pbcloud.MetricSet) - + wg = sync.WaitGroup{} workers = min(len(batches), f.batchPushConcurrency) + errs = make(chan error, workers) + feed = make(chan *pbcloud.MetricSet) + done = make(chan struct{}) ) wg.Add(workers) - for workersIndex := 0; workersIndex < workers; workersIndex++ { + for i := 0; i < workers; i++ { go func() { defer wg.Done() for chunk := range feed { - select { - case <-stop: - return - default: - } if err := f.client.push(chunk); err != nil { - select { - case errs <- err: - case <-stop: - return - } + errs <- err + return } } }() } + go func() { + defer close(done) + wg.Wait() + }() for i := 0; i < len(batches); i++ { select { case err := <-errs: - close(stop) close(feed) + <-done return err 
case feed <- batches[i]: } @@ -137,14 +131,8 @@ func (f *metricsFlusher) flushBatches(batches []*pbcloud.MetricSet) error { close(feed) - go func() { - defer close(done) - wg.Wait() - }() - select { case err := <-errs: - close(stop) <-done return err case <-done: From 3769dc06791ec9b05a5b171bcca96fc2cfd7e4b6 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Mon, 24 Jul 2023 15:12:35 +0300 Subject: [PATCH 09/10] cloudv2: drop waitgroup for a buffer channel --- output/cloud/expv2/flush.go | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go index 4ff5dabd6e7..e70de7e6b0e 100644 --- a/output/cloud/expv2/flush.go +++ b/output/cloud/expv2/flush.go @@ -1,7 +1,6 @@ package expv2 import ( - "sync" "time" "github.com/sirupsen/logrus" @@ -94,50 +93,44 @@ func (f *metricsFlusher) flushBatches(batches []*pbcloud.MetricSet) error { } var ( - wg = sync.WaitGroup{} - workers = min(len(batches), f.batchPushConcurrency) - errs = make(chan error, workers) - feed = make(chan *pbcloud.MetricSet) - done = make(chan struct{}) + workers = min(len(batches), f.batchPushConcurrency) + errs = make(chan error, workers) + feed = make(chan *pbcloud.MetricSet) + finalErr error ) - wg.Add(workers) for i := 0; i < workers; i++ { go func() { - defer wg.Done() - for chunk := range feed { if err := f.client.push(chunk); err != nil { errs <- err return } } + errs <- nil }() } - go func() { - defer close(done) - wg.Wait() - }() +outer: for i := 0; i < len(batches); i++ { select { case err := <-errs: - close(feed) - <-done - return err + workers-- + finalErr = err + break outer case feed <- batches[i]: } } close(feed) - select { - case err := <-errs: - <-done - return err - case <-done: - return nil + for ; workers != 0; workers-- { + err := <-errs + if err != nil && finalErr != nil { + finalErr = err + } } + return finalErr } func (f *metricsFlusher) reportDiscardedLabels(discardedLabels map[string]struct{}) { From a2878f74c3f4ceb9930bfb00d5cc06d0a45ef98f Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Mon, 24 Jul 2023 15:39:30 +0300 Subject: [PATCH 10/10] fixup! cloudv2: drop waitgroup for a buffer channel --- output/cloud/expv2/flush.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go index e70de7e6b0e..d6dc9f9c866 100644 --- a/output/cloud/expv2/flush.go +++ b/output/cloud/expv2/flush.go @@ -126,7 +126,7 @@ outer: for ; workers != 0; workers-- { err := <-errs - if err != nil && finalErr != nil { + if err != nil && finalErr == nil { finalErr = err } }
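
Note on the final shape of flushBatches: the last patches in the series converge on a small fan-out pattern — a bounded pool of workers drains a shared feed channel, each worker sends exactly one value on a buffered errs channel (nil on clean exit, or its first push error), and the producer stops feeding as soon as any worker reports a failure. The sketch below is a minimal, self-contained illustration of that pattern only, written under assumptions: the []string payload, the push callback, and every name here are placeholders for illustration, not the actual k6 types or API.

package main

import (
	"errors"
	"fmt"
)

// flushBatches pushes batches with at most `concurrency` concurrent workers.
// Each worker sends exactly one value on errs (nil on clean exit, or its first
// push error), so the producer can stop feeding early and still drain cleanly.
func flushBatches(batches []string, concurrency int, push func(string) error) error {
	if len(batches) == 0 {
		return nil
	}
	workers := concurrency
	if len(batches) < workers {
		workers = len(batches)
	}

	feed := make(chan string)
	errs := make(chan error, workers) // buffered so workers never block on exit

	for i := 0; i < workers; i++ {
		go func() {
			for b := range feed {
				if err := push(b); err != nil {
					errs <- err // report the first error and stop this worker
					return
				}
			}
			errs <- nil // feed was closed and fully drained
		}()
	}

	var finalErr error
feeding:
	for _, b := range batches {
		select {
		case err := <-errs: // a worker exited early with an error
			workers--
			finalErr = err
			break feeding
		case feed <- b:
		}
	}
	close(feed)

	// Wait for the remaining workers, keeping the first error seen.
	for ; workers != 0; workers-- {
		if err := <-errs; err != nil && finalErr == nil {
			finalErr = err
		}
	}
	return finalErr
}

func main() {
	err := flushBatches([]string{"a", "b", "c", "d", "e"}, 2, func(b string) error {
		if b == "d" {
			return errors.New("push failed for batch " + b)
		}
		fmt.Println("pushed batch", b)
		return nil
	})
	fmt.Println("flush result:", err)
}

Because errs has capacity equal to the worker count, every worker can deliver its final value even after the producer stops listening, which is what lets the series drop the sync.WaitGroup in favour of counting the remaining workers.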