Skip to content

Commit

Permalink
scripts: convert run_tool to asyncio
Browse files Browse the repository at this point in the history
This improves the handling of keyboard interrupt, and also makes it easy to
buffer the output and not mix errors from different subprocesses.  This
is useful for clang-tidy and will be used by clippy as well.  In addition,
the new code supports MESON_NUM_PROCESSES.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
  • Loading branch information
bonzini authored and dcbaker committed Dec 19, 2024
1 parent dafa6a7 commit 5dc537a
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 41 deletions.
3 changes: 2 additions & 1 deletion docs/markdown/snippets/num-processes.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ Previously, `meson test` checked the `MESON_TESTTHREADS` variable to control
the amount of parallel jobs to run; this was useful when `meson test` is
invoked through `ninja test` for example. With this version, a new variable
`MESON_NUM_PROCESSES` is supported with a broader scope: in addition to
`meson test`, it is also used by the `external_project` module.
`meson test`, it is also used by the `external_project` module and by
Ninja targets that invoke `clang-tidy` and `clang-format`.
9 changes: 4 additions & 5 deletions mesonbuild/scripts/clangformat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@
from __future__ import annotations

import argparse
import subprocess
from pathlib import Path
import sys

from .run_tool import run_clang_tool
from .run_tool import run_clang_tool, run_with_buffered_output
from ..environment import detect_clangformat
from ..mesonlib import version_compare
from ..programs import ExternalProgram
import typing as T

def run_clang_format(fname: Path, exelist: T.List[str], options: argparse.Namespace, cformat_ver: T.Optional[str]) -> subprocess.CompletedProcess:
async def run_clang_format(fname: Path, exelist: T.List[str], options: argparse.Namespace, cformat_ver: T.Optional[str]) -> int:
clangformat_10 = False
if options.check and cformat_ver:
if version_compare(cformat_ver, '>=10'):
Expand All @@ -26,14 +25,14 @@ def run_clang_format(fname: Path, exelist: T.List[str], options: argparse.Namesp
else:
original = fname.read_bytes()
before = fname.stat().st_mtime
ret = subprocess.run(exelist + ['-style=file', '-i', str(fname)])
ret = await run_with_buffered_output(exelist + ['-style=file', '-i', str(fname)])
after = fname.stat().st_mtime
if before != after:
print('File reformatted: ', fname)
if options.check and not clangformat_10:
# Restore the original if only checking.
fname.write_bytes(original)
ret.returncode = 1
return 1
return ret

def run(args: T.List[str]) -> int:
Expand Down
6 changes: 3 additions & 3 deletions mesonbuild/scripts/clangtidy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@
import shutil
import sys

from .run_tool import run_clang_tool
from .run_tool import run_clang_tool, run_with_buffered_output
from ..environment import detect_clangtidy, detect_clangapply
import typing as T

def run_clang_tidy(fname: Path, tidyexe: list, builddir: Path, fixesdir: T.Optional[Path]) -> subprocess.CompletedProcess:
async def run_clang_tidy(fname: Path, tidyexe: list, builddir: Path, fixesdir: T.Optional[Path]) -> int:
args = []
if fixesdir is not None:
handle, name = tempfile.mkstemp(prefix=fname.name + '.', suffix='.yaml', dir=fixesdir)
os.close(handle)
args.extend(['-export-fixes', name])
return subprocess.run(tidyexe + args + ['-quiet', '-p', str(builddir), str(fname)])
return await run_with_buffered_output(tidyexe + args + ['-quiet', '-p', str(builddir), str(fname)])

def run(args: T.List[str]) -> int:
parser = argparse.ArgumentParser()
Expand Down
120 changes: 88 additions & 32 deletions mesonbuild/scripts/run_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,85 @@

from __future__ import annotations

import itertools
import asyncio.subprocess
import fnmatch
import concurrent.futures
import itertools
import signal
import sys
from pathlib import Path

from .. import mlog
from ..compilers import lang_suffixes
from ..mesonlib import quiet_git
from ..mesonlib import quiet_git, join_args, determine_worker_count
from ..mtest import complete_all
import typing as T

if T.TYPE_CHECKING:
import subprocess
Info = T.TypeVar("Info")

async def run_with_buffered_output(cmdlist: T.List[str]) -> int:
"""Run the command in cmdlist, buffering the output so that it is
not mixed for multiple child processes. Kill the child on
cancellation."""
quoted_cmdline = join_args(cmdlist)
p: T.Optional[asyncio.subprocess.Process] = None
try:
p = await asyncio.create_subprocess_exec(*cmdlist,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT)
stdo, _ = await p.communicate()
except FileNotFoundError as e:
print(mlog.blue('>>>'), quoted_cmdline, file=sys.stderr)
print(mlog.red('not found:'), e.filename, file=sys.stderr)
return 1
except asyncio.CancelledError:
if p:
p.kill()
await p.wait()
return p.returncode or 1
else:
return 0

if stdo:
print(mlog.blue('>>>'), quoted_cmdline, flush=True)
sys.stdout.buffer.write(stdo)
return p.returncode

async def _run_workers(infos: T.Iterable[Info],
fn: T.Callable[[Info], T.Iterable[T.Coroutine[None, None, int]]]) -> int:
futures: T.List[asyncio.Future[int]] = []
semaphore = asyncio.Semaphore(determine_worker_count())

async def run_one(worker_coro: T.Coroutine[None, None, int]) -> int:
try:
async with semaphore:
return await worker_coro
except asyncio.CancelledError as e:
worker_coro.throw(e)
return await worker_coro

def sigterm_handler() -> None:
for f in futures:
f.cancel()

if sys.platform != 'win32':
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, sigterm_handler)
loop.add_signal_handler(signal.SIGTERM, sigterm_handler)

