Implement concurrent pushes across batches #3206

Merged
merged 10 commits on Jul 24, 2023
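At a high level, the change collects all batches first and then pushes them from a bounded pool of worker goroutines, aborting on the first failed push. Below is a minimal, self-contained sketch of that fan-out / stop / done coordination pattern (hypothetical pushAll helper and int payloads, not the actual k6 types):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// pushAll splits items across at most maxWorkers goroutines and returns the
// first error encountered, signalling the remaining workers to stop early.
// This mirrors the errs/stop/done channel coordination used in flushBatches
// below; the names here are illustrative, not the k6 API.
func pushAll(items []int, maxWorkers int, push func(int) error) error {
	workers := maxWorkers
	if len(items) < workers {
		workers = len(items)
	}
	if workers == 0 {
		return nil
	}

	var (
		wg   sync.WaitGroup
		errs = make(chan error)
		done = make(chan struct{})
		stop = make(chan struct{})
	)

	chunkSize := len(items) / workers
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		offset := w * chunkSize
		end := offset + chunkSize
		if w == workers-1 {
			end = len(items) // the last worker also takes the remainder
		}

		go func(chunk []int) {
			defer wg.Done()
			for _, it := range chunk {
				select {
				case <-stop: // another worker already failed
					return
				default:
				}
				if err := push(it); err != nil {
					select {
					case errs <- err:
					case <-stop:
					}
					return
				}
			}
		}(items[offset:end])
	}

	go func() {
		defer close(done)
		wg.Wait()
	}()

	select {
	case err := <-errs:
		close(stop) // ask the remaining workers to bail out
		<-done      // wait until every worker has exited
		return err
	case <-done:
		return nil
	}
}

func main() {
	err := pushAll([]int{1, 2, 3, 4, 5, 6, 7}, 2, func(i int) error {
		if i == 4 {
			return errors.New("push failed")
		}
		fmt.Println("pushed", i)
		return nil
	})
	fmt.Println("err:", err)
}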
97 changes: 75 additions & 22 deletions output/cloud/expv2/flush.go
@@ -1,6 +1,8 @@
package expv2

import (
"math"
"sync"
"time"

"github.com/sirupsen/logrus"
@@ -21,6 +23,7 @@ type metricsFlusher struct {
discardedLabels map[string]struct{}
aggregationPeriodInSeconds uint32
maxSeriesInBatch int
batchPushConcurrency int
}

// flush flushes the queued buckets sending them to the remote Cloud service.
@@ -43,60 +46,110 @@ func (f *metricsFlusher) flush() error {
// the metricSetBuilder is used for doing it during the traverse of the buckets.

var (
seriesCount int
batchesCount int
start = time.Now()
start = time.Now()
batches []*pbcloud.MetricSet
seriesCount int
)

defer func() {
f.logger.
WithField("t", time.Since(start)).
WithField("series", seriesCount).
WithField("buckets", len(buckets)).
WithField("batches", batchesCount).Debug("Flush the queued buckets")
WithField("batches", len(batches)).Debug("Flush the queued buckets")
}()

msb := newMetricSetBuilder(f.testRunID, f.aggregationPeriodInSeconds)
for i := 0; i < len(buckets); i++ {
for timeSeries, sink := range buckets[i].Sinks {
msb.addTimeSeries(buckets[i].Time, timeSeries, sink)
if len(msb.seriesIndex) < f.maxSeriesInBatch {
if len(msb.seriesIndex) <= f.maxSeriesInBatch {
continue
}

// we hit the batch size, let's flush
batchesCount++
// We hit the batch size, let's flush
seriesCount += len(msb.seriesIndex)
if err := f.push(msb); err != nil {
return err
}
batches = append(batches, msb.MetricSet)
f.reportDiscardedLabels(msb.discardedLabels)

// Reset the builder
msb = newMetricSetBuilder(f.testRunID, f.aggregationPeriodInSeconds)
}
}

if len(msb.seriesIndex) < 1 {
return nil
}

// send the last (or the only) MetricSet chunk to the remote service
batchesCount++
seriesCount += len(msb.seriesIndex)
return f.push(msb)
batches = append(batches, msb.MetricSet)
f.reportDiscardedLabels(msb.discardedLabels)

return f.flushBatches(batches)
}

func (f *metricsFlusher) flushBatches(batches []*pbcloud.MetricSet) error {
	var (
		wg   = sync.WaitGroup{}
		errs = make(chan error)
		done = make(chan struct{})
		stop = make(chan struct{})

		workers   = int(math.Min(float64(len(batches)), float64(f.batchPushConcurrency)))
		chunkSize = len(batches) / workers
	)

	wg.Add(workers)
	for workersIndex := 0; workersIndex < workers; workersIndex++ {
		offset := (workersIndex * chunkSize)
		end := offset + chunkSize
		if workersIndex == workers-1 {
			end = len(batches)
		}

		go func(chunk []*pbcloud.MetricSet) {
			defer wg.Done()

			for i := 0; i < len(chunk); i++ {
				select {
				case <-stop:
					return
				default:
				}
				if err := f.client.push(chunk[i]); err != nil {
					select {
					case errs <- err:
					case <-stop:
						return
					}
				}
			}
		}(batches[offset:end])
	}

	go func() {
		defer close(done)
		wg.Wait()
	}()

	for {
		select {
		case err := <-errs:
			close(stop)
			<-done
			return err
		case <-done:
			return nil
		}
	}
}

// push sends the metric set to the remote service.
// it also checks if the labels are discarded and logs a warning if so.
func (f *metricsFlusher) push(msb metricSetBuilder) error {
for key := range msb.discardedLabels {
func (f *metricsFlusher) reportDiscardedLabels(discardedLabels map[string]struct{}) {
for key := range discardedLabels {
if _, ok := f.discardedLabels[key]; ok {
continue
}

f.discardedLabels[key] = struct{}{}
f.logger.Warnf("Tag %s has been discarded since it is reserved for Cloud operations.", key)
}

return f.client.push(msb.MetricSet)
}

type metricSetBuilder struct {
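A detail of flushBatches that is easy to miss is how the batches are split: each worker gets len(batches)/workers consecutive batches, and the last worker also absorbs the remainder of the integer division. A small standalone illustration, assuming a hypothetical 7 batches and a batchPushConcurrency of 3:

package main

import (
	"fmt"
	"math"
)

func main() {
	batches := 7     // hypothetical number of batches
	concurrency := 3 // hypothetical batchPushConcurrency

	workers := int(math.Min(float64(batches), float64(concurrency)))
	chunkSize := batches / workers // integer division; remainder goes to the last worker

	for w := 0; w < workers; w++ {
		offset := w * chunkSize
		end := offset + chunkSize
		if w == workers-1 {
			end = batches
		}
		fmt.Printf("worker %d -> batches[%d:%d]\n", w, offset, end)
	}
	// Printed:
	// worker 0 -> batches[0:2]
	// worker 1 -> batches[2:4]
	// worker 2 -> batches[4:7]
}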
124 changes: 99 additions & 25 deletions output/cloud/expv2/flush_test.go
@@ -1,7 +1,10 @@
package expv2

import (
"errors"
"strconv"
"sync"
"sync/atomic"
"testing"

"github.com/sirupsen/logrus"
@@ -37,6 +40,7 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) {
require.Len(t, msb.MetricSet.Metrics, 1)
assert.Len(t, msb.MetricSet.Metrics[0].TimeSeries, 1)
}

func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) {
t.Parallel()

@@ -56,11 +60,12 @@ func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) {
bq := &bucketQ{}
pm := &pusherMock{}
mf := metricsFlusher{
bq: bq,
client: pm,
logger: logger,
discardedLabels: make(map[string]struct{}),
maxSeriesInBatch: 3,
bq: bq,
client: pm,
logger: logger,
discardedLabels: make(map[string]struct{}),
maxSeriesInBatch: 3,
batchPushConcurrency: 5,
}

bq.buckets = make([]timeBucket, 0, tc.series)
@@ -78,19 +83,19 @@ func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) {
bq.Push([]timeBucket{{Time: 1, Sinks: sinks}})
err := mf.flush()
require.NoError(t, err)
assert.Equal(t, tc.expFlushCalls, pm.pushCalled)
assert.Equal(t, tc.expFlushCalls, pm.timesCalled())
}
}

func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) {
t.Parallel()

testCases := []struct {
series int
expFlushCalls int
series int
expPushCalls int
}{
{series: 5, expFlushCalls: 2},
{series: 2, expFlushCalls: 1},
{series: 5, expPushCalls: 2},
{series: 2, expPushCalls: 1},
}

r := metrics.NewRegistry()
@@ -101,11 +106,12 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) {
bq := &bucketQ{}
pm := &pusherMock{}
mf := metricsFlusher{
bq: bq,
client: pm,
logger: logger,
discardedLabels: make(map[string]struct{}),
maxSeriesInBatch: 3,
bq: bq,
client: pm,
logger: logger,
discardedLabels: make(map[string]struct{}),
maxSeriesInBatch: 3,
batchPushConcurrency: 5,
}

bq.buckets = make([]timeBucket, 0, tc.series)
@@ -127,7 +133,7 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) {

err := mf.flush()
require.NoError(t, err)
assert.Equal(t, tc.expFlushCalls, pm.pushCalled)
assert.Equal(t, tc.expPushCalls, pm.timesCalled())
}
}

@@ -136,21 +142,25 @@ func TestFlushWithReservedLabels(t *testing.T) {

logger, hook := testutils.NewLoggerWithHook(t)

mutex := sync.Mutex{}
collected := make([]*pbcloud.MetricSet, 0)

bq := &bucketQ{}
pm := &pusherMock{
hook: func(ms *pbcloud.MetricSet) {
mutex.Lock()
collected = append(collected, ms)
mutex.Unlock()
},
}

mf := metricsFlusher{
bq: bq,
client: pm,
maxSeriesInBatch: 2,
logger: logger,
discardedLabels: make(map[string]struct{}),
bq: bq,
client: pm,
maxSeriesInBatch: 2,
logger: logger,
discardedLabels: make(map[string]struct{}),
batchPushConcurrency: 5,
}

r := metrics.NewRegistry()
@@ -186,7 +196,7 @@ func TestFlushWithReservedLabels(t *testing.T) {
require.NoError(t, err)

loglines := hook.Drain()
assert.Equal(t, 1, len(collected))
require.Len(t, collected, 1)

// check that warnings are shown only once per label
assert.Len(t, testutils.FilterEntries(loglines, logrus.WarnLevel, "Tag __name__ has been discarded since it is reserved for Cloud operations."), 1)
@@ -206,15 +216,79 @@ }
}

type pusherMock struct {
hook func(*pbcloud.MetricSet)
pushCalled int
// hook is called when the push method is called.
hook func(*pbcloud.MetricSet)
// errFn, if defined, is called at the end of push
// and its resulting error is returned.
errFn func() error
pushCalled int64
}

func (pm *pusherMock) timesCalled() int {
return int(atomic.LoadInt64(&pm.pushCalled))
}

func (pm *pusherMock) push(ms *pbcloud.MetricSet) error {
if pm.hook != nil {
pm.hook(ms)
}

pm.pushCalled++
atomic.AddInt64(&pm.pushCalled, 1)

if pm.errFn != nil {
return pm.errFn()
}

return nil
}

func TestMetricsFlusherErrorCase(t *testing.T) {
	t.Parallel()

	r := metrics.NewRegistry()
	m1 := r.MustNewMetric("metric1", metrics.Counter)

	logger, _ := testutils.NewLoggerWithHook(t)

	bq := &bucketQ{}
	pm := &pusherMock{
		errFn: func() error {
			return errors.New("some error")
		},
	}
	mf := metricsFlusher{
		bq:                   bq,
		client:               pm,
		logger:               logger,
		discardedLabels:      make(map[string]struct{}),
		maxSeriesInBatch:     3,
		batchPushConcurrency: 2,
	}

	series := 7

	bq.buckets = make([]timeBucket, 0, series)
	for i := 0; i < series; i++ {
		ts := metrics.TimeSeries{
			Metric: m1,
			Tags:   r.RootTagSet().With("key1", "val"+strconv.Itoa(i)),
		}
		bq.Push([]timeBucket{
			{
				Time: int64(i) + 1,
				Sinks: map[metrics.TimeSeries]metricValue{
					ts: &counter{Sum: float64(1)},
				},
			},
		})
	}
	require.Len(t, bq.buckets, series)

	err := mf.flush()
	require.Error(t, err)
	// Since pushes happen concurrently, the number of calls can vary,
	// but at least one push must happen and the total cannot exceed
	// batchPushConcurrency.
	assert.LessOrEqual(t, pm.timesCalled(), mf.batchPushConcurrency)
	assert.GreaterOrEqual(t, pm.timesCalled(), 1)
}
Loading