diff --git a/src/python/pants/core/goals/fmt.py b/src/python/pants/core/goals/fmt.py
index 8e2c7fa4da70..3aef75e0eb62 100644
--- a/src/python/pants/core/goals/fmt.py
+++ b/src/python/pants/core/goals/fmt.py
@@ -8,7 +8,7 @@
 from dataclasses import dataclass
 from typing import TypeVar, cast
 
-from pants.core.goals.style_request import StyleRequest
+from pants.core.goals.style_request import StyleRequest, style_batch_size_help
 from pants.core.util_rules.source_files import SourceFiles, SourceFilesRequest
 from pants.engine.console import Console
 from pants.engine.engine_aware import EngineAwareReturnType
@@ -139,7 +139,11 @@ def register_options(cls, register) -> None:
             removal_version="2.11.0.dev0",
             removal_hint=(
                 "Formatters are now broken into multiple batches by default using the "
-                "`--batch-size` argument."
+                "`--batch-size` argument.\n"
+                "\n"
+                "To keep (roughly) this option's behavior, set [fmt].batch_size = 1. However, "
+                "you'll likely get better performance by using a larger batch size because of "
+                "reduced overhead launching processes."
             ),
             help=(
                 "Rather than formatting all files in a single batch, format each file as a "
@@ -156,20 +160,7 @@ def register_options(cls, register) -> None:
             advanced=True,
             type=int,
             default=128,
-            help=(
-                "The target minimum number of files that will be included in each formatter batch.\n"
-                "\n"
-                "Formatter processes are batched for a few reasons:\n"
-                "\n"
-                "1. to avoid OS argument length limits (in processes which don't support argument "
-                "files)\n"
-                "2. to support more stable cache keys than would be possible if all files were "
-                "operated on in a single batch.\n"
-                "3. to allow for parallelism in formatter processes which don't have internal "
-                "parallelism, or -- if they do support internal parallelism -- to improve scheduling "
-                "behavior when multiple processes are competing for cores and so internal "
-                "parallelism cannot be used perfectly.\n"
-            ),
+            help=style_batch_size_help(uppercase="Formatter", lowercase="formatter"),
         )
 
     @property
diff --git a/src/python/pants/core/goals/lint.py b/src/python/pants/core/goals/lint.py
index 8985dd6fc6d3..609bd9c84f14 100644
--- a/src/python/pants/core/goals/lint.py
+++ b/src/python/pants/core/goals/lint.py
@@ -8,7 +8,7 @@
 from dataclasses import dataclass
 from typing import Any, Iterable, cast
 
-from pants.core.goals.style_request import StyleRequest, write_reports
+from pants.core.goals.style_request import StyleRequest, style_batch_size_help, write_reports
 from pants.core.util_rules.distdir import DistDir
 from pants.engine.console import Console
 from pants.engine.engine_aware import EngineAwareReturnType
@@ -157,7 +157,11 @@ def register_options(cls, register) -> None:
             removal_version="2.11.0.dev0",
             removal_hint=(
                 "Linters are now broken into multiple batches by default using the "
-                "`--batch-size` argument."
+                "`--batch-size` argument.\n"
+                "\n"
+                "To keep (roughly) this option's behavior, set [lint].batch_size = 1. However, "
+                "you'll likely get better performance by using a larger batch size because of "
+                "reduced overhead launching processes."
             ),
             help=(
                 "Rather than linting all files in a single batch, lint each file as a "
@@ -174,20 +178,7 @@ def register_options(cls, register) -> None:
             advanced=True,
             type=int,
             default=128,
-            help=(
-                "The target minimum number of files that will be included in each linter batch.\n"
-                "\n"
-                "Linter processes are batched for a few reasons:\n"
-                "\n"
-                "1. to avoid OS argument length limits (in processes which don't support argument "
-                "files)\n"
-                "2. to support more stable cache keys than would be possible if all files were "
-                "operated on in a single batch.\n"
-                "3. to allow for parallelism in linter processes which don't have internal "
-                "parallelism, or -- if they do support internal parallelism -- to improve scheduling "
-                "behavior when multiple processes are competing for cores and so internal "
-                "parallelism cannot be used perfectly.\n"
-            ),
+            help=style_batch_size_help(uppercase="Linter", lowercase="linter"),
         )
 
     @property
@@ -235,10 +226,10 @@ def address_str(fs: FieldSet) -> str:
         return fs.address.spec
 
     all_batch_results = await MultiGet(
-        Get(LintResults, LintRequest, request.__class__(field_sets))
+        Get(LintResults, LintRequest, request.__class__(field_set_batch))
         for request in requests
        if request.field_sets
-        for field_sets in partition_sequentially(
+        for field_set_batch in partition_sequentially(
            request.field_sets, key=address_str, size_min=lint_subsystem.batch_size
         )
     )
@@ -262,7 +253,7 @@ def key_fn(results: LintResults):
                     sorted_all_batch_results, key=key_fn
                 )
             ),
-            key=lambda results: results.linter_name,
+            key=key_fn,
         )
     )
 
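A note on the `key=key_fn` cleanup in the last hunk above: the consolidation pass pre-sorts the per-batch `LintResults` and then merges them with `itertools.groupby`, and `groupby` only merges *adjacent* entries, so the sort and the grouping must agree on the key. A minimal sketch of that merge pattern, using a hypothetical `FakeLintResults` stand-in rather than the real `LintResults` class:

```python
from __future__ import annotations

import itertools
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeLintResults:  # hypothetical stand-in for the real LintResults
    results: tuple[str, ...]
    linter_name: str


def consolidate(batches: list[FakeLintResults]) -> list[FakeLintResults]:
    # Sort and group with the *same* key function; groupby() only merges
    # adjacent entries, so an unsorted input would split a linter's results.
    def key_fn(r: FakeLintResults) -> str:
        return r.linter_name

    return [
        FakeLintResults(
            results=tuple(itertools.chain.from_iterable(b.results for b in group)),
            linter_name=name,
        )
        for name, group in itertools.groupby(sorted(batches, key=key_fn), key=key_fn)
    ]


batches = [
    FakeLintResults(("a.py",), "flake8"),
    FakeLintResults(("b.py",), "black"),
    FakeLintResults(("c.py",), "flake8"),
]
assert [r.linter_name for r in consolidate(batches)] == ["black", "flake8"]
```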
diff --git a/src/python/pants/core/goals/style_request.py b/src/python/pants/core/goals/style_request.py
index 51a7a08d4dab..0fe9ef49e19b 100644
--- a/src/python/pants/core/goals/style_request.py
+++ b/src/python/pants/core/goals/style_request.py
@@ -24,6 +24,23 @@
 _FS = TypeVar("_FS", bound=FieldSet)
 
 
+def style_batch_size_help(uppercase: str, lowercase: str) -> str:
+    return (
+        f"The target minimum number of files that will be included in each {lowercase} batch.\n"
+        "\n"
+        f"{uppercase} processes are batched for a few reasons:\n"
+        "\n"
+        "1. to avoid OS argument length limits (in processes which don't support argument "
+        "files)\n"
+        "2. to support more stable cache keys than would be possible if all files were "
+        "operated on in a single batch.\n"
+        f"3. to allow for parallelism in {lowercase} processes which don't have internal "
+        "parallelism, or -- if they do support internal parallelism -- to improve scheduling "
+        "behavior when multiple processes are competing for cores and so internal "
+        "parallelism cannot be used perfectly.\n"
+    )
+
+
 @frozen_after_init
 @dataclass(unsafe_hash=True)
 class StyleRequest(Generic[_FS], EngineAwareParameter, metaclass=ABCMeta):
diff --git a/src/python/pants/util/collections.py b/src/python/pants/util/collections.py
index c665784048a4..79d882892a4f 100644
--- a/src/python/pants/util/collections.py
+++ b/src/python/pants/util/collections.py
@@ -6,7 +6,7 @@
 import collections
 import collections.abc
 import math
-from typing import Any, Callable, Iterable, Iterator, MutableMapping, Sequence, TypeVar
+from typing import Any, Callable, Iterable, Iterator, MutableMapping, TypeVar
 
 from pants.engine.internals import native_engine
 
@@ -77,7 +77,7 @@ def ensure_str_list(val: str | Iterable[str], *, allow_single_str: bool = False)
 
 
 def partition_sequentially(
-    items: Sequence[_T],
+    items: Iterable[_T],
     *,
     key: Callable[[_T], str],
     size_min: int,
@@ -95,7 +95,12 @@ def partition_sequentially(
     # To stably partition the arguments into ranges of at least `size_min`, we sort them, and
     # create a new batch sequentially once we have the minimum number of entries, _and_ we encounter
     # an item hash prefixed with a threshold of zeros.
-    zero_prefix_threshold = math.log(size_min // 8, 2)
+    #
+    # The hashes act like a (deterministic) series of rolls of an evenly distributed die. The
+    # probability of a hash prefixed with Z zero bits is 1/2^Z, and so to break after N items on
+    # average, we look for `Z == log2(N)` zero bits.
+    #
+    zero_prefix_threshold = math.log(max(4, size_min) // 4, 2)
     size_max = size_min * 2 if size_max is None else size_max
 
     batch: list[_T] = []
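To make the new comment concrete: with the patched formula, `size_min=128` gives `zero_prefix_threshold = log2(max(4, 128) // 4) = log2(32) = 5`, so once a batch has reached its minimum size, roughly one item hash in 2^5 = 32 has enough leading zeros to start a new batch. The clamp also matters: the old `size_min // 8` evaluates to 0 for any `size_min` below 8, and `math.log(0, 2)` raises a `ValueError`. A quick sketch of the arithmetic; the `leading_zero_bits` helper here is a hypothetical stand-in for however the engine actually hashes item keys:

```python
import hashlib
import math


def zero_prefix_threshold(size_min: int) -> float:
    # The patched formula: `max(4, size_min) // 4` is always >= 1, so the log
    # is defined even for size_min of 0 or 1 (the old `size_min // 8` passed
    # 0 to math.log for size_min < 8 and raised a ValueError).
    return math.log(max(4, size_min) // 4, 2)


def leading_zero_bits(key: str) -> int:
    # Hypothetical stand-in: count the zero bits prefixing a stable 256-bit
    # digest of the item's key.
    digest = int.from_bytes(hashlib.sha256(key.encode()).digest(), "big")
    return 256 - digest.bit_length()


for size_min in (0, 16, 64, 128):
    z = zero_prefix_threshold(size_min)
    # P(leading_zero_bits >= Z) == 1 / 2**Z, so once a batch already holds
    # size_min items, a boundary lands every ~2**Z items on average.
    print(f"size_min={size_min:>3}: Z={z:.0f}, ~1 boundary per {2 ** z:.0f} items")
```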
diff --git a/src/python/pants/util/collections_test.py b/src/python/pants/util/collections_test.py
index fb69eab83f90..2acf605f94e0 100644
--- a/src/python/pants/util/collections_test.py
+++ b/src/python/pants/util/collections_test.py
@@ -12,6 +12,7 @@
     assert_single_element,
     ensure_list,
     ensure_str_list,
+    partition_sequentially,
     recursively_update,
 )
 
@@ -85,3 +86,23 @@ def test_ensure_str_list() -> None:
         ensure_str_list(0)  # type: ignore[arg-type]
     with pytest.raises(ValueError):
         ensure_str_list([0, 1])  # type: ignore[list-item]
+
+
+@pytest.mark.parametrize("size_min", [0, 1, 16, 32, 64, 128])
+def test_partition_sequentially(size_min: int) -> None:
+    # Adding an item at any position in the input sequence should affect either 1 or 2 (if the added
+    # item becomes a boundary) buckets in the output.
+
+    def partitioned_buckets(items: list[str]) -> set[tuple[str, ...]]:
+        return set(tuple(p) for p in partition_sequentially(items, key=str, size_min=size_min))
+
+    # We start with base items containing every other element from a sorted sequence.
+    all_items = sorted(f"item{i}" for i in range(0, 64))
+    base_items = [item for i, item in enumerate(all_items) if i % 2 == 0]
+    base_partitions = partitioned_buckets(base_items)
+
+    # Then test that adding any of the remaining items (which will be interspersed in the base
+    # items) only affects 1 or 2 buckets in the output.
+    for to_add in [item for i, item in enumerate(all_items) if i % 2 == 1]:
+        updated_partitions = partitioned_buckets([to_add, *base_items])
+        assert 1 <= len(base_partitions ^ updated_partitions) <= 2
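The stability property this test asserts is what makes per-batch cache keys useful: touching one file should invalidate at most the batch or two around it, not every batch. A usage sketch of the function under test, assuming a Pants checkout on `PYTHONPATH`; the exact boundaries depend on the item-key hashes, so the printed sizes are illustrative only:

```python
from pants.util.collections import partition_sequentially

files = [f"src/app/module_{i}.py" for i in range(300)]

# Batches hold at least `size_min` items (the final batch may be smaller) and
# at most `size_min * 2` by default, per the `size_max` fallback above.
# Boundaries are picked by stable hash prefixes rather than by index, so
# inserting one file only perturbs the batch(es) around it.
for batch in partition_sequentially(files, key=str, size_min=128):
    print(len(batch))
```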