Skip to content

Commit

Permalink
defer making worker goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
rhysd committed Jul 17, 2021
1 parent 70e400e commit 2ab2146
Showing 1 changed file with 31 additions and 30 deletions.
61 changes: 31 additions & 30 deletions process.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package actionlint

import (
"context"
"fmt"
"io"
"os/exec"
"sync"

"golang.org/x/sync/semaphore"
)

type processRunRequest struct {
Expand All @@ -23,29 +20,21 @@ type processRunRequest struct {
// "pipe: too many files to open". To avoid it, this class manages how many processes are run at
// the same time.
type concurrentProcess struct {
ctx context.Context
sema *semaphore.Weighted
wg sync.WaitGroup
ch chan *processRunRequest
done chan struct{}
err error
errMu sync.Mutex
wg sync.WaitGroup
ch chan *processRunRequest
done chan struct{}
numWorkers int
maxWorkers int
err error
errMu sync.Mutex
}

func newConcurrentProcess(par int) *concurrentProcess {
proc := &concurrentProcess{
ctx: context.Background(),
sema: semaphore.NewWeighted(int64(par)),
ch: make(chan *processRunRequest),
done: make(chan struct{}),
}

// Setup worker goroutines
for i := 0; i < par; i++ {
proc.newWorker()
return &concurrentProcess{
ch: make(chan *processRunRequest),
done: make(chan struct{}),
maxWorkers: par,
}

return proc
}

func runProcessWithStdin(exe string, args []string, stdin string) ([]byte, error) {
Expand Down Expand Up @@ -81,20 +70,25 @@ func runProcessWithStdin(exe string, args []string, stdin string) ([]byte, error
return stdout, nil
}

// handleRequest runs the process described by req and hands the stdout and the
// execution error to the request's callback. When the callback itself returns
// an error, only the first such error across all requests is remembered in
// proc.err (guarded by errMu) so that wait can report it later.
func (proc *concurrentProcess) handleRequest(req *processRunRequest) {
	out, runErr := runProcessWithStdin(req.exe, req.args, req.stdin)
	cbErr := req.callback(out, runErr)
	if cbErr == nil {
		return
	}
	proc.errMu.Lock()
	defer proc.errMu.Unlock()
	// Keep only the first reported error; later ones are discarded.
	if proc.err == nil {
		proc.err = cbErr
	}
}

func (proc *concurrentProcess) newWorker() {
proc.numWorkers++
proc.wg.Add(1)
go func(recv <-chan *processRunRequest, done <-chan struct{}) {
for {
select {
case req := <-recv:
stdout, err := runProcessWithStdin(req.exe, req.args, req.stdin)
if err := req.callback(stdout, err); err != nil {
proc.errMu.Lock()
if proc.err == nil {
proc.err = err
}
proc.errMu.Unlock()
}
proc.handleRequest(req)
case <-done:
proc.wg.Done()
return
Expand All @@ -110,11 +104,18 @@ func (proc *concurrentProcess) hasError() bool {
}

// run enqueues a request to execute exe with args, feeding stdin to the
// process. callback receives the process's stdout and execution error.
// Workers are spawned on demand up to maxWorkers, so no goroutine exists
// until the first request arrives — efficient when nothing is ever run
// (e.g. shellcheck and pyflakes are disabled).
func (proc *concurrentProcess) run(exe string, args []string, stdin string, callback func([]byte, error) error) {
	// Touching numWorkers without a lock is fine because this method is the
	// single producer: only one goroutine calls run while workers consume.
	if proc.numWorkers < proc.maxWorkers {
		proc.newWorker()
	}
	req := &processRunRequest{
		exe:      exe,
		args:     args,
		stdin:    stdin,
		callback: callback,
	}
	proc.ch <- req
}

// wait asks all worker goroutines to shut down and blocks until every one of
// them has exited. It returns the first error reported by a request callback,
// or nil when all callbacks succeeded.
//
// NOTE(review): numWorkers is reset here, but proc.done stays closed, so any
// worker created after wait would exit immediately — reuse after wait looks
// unsafe; confirm with callers.
func (proc *concurrentProcess) wait() error {
	close(proc.done) // Closing done signals every worker's select to return
	proc.wg.Wait()   // Each worker calls wg.Done() on exit; block until all have
	proc.numWorkers = 0
	return proc.err
}

0 comments on commit 2ab2146

Please sign in to comment.