From 892f87d1252253c346386efb7dd8cbf6356c8047 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Tue, 22 Jun 2021 09:28:11 -0700 Subject: [PATCH] flightcontrol: reduce contention between goroutines Signed-off-by: Tonis Tiigi --- util/flightcontrol/flightcontrol.go | 14 ++++++++++--- util/flightcontrol/flightcontrol_test.go | 26 ++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/util/flightcontrol/flightcontrol.go b/util/flightcontrol/flightcontrol.go index fc9f7272a42bc..201dbc58b4bb1 100644 --- a/util/flightcontrol/flightcontrol.go +++ b/util/flightcontrol/flightcontrol.go @@ -39,7 +39,7 @@ func (g *Group) Do(ctx context.Context, key string, fn func(ctx context.Context) return v, err } // backoff logic - if backoff >= 3*time.Second { + if backoff >= 15*time.Second { err = errors.Wrapf(errRetryTimeout, "flightcontrol") return v, err } @@ -132,8 +132,16 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error) { select { case <-c.ready: c.mu.Unlock() - <-c.cleaned - return nil, errRetry + if c.err != nil { // on error retry + <-c.cleaned + return nil, errRetry + } + pw, ok, _ := progress.FromContext(ctx) + if ok { + c.progressState.add(pw) + } + return c.result, nil + case <-c.ctx.done: // could return if no error c.mu.Unlock() <-c.cleaned diff --git a/util/flightcontrol/flightcontrol_test.go b/util/flightcontrol/flightcontrol_test.go index bb02386f8e83f..9c6d7ff1e0d4e 100644 --- a/util/flightcontrol/flightcontrol_test.go +++ b/util/flightcontrol/flightcontrol_test.go @@ -2,6 +2,7 @@ package flightcontrol import ( "context" + "sync" "sync/atomic" "testing" "time" @@ -203,6 +204,31 @@ func TestCancelBoth(t *testing.T) { assert.Equal(t, counter, int64(4)) } +func TestContention(t *testing.T) { + perthread := 1000 + threads := 100 + + wg := sync.WaitGroup{} + wg.Add(threads) + + g := &Group{} + + for i := 0; i < threads; i++ { + go func(s int) { + for i := 0; i < s; i++ { + _, err := g.Do(context.TODO(), "foo", func(ctx context.Context) (interface{}, error) { + time.Sleep(time.Microsecond) + return nil, nil + }) + require.NoError(t, err) + } + wg.Done() + }(perthread) + } + + wg.Wait() +} + func testFunc(wait time.Duration, ret string, counter *int64) func(ctx context.Context) (interface{}, error) { return func(ctx context.Context) (interface{}, error) { atomic.AddInt64(counter, 1)