From e4395b22aca136c288529b07c934ab03eb28fa81 Mon Sep 17 00:00:00 2001 From: alitto Date: Tue, 31 Jan 2023 15:25:28 -0300 Subject: [PATCH 1/2] Return Context error if it is canceled before at least 1 task failed --- group.go | 37 +++++++++++++++++++++---------------- group_blackbox_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 16 deletions(-) diff --git a/group.go b/group.go index 707d829..908362a 100644 --- a/group.go +++ b/group.go @@ -50,26 +50,16 @@ func (g *TaskGroupWithContext) Submit(task func() error) { defer g.waitGroup.Done() // If context has already been cancelled, skip task execution - if g.ctx != nil { - select { - case <-g.ctx.Done(): - return - default: - } + select { + case <-g.ctx.Done(): + return + default: } // don't actually ignore errors err := task() if err != nil { - g.errSync.once.Do(func() { - g.errSync.guard.Lock() - g.err = err - g.errSync.guard.Unlock() - - if g.cancel != nil { - g.cancel() - } - }) + g.setError(err) } }) } @@ -91,11 +81,26 @@ func (g *TaskGroupWithContext) Wait() error { // If context was provided, cancel it to signal all running tasks to stop g.cancel() case <-g.ctx.Done(): + g.setError(g.ctx.Err()) } + return g.getError() +} + +func (g *TaskGroupWithContext) getError() error { g.errSync.guard.RLock() err := g.err g.errSync.guard.RUnlock() - return err } + +func (g *TaskGroupWithContext) setError(err error) { + g.errSync.once.Do(func() { + g.errSync.guard.Lock() + g.err = err + g.errSync.guard.Unlock() + + // Cancel execution of any pending task in this group + g.cancel() + }) +} diff --git a/group_blackbox_test.go b/group_blackbox_test.go index 0f04f11..b0a0144 100644 --- a/group_blackbox_test.go +++ b/group_blackbox_test.go @@ -120,3 +120,34 @@ func TestGroupContextWithNilContext(t *testing.T) { assertEqual(t, "a non-nil context needs to be specified when using GroupContext", thrownPanic) } + +func TestGroupContextWithCanceledContext(t *testing.T) { + + pool := pond.New(3, 100) + assertEqual(t, 0, pool.RunningWorkers()) + + // Submit a group of tasks + var doneCount, startedCount int32 + userCtx, cancel := context.WithCancel(context.Background()) + group, ctx := pool.GroupContext(userCtx) + for i := 0; i < 10; i++ { + group.Submit(func() error { + atomic.AddInt32(&startedCount, 1) + + select { + case <-time.After(10 * time.Millisecond): + atomic.AddInt32(&doneCount, 1) + case <-ctx.Done(): + } + + return nil + }) + } + + // Cancel context right after submitting tasks + cancel() + + err := group.Wait() + assertEqual(t, context.Canceled, err) + assertEqual(t, int32(0), atomic.LoadInt32(&doneCount)) +} From ec4323663e7b1b445b7dc74a3760ee441bb96062 Mon Sep 17 00:00:00 2001 From: alitto Date: Tue, 31 Jan 2023 15:32:00 -0300 Subject: [PATCH 2/2] Upgrade codecov action version --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 7a1bc90..d374fe4 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -34,7 +34,7 @@ jobs: uses: actions/checkout@v2 - name: Test run: make coverage - - uses: codecov/codecov-action@v2 + - uses: codecov/codecov-action@v3 with: files: coverage.out fail_ci_if_error: true