Skip to content

Commit

Permalink
Wait for all Sessions during pantsd shutdown (#11929)
Browse files Browse the repository at this point in the history
As described in #11618, when `pantsd` intentionally exits due to low memory, a few types of work can be cut short:
1. if the run ends in Ctrl+C, processes that were cancelled may not have had time to be dropped before `pantsd exits.
2. async StreamingWorkunitHandler threads might still be running.

This change adds orderly-shutdown mechanisms to the `Scheduler`/`Core` to join all ongoing `Sessions` (including the SWH), and improves tests to ensure that the SWH is waited for.

Additionally, in the last commit, added purging of the `pantsd` metadata as soon as we decide to restart, which should reduce (but probably not eliminate) the incidence of item 1. from #11618. Work for #11831 will likely further harden this path.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored Apr 16, 2021
1 parent c62d05d commit c443d71
Show file tree
Hide file tree
Showing 13 changed files with 175 additions and 43 deletions.
1 change: 1 addition & 0 deletions src/python/pants/engine/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ python_tests(
# Loaded reflectively as a backend in `streaming_workunit_handler_integration_test.py`.
"testprojects/pants-plugins/src/python/workunit_logger",
],
timeout=120,
)
4 changes: 3 additions & 1 deletion src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def scheduler_execute(
scheduler: PyScheduler, session: PySession, execution_request: PyExecutionRequest
) -> tuple: ...
def scheduler_metrics(scheduler: PyScheduler, session: PySession) -> dict[str, int]: ...
def scheduler_shutdown(scheduler: PyScheduler, timeout_secs: int) -> None: ...
def session_new_run_id(session: PySession) -> None: ...
def session_poll_workunits(
scheduler: PyScheduler, session: PySession, max_log_verbosity_level: int
Expand All @@ -121,7 +122,7 @@ def session_get_observation_histograms(scheduler: PyScheduler, session: PySessio
def session_record_test_observation(
scheduler: PyScheduler, session: PySession, value: int
) -> None: ...
def session_isolated_shallow_clone(session: PySession) -> PySession: ...
def session_isolated_shallow_clone(session: PySession, build_id: str) -> PySession: ...
def all_counter_names() -> list[str]: ...
def graph_len(scheduler: PyScheduler) -> int: ...
def graph_visualize(scheduler: PyScheduler, session: PySession, path: str) -> None: ...
Expand Down Expand Up @@ -200,6 +201,7 @@ class PySession:
session_values: SessionValues,
cancellation_latch: PySessionCancellationLatch,
) -> None: ...
def cancel(self) -> None: ...

class PySessionCancellationLatch:
def __init__(self) -> None: ...
Expand Down
11 changes: 9 additions & 2 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ def new_session(
),
)

def shutdown(self, timeout_secs: int = 60) -> None:
native_engine.scheduler_shutdown(self.py_scheduler, timeout_secs)


class _PathGlobsAndRootCollection(Collection[PathGlobsAndRoot]):
pass
Expand Down Expand Up @@ -339,9 +342,10 @@ def py_scheduler(self) -> PyScheduler:
def py_session(self) -> PySession:
return self._py_session

def isolated_shallow_clone(self) -> SchedulerSession:
def isolated_shallow_clone(self, build_id: str) -> SchedulerSession:
return SchedulerSession(
self._scheduler, native_engine.session_isolated_shallow_clone(self._py_session)
self._scheduler,
native_engine.session_isolated_shallow_clone(self._py_session, build_id),
)

def poll_workunits(self, max_log_verbosity: LogLevel) -> PolledWorkunits:
Expand Down Expand Up @@ -614,6 +618,9 @@ def get_observation_histograms(self) -> dict:
def record_test_observation(self, value: int) -> None:
native_engine.session_record_test_observation(self.py_scheduler, self.py_session, value)

def cancel(self) -> None:
self.py_session.cancel()


def register_rules(rule_index: RuleIndex, union_membership: UnionMembership) -> PyTasks:
"""Create a native Tasks object loaded with given RuleIndex."""
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/engine/streaming_workunit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def __init__(
pantsd: bool,
max_workunit_verbosity: LogLevel = LogLevel.TRACE,
) -> None:
scheduler = scheduler.isolated_shallow_clone()
scheduler = scheduler.isolated_shallow_clone("streaming_workunit_handler_session")
self.callbacks = callbacks
self.context = StreamingWorkunitContext(
_scheduler=scheduler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,19 @@ def run(
dest = os.path.join(tmpdir, "dest.log")
normalized_args = [arg.format(tmpdir=tmpdir) for arg in args]
pants_run = run_pants(normalized_args, config=workunit_logger_config(dest))
log_content = maybe_read_file(dest)
if success:
pants_run.assert_success()
assert log_content
assert FINISHED_SUCCESSFULLY in log_content
confirm_eventual_success(dest)
else:
pants_run.assert_failure()
return pants_run, log_content
return pants_run, maybe_read_file(dest)


def confirm_eventual_success(log_dest: str) -> None:
for _ in attempts("The log should eventually show that the SWH shut down."):
content = maybe_read_file(log_dest)
if content and FINISHED_SUCCESSFULLY in content:
break


def test_list() -> None:
Expand All @@ -64,7 +69,9 @@ def test_ctrl_c() -> None:
os.kill(client_pid, signal.SIGINT)

# Confirm that finish is still called (even though it may be backgrounded in the server).
for _ in attempts("The log should eventually show that the SWH shut down."):
content = maybe_read_file(dest)
if content and FINISHED_SUCCESSFULLY in content:
break
confirm_eventual_success(dest)


def test_restart() -> None:
# Will trigger a restart
run(["--pantsd-max-memory-usage=1", "roots"])
7 changes: 6 additions & 1 deletion src/python/pants/pantsd/pants_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,14 @@ def run_sync(self):
while self._core.is_valid():
time.sleep(self.JOIN_TIMEOUT_SECONDS)

# We're exiting: join the server to avoid interrupting ongoing runs.
# We're exiting: purge our metadata to prevent new connections, then join the server
# to avoid interrupting ongoing runs.
self.purge_metadata(force=True)
self._logger.info("Waiting for ongoing runs to complete before exiting...")
native_engine.nailgun_server_await_shutdown(self._server)
# Then shutdown the PantsDaemonCore, which will shut down any live Scheduler.
self._logger.info("Waiting for Sessions to complete before exiting...")
self._core.shutdown()
self._logger.info("Exiting pantsd")


Expand Down
9 changes: 9 additions & 0 deletions src/python/pants/pantsd/pants_daemon_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,12 @@ def prepare(
self._initialize(options_fingerprint, options_bootstrapper, env)
assert self._scheduler is not None
return self._scheduler, self._options_initializer

def shutdown(self) -> None:
with self._lifecycle_lock:
if self._services is not None:
self._services.shutdown()
self._services = None
if self._scheduler is not None:
self._scheduler.scheduler.shutdown()
self._scheduler = None
1 change: 1 addition & 0 deletions src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,4 @@ def run(self):
# Watcher failed for some reason
self._logger.critical(f"The scheduler was invalidated: {e!r}")
self.terminate()
self._scheduler_session.cancel()
1 change: 1 addition & 0 deletions src/python/pants/pantsd/service/store_gc_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,4 @@ def run(self):
except Exception as e:
self._logger.critical(f"GC failed: {e!r}")
self.terminate()
self._scheduler_session.cancel()
13 changes: 13 additions & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,19 @@ impl Core {
pub fn store(&self) -> Store {
self.store.clone()
}

///
/// Shuts down this Core.
///
pub async fn shutdown(&self, timeout: Duration) {
// Shutdown the Sessions, which will prevent new work from starting and then await any ongoing
// work.
if let Err(msg) = self.sessions.shutdown(timeout).await {
log::warn!("During shutdown: {}", msg);
}
// Then clear the Graph to ensure that drop handlers run (particular for running processes).
self.graph.clear();
}
}

pub struct InvalidatableGraph(Graph<NodeKey>);
Expand Down
41 changes: 35 additions & 6 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ py_module_initializer!(native_engine, |py, m| {
m.add(
py,
"session_isolated_shallow_clone",
py_fn!(py, session_isolated_shallow_clone(a: PySession)),
py_fn!(py, session_isolated_shallow_clone(a: PySession, b: String)),
)?;

m.add(py, "all_counter_names", py_fn!(py, all_counter_names()))?;
Expand Down Expand Up @@ -398,6 +398,11 @@ py_module_initializer!(native_engine, |py, m| {
)
),
)?;
m.add(
py,
"scheduler_shutdown",
py_fn!(py, scheduler_shutdown(a: PyScheduler, b: u64)),
)?;

m.add(
py,
Expand Down Expand Up @@ -628,14 +633,19 @@ py_class!(class PySession |py| {
session_values: PyObject,
cancellation_latch: PySessionCancellationLatch,
) -> CPyResult<Self> {
Self::create_instance(py, Session::new(
let session = Session::new(
scheduler.scheduler(py),
should_render_ui,
build_id,
session_values.into(),
cancellation_latch.cancelled(py).clone(),
)
)
).map_err(|err_str| PyErr::new::<exc::Exception, _>(py, (err_str,)))?;
Self::create_instance(py, session)
}

def cancel(&self) -> PyUnitResult {
self.session(py).cancel();
Ok(None)
}
});

Expand Down Expand Up @@ -1189,6 +1199,18 @@ fn scheduler_metrics(
})
}

fn scheduler_shutdown(py: Python, scheduler_ptr: PyScheduler, timeout_secs: u64) -> PyUnitResult {
with_scheduler(py, scheduler_ptr, |scheduler| {
py.allow_threads(|| {
scheduler
.core
.executor
.block_on(scheduler.core.shutdown(Duration::from_secs(timeout_secs)));
})
});
Ok(None)
}

fn all_counter_names(_: Python) -> CPyResult<Vec<String>> {
Ok(Metric::all_metrics())
}
Expand Down Expand Up @@ -1428,9 +1450,16 @@ fn session_record_test_observation(
})
}

fn session_isolated_shallow_clone(py: Python, session_ptr: PySession) -> CPyResult<PySession> {
fn session_isolated_shallow_clone(
py: Python,
session_ptr: PySession,
build_id: String,
) -> CPyResult<PySession> {
with_session(py, session_ptr, |session| {
PySession::create_instance(py, session.isolated_shallow_clone())
let session_clone = session
.isolated_shallow_clone(build_id)
.map_err(|e| PyErr::new::<exc::Exception, _>(py, (e,)))?;
PySession::create_instance(py, session_clone)
})
}

Expand Down
Loading

0 comments on commit c443d71

Please sign in to comment.