diff --git a/.github/workflows/labeler.yaml b/.github/workflows/labeler.yaml index af629198..fe643107 100644 --- a/.github/workflows/labeler.yaml +++ b/.github/workflows/labeler.yaml @@ -2,6 +2,9 @@ name: "Pull request labeler" on: - pull_request +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref }} + cancel-in-progress: true jobs: triage: permissions: diff --git a/.github/workflows/pr-description-enforcer.yaml b/.github/workflows/pr-description-enforcer.yaml index dbaadd7e..cf7d6054 100644 --- a/.github/workflows/pr-description-enforcer.yaml +++ b/.github/workflows/pr-description-enforcer.yaml @@ -6,6 +6,9 @@ on: - edited - reopened +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref }} + cancel-in-progress: true jobs: enforce: runs-on: ubuntu-latest diff --git a/.github/workflows/semantic-pr.yaml b/.github/workflows/semantic-pr.yaml index 90e13381..22e678c7 100644 --- a/.github/workflows/semantic-pr.yaml +++ b/.github/workflows/semantic-pr.yaml @@ -11,6 +11,9 @@ on: - ready_for_review - synchronize +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref }} + cancel-in-progress: true jobs: main: name: title diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 3df759bd..81efe804 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -6,6 +6,9 @@ on: - main - "release/*" pull_request: +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.sha }} + cancel-in-progress: true jobs: unit: name: unit diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml index 3ece2276..d4f90c0e 100644 --- a/.github/workflows/verify.yml +++ b/.github/workflows/verify.yml @@ -7,6 +7,9 @@ on: - master - main pull_request: +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.sha }} + cancel-in-progress: true jobs: generate: name: generated files diff --git a/sync/group.go b/sync/group.go new file mode 100644 index 00000000..1a76cb5f --- /dev/null +++ b/sync/group.go @@ -0,0 +1,99 @@ +package sync + +import ( + "context" + "sync" +) + +// A EagerGroup is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// Use NewEagerGroup to create a new group. +type EagerGroup struct { + ctx context.Context + cancel context.CancelCauseFunc + wg sync.WaitGroup + sem chan struct{} + errOnce sync.Once + err error +} + +// NewEagerGroup returns a new eager group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +// +// limit < 1 means no limit on the number of active goroutines. +func NewEagerGroup(ctx context.Context, limit int) (*EagerGroup, context.Context) { + ctx, cancel := context.WithCancelCause(ctx) + g := &EagerGroup{ + ctx: ctx, + cancel: cancel, + } + if limit > 0 { + g.sem = make(chan struct{}, limit) + } + return g, ctx +} + +// Go calls the given function in a new goroutine. +// It blocks until the new goroutine can be added without the number of +// active goroutines in the group exceeding the configured limit. +// +// The first call to return a non-nil error cancels the group's context. +// The error will be returned by Wait. +// +// If the group was created by calling NewEagerGroup with limit < 1, there is no +// limit on the number of active goroutines. +// +// If the group's context is canceled, routines that have not executed yet due to the limit won't be executed. +// Additionally, there is a best effort not to execute `f()` once the context is canceled +// and that happens whether or not a limit has been specified. +func (g *EagerGroup) Go(f func() error) { + if err := g.ctx.Err(); err != nil { + g.errOnce.Do(func() { + g.err = g.ctx.Err() + g.cancel(g.err) + }) + return + } + + if g.sem != nil { + select { + case <-g.ctx.Done(): + g.errOnce.Do(func() { + g.err = g.ctx.Err() + g.cancel(g.err) + }) + return + case g.sem <- struct{}{}: + } + } + + g.wg.Add(1) + go func() { + err := g.ctx.Err() + if err == nil { + err = f() + } + if err != nil { + g.errOnce.Do(func() { + g.err = err + g.cancel(g.err) + }) + } + if g.sem != nil { + <-g.sem + } + g.wg.Done() + }() +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *EagerGroup) Wait() error { + g.wg.Wait() + g.cancel(g.err) + return g.err +} diff --git a/sync/group_test.go b/sync/group_test.go new file mode 100644 index 00000000..d5f1ba4a --- /dev/null +++ b/sync/group_test.go @@ -0,0 +1,91 @@ +package sync + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEagerGroupWithLimit(t *testing.T) { + g, ctx := NewEagerGroup(context.Background(), 2) + var count atomic.Int64 + // One of the following three goroutines should DEFINITELY NOT be executed due to the limit of 2 and the context being cancelled. + // The context should get cancelled automatically because the first two routines returned an error. + g.Go(func() error { + t.Log("one") + count.Add(1) + return fmt.Errorf("one") + }) + g.Go(func() error { + t.Log("two") + count.Add(1) + return fmt.Errorf("two") + }) + g.Go(func() error { + t.Log("three") + count.Add(1) + return fmt.Errorf("three") + }) + require.Error(t, g.Wait(), "We expect group.Wait() to return an error") + ok := true + select { + case <-ctx.Done(): + _, ok = <-ctx.Done() + case <-time.After(time.Second): + } + require.False(t, ok, "We expect the context to be cancelled") + require.True(t, 1 <= count.Load() && count.Load() <= 2, "We expect count to be between 1 and 2") +} + +func TestEagerGroupWithNoLimit(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + g, ctx := NewEagerGroup(ctx, 0) + funcCounter := &atomic.Int64{} + + go func() { + for { + if funcCounter.Load() > 10 { + cancel() + return + } + } + }() + + for i := 0; i < 10000; i++ { + g.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + funcCounter.Add(1) + return nil + }) + } + require.ErrorIs(t, g.Wait(), ctx.Err(), "We expect group.Wait() to return the context error") + _, ok := <-ctx.Done() + require.False(t, ok, "We expect the context to be cancelled") + t.Log(funcCounter.Load(), "funcs executed") + // We expect between 10 and 10000 funcs to be executed + // because group tries to return early if context is cancelled + require.Less( + t, + funcCounter.Load(), + int64(10000), + "Expected less than 1000 funcs to be executed", + ) +} + +func TestNoInitEagerGroup(t *testing.T) { + g := &EagerGroup{} + f := func() error { return nil } + require.Panics( + t, + func() { g.Go(f) }, + "We expect a panic when calling Go on a group that has not been initialized with NewEagerGroup", + ) +}