From 4e3c588224d382a54956ae683897fab66d705403 Mon Sep 17 00:00:00 2001
From: rhysd
Date: Sun, 18 Jul 2021 03:36:52 +0900
Subject: [PATCH] bound number of concurrent processes with semaphore and
 errgroup
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

by making a goroutine per process. This creates many goroutines, but the
number of them running at the same time is bounded by the number of CPUs.

This approach may introduce additional overhead from creating/deleting
goroutines. But as far as the benchmarks show, it is as efficient as, or
slightly more efficient than, the previous implementation. This might be
due to channel overhead in the previous implementation.

- Previous implementation (with mocking running process)

```
goos: darwin
goarch: amd64
pkg: github.com/rhysd/actionlint
cpu: Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
BenchmarkLintWorkflows/small-1-20                  14172       83750 ns/op     32274 B/op      482 allocs/op
BenchmarkLintWorkflows/small-3-20                   8744      133699 ns/op     95600 B/op     1433 allocs/op
BenchmarkLintWorkflows/small-10-20                  4669      251667 ns/op    316652 B/op     4750 allocs/op
BenchmarkLintWorkflows/small-shellcheck-1-20        8743      126736 ns/op     35104 B/op      532 allocs/op
BenchmarkLintWorkflows/small-shellcheck-3-20        6404      181138 ns/op    104130 B/op     1585 allocs/op
BenchmarkLintWorkflows/small-shellcheck-10-20       2337      497045 ns/op    347283 B/op     5273 allocs/op
BenchmarkLintWorkflows/large-shellcheck-1-20        1831      656964 ns/op    224186 B/op     4529 allocs/op
BenchmarkLintWorkflows/large-shellcheck-3-20         949     1262167 ns/op    672339 B/op    13584 allocs/op
BenchmarkLintWorkflows/large-shellcheck-10-20        283     4220101 ns/op   2240366 B/op    45258 allocs/op
PASS
ok      github.com/rhysd/actionlint     12.409s
```

- This commit (with mocking running process)

```
goos: darwin
goarch: amd64
pkg: github.com/rhysd/actionlint
cpu: Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
BenchmarkLintWorkflows/small-1-20                  14246       83775 ns/op     32162 B/op      481 allocs/op
BenchmarkLintWorkflows/small-3-20                   8832      133214 ns/op     95495 B/op     1432 allocs/op
BenchmarkLintWorkflows/small-10-20                  4738      250377 ns/op    316532 B/op     4749 allocs/op
BenchmarkLintWorkflows/small-shellcheck-1-20        8955      121658 ns/op     35030 B/op      531 allocs/op
BenchmarkLintWorkflows/small-shellcheck-3-20        6835      173217 ns/op    104112 B/op     1583 allocs/op
BenchmarkLintWorkflows/small-shellcheck-10-20       2593      459949 ns/op    347263 B/op     5269 allocs/op
BenchmarkLintWorkflows/large-shellcheck-1-20        2109      562768 ns/op    225346 B/op     4526 allocs/op
BenchmarkLintWorkflows/large-shellcheck-3-20        1353      880209 ns/op    677864 B/op    13603 allocs/op
BenchmarkLintWorkflows/large-shellcheck-10-20        482     2528021 ns/op   2282275 B/op    45760 allocs/op
PASS
ok      github.com/rhysd/actionlint     12.238s
```

- Ref: `hyperfine` results

```
Benchmark #1: ./actionlint.before testdata/bench/many_scripts.yaml
  Time (mean ± σ):       3.7 ms ±   0.4 ms    [User: 2.8 ms, System: 1.5 ms]
  Range (min … max):     3.2 ms …   5.3 ms    406 runs

  Warning: Command took less than 5 ms to complete. Results might be inaccurate.
  Warning: Statistical outliers were detected. Consider re-running this benchmark on a quiet PC without any interferences from other programs. It might help to use the '--warmup' or '--prepare' options.

Benchmark #2: ./actionlint.after testdata/bench/many_scripts.yaml
  Time (mean ± σ):       3.6 ms ±   0.4 ms    [User: 2.5 ms, System: 1.4 ms]
  Range (min … max):     3.2 ms …   5.5 ms    489 runs

  Warning: Command took less than 5 ms to complete. Results might be inaccurate.
  Warning: Statistical outliers were detected. Consider re-running this benchmark on a quiet PC without any interferences from other programs. It might help to use the '--warmup' or '--prepare' options.
Summary
  './actionlint.after testdata/bench/many_scripts.yaml' ran
    1.04 ± 0.17 times faster than './actionlint.before testdata/bench/many_scripts.yaml'
```
---
 process.go         | 71 +++++++++++--------------------------------------------------------
 rule_pyflakes.go   |  4 ---
 rule_shellcheck.go |  4 ---
 3 files changed, 16 insertions(+), 63 deletions(-)

