diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 0ad43df5dc74..00ac75068d9a 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -108,7 +108,7 @@ impl StreamService for StreamServiceImpl { ) -> std::result::Result, Status> { let req = request.into_inner(); let actors = req.actor_ids; - self.mgr.drop_actor(&actors).await?; + self.mgr.drop_actors(&actors).await?; Ok(Response::new(DropActorsResponse { request_id: req.request_id, status: None, diff --git a/src/stream/src/executor/subtask.rs b/src/stream/src/executor/subtask.rs index 011d495b9208..4cc5cc512d82 100644 --- a/src/stream/src/executor/subtask.rs +++ b/src/stream/src/executor/subtask.rs @@ -19,7 +19,8 @@ use tokio::sync::mpsc::error::SendError; use tokio_stream::wrappers::ReceiverStream; use super::actor::spawn_blocking_drop_stream; -use super::{BoxedExecutor, Executor, ExecutorInfo, MessageStreamItem}; +use super::{BoxedExecutor, Executor, ExecutorInfo, Message, MessageStreamItem}; +use crate::task::ActorId; /// Handle used to drive the subtask. pub type SubtaskHandle = impl Future + Send + 'static; @@ -60,7 +61,7 @@ impl Executor for SubtaskRxExecutor { /// Used when there're multiple stateful executors in an actor. These subtasks can be concurrently /// executed to improve the I/O performance, while the computing resource can be still bounded to a /// single thread. -pub fn wrap(input: BoxedExecutor) -> (SubtaskHandle, SubtaskRxExecutor) { +pub fn wrap(input: BoxedExecutor, actor_id: ActorId) -> (SubtaskHandle, SubtaskRxExecutor) { let (tx, rx) = mpsc::channel(1); let rx_executor = SubtaskRxExecutor { info: ExecutorInfo { @@ -72,7 +73,18 @@ pub fn wrap(input: BoxedExecutor) -> (SubtaskHandle, SubtaskRxExecutor) { let handle = async move { let mut input = input.execute(); + while let Some(item) = input.next().await { + // Decide whether to stop the subtask. We explicitly do this instead of relying on the + // termination of the input stream, because we don't want to exhaust the stream, which + // causes the stream dropped in the scope of the current async task and blocks other + // actors. See `spawn_blocking_drop_stream` for more details. + let to_stop = match &item { + Ok(Message::Barrier(barrier)) => barrier.is_stop_or_update_drop_actor(actor_id), + Ok(_) => false, + Err(_) => true, + }; + // It's possible that the downstream itself yields an error (e.g. from remote input) and // finishes, so we may fail to send the message. In this case, we can simply ignore the // send error and exit as well. If the message itself is another error, log it. @@ -85,7 +97,12 @@ pub fn wrap(input: BoxedExecutor) -> (SubtaskHandle, SubtaskRxExecutor) { } break; } + + if to_stop { + break; + } } + spawn_blocking_drop_stream(input).await; } .instrument_await("Subtask"); diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index c9e26b70b4e6..3edac0199086 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -310,18 +310,19 @@ impl LocalStreamManager { Ok(()) } - pub async fn drop_actor(&self, actors: &[ActorId]) -> StreamResult<()> { + /// Drop the resources of the given actors. + pub async fn drop_actors(&self, actors: &[ActorId]) -> StreamResult<()> { let mut core = self.core.lock().await; - for id in actors { - core.drop_actor(*id); + for &id in actors { + core.drop_actor(id); } tracing::debug!(actors = ?actors, "drop actors"); Ok(()) } - /// Force stop all actors on this worker. + /// Force stop all actors on this worker, and then drop their resources. pub async fn stop_all_actors(&self) -> StreamResult<()> { - self.core.lock().await.drop_all_actors().await; + self.core.lock().await.stop_all_actors().await; // Clear shared buffer in storage to release memory self.clear_storage_buffer().await; self.clear_all_senders_and_collect_rx(); @@ -557,7 +558,7 @@ impl LocalStreamManagerCore { // If there're multiple stateful executors in this actor, we will wrap it into a subtask. let executor = if has_stateful && is_stateful { - let (subtask, executor) = subtask::wrap(executor); + let (subtask, executor) = subtask::wrap(executor, actor_context.id); subtasks.push(subtask); executor.boxed() } else { @@ -781,14 +782,16 @@ impl LocalStreamManagerCore { .inspect(|handle| handle.abort()); self.context.actor_infos.write().remove(&actor_id); self.actors.remove(&actor_id); - // Task should have already stopped when this method is invoked. - self.handles - .remove(&actor_id) - .inspect(|handle| handle.abort()); + + // Task should have already stopped when this method is invoked. There might be some + // clean-up work left (like dropping in-memory data structures), but we don't have to wait + // for them to finish, in order to make this request non-blocking. + self.handles.remove(&actor_id); } - /// `drop_all_actors` is invoked by meta node via RPC for recovery purpose. - async fn drop_all_actors(&mut self) { + /// `stop_all_actors` is invoked by meta node via RPC for recovery purpose. Different from the + /// `drop_actor`, the execution of the actors will be aborted. + async fn stop_all_actors(&mut self) { for (actor_id, handle) in &self.handles { tracing::debug!("force stopping actor {}", actor_id); handle.abort();