Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
  • Loading branch information
bufdev committed Oct 31, 2024
1 parent ad40d14 commit 85ff5ec
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
7 changes: 0 additions & 7 deletions private/buf/bufgen/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,6 @@ func (g *generator) execPlugins(
jobs,
thread.ParallelizeWithCancelOnFailure(),
); err != nil {
// Handle errors joined with `errors.Join`, which is documented to fulfill this interface.
if unwrappableErr, ok := err.(interface{ Unwrap() []error }); ok {
errs := unwrappableErr.Unwrap()
if len(errs) > 0 {
return nil, errs[0]
}
}
return nil, err
}
if err := validateResponses(responses, pluginConfigs); err != nil {
Expand Down
17 changes: 12 additions & 5 deletions private/pkg/thread/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func Parallelize(ctx context.Context, jobs []func(context.Context) error, option
defer cancel()
}
semaphoreC := make(chan struct{}, Parallelism()*multiplier)
var retErr error
var errs []error
var wg sync.WaitGroup
var lock sync.Mutex
var stop bool
Expand All @@ -90,19 +90,19 @@ func Parallelize(ctx context.Context, jobs []func(context.Context) error, option
select {
case <-ctx.Done():
stop = true
retErr = errors.Join(retErr, ctx.Err())
errs = append(errs, ctx.Err())
case semaphoreC <- struct{}{}:
select {
case <-ctx.Done():
stop = true
retErr = errors.Join(retErr, ctx.Err())
errs = append(errs, ctx.Err())
default:
job := job
wg.Add(1)
go func() {
if err := job(ctx); err != nil {
lock.Lock()
retErr = errors.Join(retErr, err)
errs = append(errs, err)
lock.Unlock()
if cancel != nil {
cancel()
Expand All @@ -116,7 +116,14 @@ func Parallelize(ctx context.Context, jobs []func(context.Context) error, option
}
}
wg.Wait()
return retErr
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return errors.Join(errs...)
}
}

// ParallelizeOption is an option to Parallelize.
Expand Down

0 comments on commit 85ff5ec

Please sign in to comment.