diff --git a/process.go b/process.go
index 02b369821f..3281b7f1f4 100644
--- a/process.go
+++ b/process.go
@@ -1,10 +1,13 @@
 package actionlint
 
 import (
+	"context"
 	"fmt"
 	"io"
 	"os/exec"
-	"sync"
+
+	"golang.org/x/sync/errgroup"
+	"golang.org/x/sync/semaphore"
 )
 
 type processRunRequest struct {
@@ -20,20 +23,15 @@
 // "pipe: too many files to open". To avoid it, this class manages how many processes are run at
 // the same time.
 type concurrentProcess struct {
-	wg         sync.WaitGroup
-	ch         chan *processRunRequest
-	done       chan struct{}
-	numWorkers int
-	maxWorkers int
-	err        error
-	errMu      sync.Mutex
+	ctx  context.Context
+	sema *semaphore.Weighted
+	eg   errgroup.Group
 }
 
 func newConcurrentProcess(par int) *concurrentProcess {
 	return &concurrentProcess{
-		ch:         make(chan *processRunRequest),
-		done:       make(chan struct{}),
-		maxWorkers: par,
+		ctx:  context.Background(),
+		sema: semaphore.NewWeighted(int64(par)),
 	}
 }
 
@@ -70,52 +68,15 @@ func runProcessWithStdin(exe string, args []string, stdin string) ([]byte, error) {
 	return stdout, nil
 }
 
-func (proc *concurrentProcess) handleRequest(req *processRunRequest) {
-	stdout, err := runProcessWithStdin(req.exe, req.args, req.stdin)
-	if err := req.callback(stdout, err); err != nil {
-		proc.errMu.Lock()
-		if proc.err == nil {
-			proc.err = err
-		}
-		proc.errMu.Unlock()
-	}
-}
-
-func (proc *concurrentProcess) newWorker() {
-	proc.numWorkers++
-	proc.wg.Add(1)
-	go func(recv <-chan *processRunRequest, done <-chan struct{}) {
-		for {
-			select {
-			case req := <-recv:
-				proc.handleRequest(req)
-			case <-done:
-				proc.wg.Done()
-				return
-			}
-		}
-	}(proc.ch, proc.done)
-}
-
-func (proc *concurrentProcess) hasError() bool {
-	proc.errMu.Lock()
-	defer proc.errMu.Unlock()
-	return proc.err != nil
-}
-
 func (proc *concurrentProcess) run(exe string, args []string, stdin string, callback func([]byte, error) error) {
-	// This works fine since it is single-producer-multi-consumers
-	if proc.numWorkers < proc.maxWorkers {
-		// Defer making new workers. This is efficient when no worker is necessary. It happens when shellcheck
-		// and pyflakes are never run (e.g. they're disabled).
-		proc.newWorker()
-	}
-	proc.ch <- &processRunRequest{exe, args, stdin, callback}
+	proc.sema.Acquire(proc.ctx, 1)
+	proc.eg.Go(func() error {
+		stdout, err := runProcessWithStdin(exe, args, stdin)
+		proc.sema.Release(1)
+		return callback(stdout, err)
+	})
 }
 
 func (proc *concurrentProcess) wait() error {
-	close(proc.done) // Request workers to shutdown
-	proc.wg.Wait()   // Wait for workers completing to shutdown
-	proc.numWorkers = 0
-	return proc.err
+	return proc.eg.Wait() // Wait for workers completing to shutdown
 }
diff --git a/rule_pyflakes.go b/rule_pyflakes.go
index 9b6682c99f..458d53b34e 100644
--- a/rule_pyflakes.go
+++ b/rule_pyflakes.go
@@ -100,10 +100,6 @@ func (rule *RulePyflakes) VisitStep(n *Step) error {
 		return nil
 	}
 
-	if rule.proc.hasError() {
-		return nil
-	}
-
 	rule.runPyflakes(rule.cmd, run.Run.Value, run.RunPos)
 	return nil
 }
diff --git a/rule_shellcheck.go b/rule_shellcheck.go
index 1a38ddb825..3e1a76ca6a 100644
--- a/rule_shellcheck.go
+++ b/rule_shellcheck.go
@@ -63,10 +63,6 @@ func (rule *RuleShellcheck) VisitStep(n *Step) error {
 		return nil
 	}
 
-	if rule.proc.hasError() {
-		return nil
-	}
-
 	rule.runShellcheck(rule.cmd, run.Run.Value, name, run.RunPos)
 	return nil
 }