diff --git a/nixpkgs_review/cli/__init__.py b/nixpkgs_review/cli/__init__.py index 74671c0..ab441ed 100644 --- a/nixpkgs_review/cli/__init__.py +++ b/nixpkgs_review/cli/__init__.py @@ -250,10 +250,10 @@ def common_flags() -> list[CommonFlag]: help="Extra nixpkgs config to pass to `import `", ), CommonFlag( - "--num-procs-eval", + "--num-parallel-evals", type=int, default=1, - help="Number of parallel `nix-env` processes to run simultaneously (warning, can imply heavy RAM usage)", + help="Number of parallel `nix-env`/`nix eval` processes to run simultaneously (warning, can imply heavy RAM usage)", ), ] diff --git a/nixpkgs_review/cli/pr.py b/nixpkgs_review/cli/pr.py index 113aa88..d92b506 100644 --- a/nixpkgs_review/cli/pr.py +++ b/nixpkgs_review/cli/pr.py @@ -86,7 +86,7 @@ def pr_command(args: argparse.Namespace) -> str: build_graph=args.build_graph, nixpkgs_config=nixpkgs_config, extra_nixpkgs_config=args.extra_nixpkgs_config, - n_procs_eval=args.num_procs_eval, + num_parallel_evals=args.num_parallel_evals, ) contexts.append((pr, builddir.path, review.build_pr(pr))) except NixpkgsReviewError as e: diff --git a/nixpkgs_review/nix.py b/nixpkgs_review/nix.py index 8d00531..2008735 100644 --- a/nixpkgs_review/nix.py +++ b/nixpkgs_review/nix.py @@ -1,11 +1,10 @@ +import concurrent.futures import json -import multiprocessing as mp import os import shlex import shutil import subprocess from dataclasses import dataclass, field -from functools import partial from pathlib import Path from sys import platform from tempfile import NamedTemporaryFile @@ -271,32 +270,29 @@ def nix_eval( os.unlink(attr_json.name) -def nix_eval_thread( - system: System, - attr_names: set[str], - allow: AllowedFeatures, - nix_path: str, -) -> tuple[System, list[Attr]]: - return system, nix_eval(attr_names, system, allow, nix_path) - - def multi_system_eval( attr_names_per_system: dict[System, set[str]], allow: AllowedFeatures, nix_path: str, - n_procs: int, + n_threads: int, ) -> dict[System, list[Attr]]: - nix_eval_partial = partial( - nix_eval_thread, - allow=allow, - nix_path=nix_path, - ) - - args: list[tuple[System, set[str]]] = list(attr_names_per_system.items()) - with mp.Pool(n_procs) as pool: - results: list[tuple[System, list[Attr]]] = pool.starmap(nix_eval_partial, args) - - return {system: attrs for system, attrs in results} + results: dict[System, list[Attr]] = {} + with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor: + future_to_system = { + executor.submit( + nix_eval, + attrs=attrs, + system=system, + allow=allow, + nix_path=nix_path, + ): system + for system, attrs in attr_names_per_system.items() + } + for future in concurrent.futures.as_completed(future_to_system): + system = future_to_system[future] + results[system] = future.result() + + return results def nix_build( @@ -309,7 +305,7 @@ def nix_build( build_graph: str, nix_path: str, nixpkgs_config: Path, - n_procs_eval: int, + n_threads: int, ) -> dict[System, list[Attr]]: if not attr_names_per_system: info("Nothing to be built.") @@ -319,7 +315,7 @@ def nix_build( attr_names_per_system, allow, nix_path, - n_procs=n_procs_eval, + n_threads=n_threads, ) filtered_per_system: dict[System, list[str]] = {} diff --git a/nixpkgs_review/review.py b/nixpkgs_review/review.py index 34bdc2a..9625577 100644 --- a/nixpkgs_review/review.py +++ b/nixpkgs_review/review.py @@ -1,4 +1,5 @@ import argparse +import concurrent.futures import os import subprocess import sys @@ -107,7 +108,7 @@ def __init__( skip_packages_regex: list[Pattern[str]] = [], checkout: CheckoutOption = CheckoutOption.MERGE, sandbox: bool = False, - n_procs_eval: int = 1, + num_parallel_evals: int = 1, ) -> None: self.builddir = builddir self.build_args = build_args @@ -140,7 +141,7 @@ def __init__( self.build_graph = build_graph self.nixpkgs_config = nixpkgs_config self.extra_nixpkgs_config = extra_nixpkgs_config - self.n_procs_eval = n_procs_eval + self.num_parallel_evals = num_parallel_evals def worktree_dir(self) -> str: return str(self.builddir.worktree_dir) @@ -181,20 +182,12 @@ def build_commit( self.git_worktree(base_commit) # TODO: nix-eval-jobs ? - # parallel version: returning a dict[System, list[Package]] - # base_packages = list_packages( - # self.builddir.nix_path, - # self.systems, - # self.allow, - # ) - base_packages = { - system: list_packages( - self.builddir.nix_path, - system, - self.allow, - ) - for system in self.systems - } + base_packages: dict[System, list[Package]] = list_packages( + self.builddir.nix_path, + self.systems, + self.allow, + n_threads=self.num_parallel_evals, + ) if reviewed_commit is None: self.apply_unstaged(staged) @@ -202,15 +195,13 @@ def build_commit( self.git_merge(reviewed_commit) # TODO: nix-eval-jobs ? - merged_packages = { - system: list_packages( - self.builddir.nix_path, - system, - self.allow, - check_meta=True, - ) - for system in self.systems - } + merged_packages: dict[System, list[Package]] = list_packages( + self.builddir.nix_path, + self.systems, + self.allow, + n_threads=self.num_parallel_evals, + check_meta=True, + ) # Systems ordered correctly (x86_64-linux, aarch64-linux, x86_64-darwin, aarch64-darwin) sorted_systems: list[System] = sorted( @@ -268,7 +259,7 @@ def build( self.build_graph, self.builddir.nix_path, self.nixpkgs_config, - self.n_procs_eval, + self.num_parallel_evals, ) def build_pr(self, pr_number: int) -> dict[System, list[Attr]]: @@ -419,9 +410,9 @@ def parse_packages_xml(stdout: IO[str]) -> list[Package]: return packages -def list_packages( +def _list_packages_system( + system: System, nix_path: str, - system: str, allow: AllowedFeatures, check_meta: bool = False, ) -> list[Package]: @@ -458,6 +449,32 @@ def list_packages( return parse_packages_xml(f) +def list_packages( + nix_path: str, + systems: set[System], + allow: AllowedFeatures, + n_threads: int, + check_meta: bool = False, +) -> dict[System, list[Package]]: + results: dict[System, list[Package]] = {} + with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor: + future_to_system = { + executor.submit( + _list_packages_system, + system=system, + nix_path=nix_path, + allow=allow, + check_meta=check_meta, + ): system + for system in systems + } + for future in concurrent.futures.as_completed(future_to_system): + system = future_to_system[future] + results[system] = future.result() + + return results + + def package_attrs( package_set: set[str], system: str, @@ -633,7 +650,7 @@ def review_local_revision( build_graph=args.build_graph, nixpkgs_config=nixpkgs_config, extra_nixpkgs_config=args.extra_nixpkgs_config, - n_procs_eval=args.num_procs_eval, + num_parallel_evals=args.num_parallel_evals, ) review.review_commit(builddir.path, args.branch, commit, staged, print_result) return builddir.path