Skip to content

Commit

Permalink
Merge pull request #422 from GaetanLepage/parallel-eval
Browse files Browse the repository at this point in the history
Feat: add basic parallel support for listing packages
  • Loading branch information
Mic92 authored Oct 9, 2024
2 parents e363f5b + 4ceaeeb commit 4049c27
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 57 deletions.
4 changes: 2 additions & 2 deletions nixpkgs_review/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,10 @@ def common_flags() -> list[CommonFlag]:
help="Extra nixpkgs config to pass to `import <nixpkgs>`",
),
CommonFlag(
"--num-procs-eval",
"--num-parallel-evals",
type=int,
default=1,
help="Number of parallel `nix-env` processes to run simultaneously (warning, can imply heavy RAM usage)",
help="Number of parallel `nix-env`/`nix eval` processes to run simultaneously (warning, can imply heavy RAM usage)",
),
]

Expand Down
2 changes: 1 addition & 1 deletion nixpkgs_review/cli/pr.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def pr_command(args: argparse.Namespace) -> str:
build_graph=args.build_graph,
nixpkgs_config=nixpkgs_config,
extra_nixpkgs_config=args.extra_nixpkgs_config,
n_procs_eval=args.num_procs_eval,
num_parallel_evals=args.num_parallel_evals,
)
contexts.append((pr, builddir.path, review.build_pr(pr)))
except NixpkgsReviewError as e:
Expand Down
46 changes: 21 additions & 25 deletions nixpkgs_review/nix.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import concurrent.futures
import json
import multiprocessing as mp
import os
import shlex
import shutil
import subprocess
from dataclasses import dataclass, field
from functools import partial
from pathlib import Path
from sys import platform
from tempfile import NamedTemporaryFile
Expand Down Expand Up @@ -271,32 +270,29 @@ def nix_eval(
os.unlink(attr_json.name)


def nix_eval_thread(
system: System,
attr_names: set[str],
allow: AllowedFeatures,
nix_path: str,
) -> tuple[System, list[Attr]]:
return system, nix_eval(attr_names, system, allow, nix_path)


def multi_system_eval(
attr_names_per_system: dict[System, set[str]],
allow: AllowedFeatures,
nix_path: str,
n_procs: int,
n_threads: int,
) -> dict[System, list[Attr]]:
nix_eval_partial = partial(
nix_eval_thread,
allow=allow,
nix_path=nix_path,
)

args: list[tuple[System, set[str]]] = list(attr_names_per_system.items())
with mp.Pool(n_procs) as pool:
results: list[tuple[System, list[Attr]]] = pool.starmap(nix_eval_partial, args)

return {system: attrs for system, attrs in results}
results: dict[System, list[Attr]] = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
future_to_system = {
executor.submit(
nix_eval,
attrs=attrs,
system=system,
allow=allow,
nix_path=nix_path,
): system
for system, attrs in attr_names_per_system.items()
}
for future in concurrent.futures.as_completed(future_to_system):
system = future_to_system[future]
results[system] = future.result()

return results


def nix_build(
Expand All @@ -309,7 +305,7 @@ def nix_build(
build_graph: str,
nix_path: str,
nixpkgs_config: Path,
n_procs_eval: int,
n_threads: int,
) -> dict[System, list[Attr]]:
if not attr_names_per_system:
info("Nothing to be built.")
Expand All @@ -319,7 +315,7 @@ def nix_build(
attr_names_per_system,
allow,
nix_path,
n_procs=n_procs_eval,
n_threads=n_threads,
)

filtered_per_system: dict[System, list[str]] = {}
Expand Down
75 changes: 46 additions & 29 deletions nixpkgs_review/review.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import concurrent.futures
import os
import subprocess
import sys
Expand Down Expand Up @@ -107,7 +108,7 @@ def __init__(
skip_packages_regex: list[Pattern[str]] = [],
checkout: CheckoutOption = CheckoutOption.MERGE,
sandbox: bool = False,
n_procs_eval: int = 1,
num_parallel_evals: int = 1,
) -> None:
self.builddir = builddir
self.build_args = build_args
Expand Down Expand Up @@ -140,7 +141,7 @@ def __init__(
self.build_graph = build_graph
self.nixpkgs_config = nixpkgs_config
self.extra_nixpkgs_config = extra_nixpkgs_config
self.n_procs_eval = n_procs_eval
self.num_parallel_evals = num_parallel_evals

def worktree_dir(self) -> str:
return str(self.builddir.worktree_dir)
Expand Down Expand Up @@ -181,36 +182,26 @@ def build_commit(
self.git_worktree(base_commit)

# TODO: nix-eval-jobs ?
# parallel version: returning a dict[System, list[Package]]
# base_packages = list_packages(
# self.builddir.nix_path,
# self.systems,
# self.allow,
# )
base_packages = {
system: list_packages(
self.builddir.nix_path,
system,
self.allow,
)
for system in self.systems
}
base_packages: dict[System, list[Package]] = list_packages(
self.builddir.nix_path,
self.systems,
self.allow,
n_threads=self.num_parallel_evals,
)

if reviewed_commit is None:
self.apply_unstaged(staged)
else:
self.git_merge(reviewed_commit)

# TODO: nix-eval-jobs ?
merged_packages = {
system: list_packages(
self.builddir.nix_path,
system,
self.allow,
check_meta=True,
)
for system in self.systems
}
merged_packages: dict[System, list[Package]] = list_packages(
self.builddir.nix_path,
self.systems,
self.allow,
n_threads=self.num_parallel_evals,
check_meta=True,
)

# Systems ordered correctly (x86_64-linux, aarch64-linux, x86_64-darwin, aarch64-darwin)
sorted_systems: list[System] = sorted(
Expand Down Expand Up @@ -268,7 +259,7 @@ def build(
self.build_graph,
self.builddir.nix_path,
self.nixpkgs_config,
self.n_procs_eval,
self.num_parallel_evals,
)

def build_pr(self, pr_number: int) -> dict[System, list[Attr]]:
Expand Down Expand Up @@ -419,9 +410,9 @@ def parse_packages_xml(stdout: IO[str]) -> list[Package]:
return packages


def list_packages(
def _list_packages_system(
system: System,
nix_path: str,
system: str,
allow: AllowedFeatures,
check_meta: bool = False,
) -> list[Package]:
Expand Down Expand Up @@ -458,6 +449,32 @@ def list_packages(
return parse_packages_xml(f)


def list_packages(
nix_path: str,
systems: set[System],
allow: AllowedFeatures,
n_threads: int,
check_meta: bool = False,
) -> dict[System, list[Package]]:
results: dict[System, list[Package]] = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
future_to_system = {
executor.submit(
_list_packages_system,
system=system,
nix_path=nix_path,
allow=allow,
check_meta=check_meta,
): system
for system in systems
}
for future in concurrent.futures.as_completed(future_to_system):
system = future_to_system[future]
results[system] = future.result()

return results


def package_attrs(
package_set: set[str],
system: str,
Expand Down Expand Up @@ -633,7 +650,7 @@ def review_local_revision(
build_graph=args.build_graph,
nixpkgs_config=nixpkgs_config,
extra_nixpkgs_config=args.extra_nixpkgs_config,
n_procs_eval=args.num_procs_eval,
num_parallel_evals=args.num_parallel_evals,
)
review.review_commit(builddir.path, args.branch, commit, staged, print_result)
return builddir.path

0 comments on commit 4049c27

Please sign in to comment.