Skip to content

Commit

Permalink
Add option for async completion of StreamingWorkunitHandlers, disable…
Browse files Browse the repository at this point in the history
… by default in containers (Cherry-pick of #12392) (#12399)

When run inside of a container that exits as soon as the client returns, async completion of workunit handlers can lead to loss of metrics. Since async completion has significant performance benefits, we disable it conditionally based on whether it is likely that we are running inside of a container.

Fixes #11833.

[ci skip-rust]
[ci skip-build-wheels]
  • Loading branch information
stuhood authored Jul 22, 2021
1 parent 3293ed6 commit 699072e
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 11 deletions.
12 changes: 12 additions & 0 deletions src/python/pants/base/build_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import logging
import os
from pathlib import Path
from typing import Optional

from pants.base.build_root import BuildRoot
Expand Down Expand Up @@ -36,6 +37,17 @@ def get_default_pants_config_file() -> str:
return os.path.join(get_buildroot(), "pants.toml")


def is_in_container() -> bool:
"""Return true if this process is likely running inside of a container."""
# https://stackoverflow.com/a/49944991/38265 and https://github.com/containers/podman/issues/3586
cgroup = Path("/proc/self/cgroup")
return (
Path("/.dockerenv").exists()
or Path("/run/.containerenv").exists()
or (cgroup.exists() and "docker" in cgroup.read_text("utf-8"))
)


_Git: Optional[Git] = None


Expand Down
4 changes: 3 additions & 1 deletion src/python/pants/bin/local_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ def run(self, start_time: float) -> ExitCode:
options_bootstrapper=self.options_bootstrapper,
callbacks=self._get_workunits_callbacks(),
report_interval_seconds=global_options.streaming_workunits_report_interval,
pantsd=global_options.pantsd,
allow_async_completion=(
global_options.pantsd and global_options.streaming_workunits_complete_async
),
)
with streaming_reporter:
engine_result = PANTS_FAILED_EXIT_CODE
Expand Down
12 changes: 6 additions & 6 deletions src/python/pants/engine/internals/engine_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def _fixture_for_rules(
max_workunit_verbosity=max_workunit_verbosity,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
allow_async_completion=False,
)
return scheduler, tracker, handler

Expand Down Expand Up @@ -680,7 +680,7 @@ def test_more_complicated_engine_aware(rule_runner: RuleRunner, run_tracker: Run
max_workunit_verbosity=LogLevel.TRACE,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
allow_async_completion=False,
)
with handler:
input_1 = CreateDigest(
Expand Down Expand Up @@ -740,7 +740,7 @@ def test_process_digests_on_streaming_workunits(
max_workunit_verbosity=LogLevel.DEBUG,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
allow_async_completion=False,
)

stdout_process = Process(
Expand Down Expand Up @@ -771,7 +771,7 @@ def test_process_digests_on_streaming_workunits(
max_workunit_verbosity=LogLevel.DEBUG,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
allow_async_completion=False,
)
stderr_process = Process(
argv=("/bin/bash", "-c", "1>&2 /bin/echo 'stderr output'"), description="Stderr process"
Expand Down Expand Up @@ -834,7 +834,7 @@ def __call__(self, **kwargs) -> None:
max_workunit_verbosity=LogLevel.INFO,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
allow_async_completion=False,
)
stdout_process = Process(
argv=("/bin/bash", "-c", "/bin/echo 'stdout output'"), description="Stdout process"
Expand Down Expand Up @@ -896,7 +896,7 @@ def __call__(self, **kwargs) -> None:
options_bootstrapper=create_options_bootstrapper(
["--backend-packages=pants.backend.python"]
),
pantsd=False,
allow_async_completion=False,
)
stdout_process = Process(
argv=("/bin/bash", "-c", "/bin/echo 'stdout output'"), description="Stdout process"
Expand Down
15 changes: 11 additions & 4 deletions src/python/pants/engine/streaming_workunit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def __init__(
options_bootstrapper: OptionsBootstrapper,
specs: Specs,
report_interval_seconds: float,
pantsd: bool,
allow_async_completion: bool,
max_workunit_verbosity: LogLevel = LogLevel.TRACE,
) -> None:
scheduler = scheduler.isolated_shallow_clone("streaming_workunit_handler_session")
Expand All @@ -202,7 +202,7 @@ def __init__(
# TODO(10092) The max verbosity should be a per-client setting, rather than a global
# setting.
max_workunit_verbosity=max_workunit_verbosity,
pantsd=pantsd,
allow_async_completion=allow_async_completion,
)
if callbacks
else None
Expand All @@ -229,7 +229,7 @@ def __init__(
callbacks: Iterable[WorkunitsCallback],
report_interval: float,
max_workunit_verbosity: LogLevel,
pantsd: bool,
allow_async_completion: bool,
) -> None:
super().__init__(daemon=True)
self.scheduler = scheduler
Expand All @@ -240,7 +240,7 @@ def __init__(
self.max_workunit_verbosity = max_workunit_verbosity
# TODO: Have a thread per callback so that some callbacks can always finish async even
# if others must be finished synchronously.
self.block_until_complete = not pantsd or any(
self.block_until_complete = not allow_async_completion or any(
callback.can_finish_async is False for callback in self.callbacks
)
# Get the parent thread's logging destination. Note that this thread has not yet started
Expand Down Expand Up @@ -271,7 +271,14 @@ def run(self) -> None:
def end(self) -> None:
self.stop_request.set()
if self.block_until_complete:
logger.debug(
"Async completion is disabled: waiting for workunit callbacks to complete..."
)
super().join()
else:
logger.debug(
"Async completion is enabled: workunit callbacks will complete in the background."
)


def rules():
Expand Down
13 changes: 13 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
get_buildroot,
get_default_pants_config_file,
get_pants_cachedir,
is_in_container,
pants_version,
)
from pants.engine.environment import CompleteEnvironment
Expand Down Expand Up @@ -1347,6 +1348,18 @@ def register_options(cls, register):
advanced=True,
help="Interval in seconds between when streaming workunit event receivers will be polled.",
)
register(
"--streaming-workunits-complete-async",
advanced=True,
type=bool,
default=not is_in_container(),
help=(
"True if stats recording should be allowed to complete asynchronously when `pantsd` "
"is enabled. When `pantsd` is disabled, stats recording is always synchronous. "
"To reduce data loss, this flag defaults to false inside of containers, such as "
"when run with Docker."
),
)

@classmethod
def validate_instance(cls, opts):
Expand Down

0 comments on commit 699072e

Please sign in to comment.