for i in infos:
futures.extend((asyncio.ensure_future(run_one(x)) for x in fn(i)))
if not futures:
return 0

try:
await complete_all(futures)
except BaseException:
for f in futures:
f.cancel()
raise

return max(f.result() for f in futures if f.done() and not f.cancelled())

def parse_pattern_file(fname: Path) -> T.List[str]:
patterns = []
Expand All @@ -27,7 +95,7 @@ def parse_pattern_file(fname: Path) -> T.List[str]:
pass
return patterns

def run_clang_tool(name: str, srcdir: Path, builddir: Path, fn: T.Callable[..., subprocess.CompletedProcess], *args: T.Any) -> int:
def all_clike_files(name: str, srcdir: Path, builddir: Path) -> T.Iterable[Path]:
patterns = parse_pattern_file(srcdir / f'.{name}-include')
globs: T.Union[T.List[T.List[Path]], T.List[T.Generator[Path, None, None]]]
if patterns:
Expand All @@ -44,29 +112,17 @@ def run_clang_tool(name: str, srcdir: Path, builddir: Path, fn: T.Callable[...,
suffixes = set(lang_suffixes['c']).union(set(lang_suffixes['cpp']))
suffixes.add('h')
suffixes = {f'.{s}' for s in suffixes}
futures = []
returncode = 0
e = concurrent.futures.ThreadPoolExecutor()
try:
for f in itertools.chain(*globs):
strf = str(f)
if f.is_dir() or f.suffix not in suffixes or \
any(fnmatch.fnmatch(strf, i) for i in ignore):
continue
futures.append(e.submit(fn, f, *args))
concurrent.futures.wait(
futures,
return_when=concurrent.futures.FIRST_EXCEPTION
)
finally:
# We try to prevent new subprocesses from being started by canceling
# the futures, but this is not water-tight: some may have started
# between the wait being interrupted or exited and the futures being
# canceled. (A fundamental fix would probably require the ability to
# terminate such subprocesses upon cancellation of the future.)
for x in futures: # Python >=3.9: e.shutdown(cancel_futures=True)
x.cancel()
e.shutdown()
if futures:
returncode = max(x.result().returncode for x in futures)
return returncode
for f in itertools.chain.from_iterable(globs):
strf = str(f)
if f.is_dir() or f.suffix not in suffixes or \
any(fnmatch.fnmatch(strf, i) for i in ignore):
continue
yield f

def run_clang_tool(name: str, srcdir: Path, builddir: Path, fn: T.Callable[..., T.Coroutine[None, None, int]], *args: T.Any) -> int:
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

def wrapper(path: Path) -> T.Iterable[T.Coroutine[None, None, int]]:
yield fn(path, *args)
return asyncio.run(_run_workers(all_clike_files(name, srcdir, builddir), wrapper))

0 comments on commit 5dc537a

Please sign in to comment.