From 72b60acaedd7882065866a20fe701b1e2ad68f8c Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Wed, 21 Jul 2021 16:16:42 -0700 Subject: [PATCH] Add option for async completion of StreamingWorkunitHandlers, disable by default in containers (#12392) When run inside of a container that exits as soon as the client returns, async completion of workunit handlers can lead to loss of metrics. Since async completion has significant performance benefits, we disable it conditionally based on whether it is likely that we are running inside of a container. Fixes #11833. [ci skip-rust] --- src/python/pants/base/build_environment.py | 12 ++++++++++++ src/python/pants/bin/local_pants_runner.py | 4 +++- src/python/pants/engine/internals/engine_test.py | 12 ++++++------ .../pants/engine/streaming_workunit_handler.py | 15 +++++++++++---- src/python/pants/option/global_options.py | 13 +++++++++++++ 5 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/python/pants/base/build_environment.py b/src/python/pants/base/build_environment.py index 90c93a4e175..5b3c8863300 100644 --- a/src/python/pants/base/build_environment.py +++ b/src/python/pants/base/build_environment.py @@ -3,6 +3,7 @@ import logging import os +from pathlib import Path from typing import Optional from pants.base.build_root import BuildRoot @@ -36,6 +37,17 @@ def get_default_pants_config_file() -> str: return os.path.join(get_buildroot(), "pants.toml") +def is_in_container() -> bool: + """Return true if this process is likely running inside of a container.""" + # https://stackoverflow.com/a/49944991/38265 and https://github.com/containers/podman/issues/3586 + cgroup = Path("/proc/self/cgroup") + return ( + Path("/.dockerenv").exists() + or Path("/run/.containerenv").exists() + or (cgroup.exists() and "docker" in cgroup.read_text("utf-8")) + ) + + _Git: Optional[Git] = None diff --git a/src/python/pants/bin/local_pants_runner.py b/src/python/pants/bin/local_pants_runner.py index d4e4630a19d..27225cbf252 100644 --- a/src/python/pants/bin/local_pants_runner.py +++ b/src/python/pants/bin/local_pants_runner.py @@ -253,7 +253,9 @@ def run(self, start_time: float) -> ExitCode: options_bootstrapper=self.options_bootstrapper, callbacks=self._get_workunits_callbacks(), report_interval_seconds=global_options.streaming_workunits_report_interval, - pantsd=global_options.pantsd, + allow_async_completion=( + global_options.pantsd and global_options.streaming_workunits_complete_async + ), ) with streaming_reporter: engine_result = PANTS_FAILED_EXIT_CODE diff --git a/src/python/pants/engine/internals/engine_test.py b/src/python/pants/engine/internals/engine_test.py index 974e588bcd8..98673ffca38 100644 --- a/src/python/pants/engine/internals/engine_test.py +++ b/src/python/pants/engine/internals/engine_test.py @@ -314,7 +314,7 @@ def _fixture_for_rules( max_workunit_verbosity=max_workunit_verbosity, specs=Specs.empty(), options_bootstrapper=create_options_bootstrapper([]), - pantsd=False, + allow_async_completion=False, ) return scheduler, tracker, handler @@ -680,7 +680,7 @@ def test_more_complicated_engine_aware(rule_runner: RuleRunner, run_tracker: Run max_workunit_verbosity=LogLevel.TRACE, specs=Specs.empty(), options_bootstrapper=create_options_bootstrapper([]), - pantsd=False, + allow_async_completion=False, ) with handler: input_1 = CreateDigest( @@ -740,7 +740,7 @@ def test_process_digests_on_streaming_workunits( max_workunit_verbosity=LogLevel.DEBUG, specs=Specs.empty(), options_bootstrapper=create_options_bootstrapper([]), - pantsd=False, + allow_async_completion=False, ) stdout_process = Process( @@ -771,7 +771,7 @@ def test_process_digests_on_streaming_workunits( max_workunit_verbosity=LogLevel.DEBUG, specs=Specs.empty(), options_bootstrapper=create_options_bootstrapper([]), - pantsd=False, + allow_async_completion=False, ) stderr_process = Process( argv=("/bin/bash", "-c", "1>&2 /bin/echo 'stderr output'"), description="Stderr process" @@ -834,7 +834,7 @@ def __call__(self, **kwargs) -> None: max_workunit_verbosity=LogLevel.INFO, specs=Specs.empty(), options_bootstrapper=create_options_bootstrapper([]), - pantsd=False, + allow_async_completion=False, ) stdout_process = Process( argv=("/bin/bash", "-c", "/bin/echo 'stdout output'"), description="Stdout process" @@ -896,7 +896,7 @@ def __call__(self, **kwargs) -> None: options_bootstrapper=create_options_bootstrapper( ["--backend-packages=pants.backend.python"] ), - pantsd=False, + allow_async_completion=False, ) stdout_process = Process( argv=("/bin/bash", "-c", "/bin/echo 'stdout output'"), description="Stdout process" diff --git a/src/python/pants/engine/streaming_workunit_handler.py b/src/python/pants/engine/streaming_workunit_handler.py index c912f9b79ce..636fffcf674 100644 --- a/src/python/pants/engine/streaming_workunit_handler.py +++ b/src/python/pants/engine/streaming_workunit_handler.py @@ -182,7 +182,7 @@ def __init__( options_bootstrapper: OptionsBootstrapper, specs: Specs, report_interval_seconds: float, - pantsd: bool, + allow_async_completion: bool, max_workunit_verbosity: LogLevel = LogLevel.TRACE, ) -> None: scheduler = scheduler.isolated_shallow_clone("streaming_workunit_handler_session") @@ -202,7 +202,7 @@ def __init__( # TODO(10092) The max verbosity should be a per-client setting, rather than a global # setting. max_workunit_verbosity=max_workunit_verbosity, - pantsd=pantsd, + allow_async_completion=allow_async_completion, ) if callbacks else None @@ -229,7 +229,7 @@ def __init__( callbacks: Iterable[WorkunitsCallback], report_interval: float, max_workunit_verbosity: LogLevel, - pantsd: bool, + allow_async_completion: bool, ) -> None: super().__init__(daemon=True) self.scheduler = scheduler @@ -240,7 +240,7 @@ def __init__( self.max_workunit_verbosity = max_workunit_verbosity # TODO: Have a thread per callback so that some callbacks can always finish async even # if others must be finished synchronously. - self.block_until_complete = not pantsd or any( + self.block_until_complete = not allow_async_completion or any( callback.can_finish_async is False for callback in self.callbacks ) # Get the parent thread's logging destination. Note that this thread has not yet started @@ -271,7 +271,14 @@ def run(self) -> None: def end(self) -> None: self.stop_request.set() if self.block_until_complete: + logger.debug( + "Async completion is disabled: waiting for workunit callbacks to complete..." + ) super().join() + else: + logger.debug( + "Async completion is enabled: workunit callbacks will complete in the background." + ) def rules(): diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 21798b985ba..f32c5b40a7e 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -20,6 +20,7 @@ get_buildroot, get_default_pants_config_file, get_pants_cachedir, + is_in_container, pants_version, ) from pants.engine.environment import CompleteEnvironment @@ -1347,6 +1348,18 @@ def register_options(cls, register): advanced=True, help="Interval in seconds between when streaming workunit event receivers will be polled.", ) + register( + "--streaming-workunits-complete-async", + advanced=True, + type=bool, + default=not is_in_container(), + help=( + "True if stats recording should be allowed to complete asynchronously when `pantsd` " + "is enabled. When `pantsd` is disabled, stats recording is always synchronous. " + "To reduce data loss, this flag defaults to false inside of containers, such as " + "when run with Docker." + ), + ) @classmethod def validate_instance(cls, opts):