Commit 16c9708
fix(streaming): drop subtask on another blocking thread (risingwavelabs#8672)

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
BugenZhao authored Mar 21, 2023
1 parent 83057e5 commit 16c9708
Showing 3 changed files with 35 additions and 15 deletions.
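
The gist of the change: if a subtask's input stream is dropped inside the actor's async task, the (possibly expensive) deallocation of executor state stalls every other actor sharing that thread, so the drop is shipped to a blocking thread instead. A minimal sketch of that technique, assuming only Tokio's blocking pool (the helper name and signature here are illustrative; the real `spawn_blocking_drop_stream` lives in `super::actor` and is not part of this diff):

    use tokio::task::spawn_blocking;

    /// Drop a value on a dedicated blocking thread, so that an expensive
    /// deallocation does not stall the async worker driving other actors.
    async fn drop_stream_on_blocking_thread<T: Send + 'static>(stream: T) {
        // `spawn_blocking` moves `stream` to Tokio's blocking pool; awaiting
        // the returned handle only waits for the drop itself to finish.
        let _ = spawn_blocking(move || drop(stream)).await;
    }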
src/compute/src/rpc/service/stream_service.rs (2 changes: 1 addition & 1 deletion)

@@ -108,7 +108,7 @@ impl StreamService for StreamServiceImpl {
     ) -> std::result::Result<Response<DropActorsResponse>, Status> {
         let req = request.into_inner();
         let actors = req.actor_ids;
-        self.mgr.drop_actor(&actors).await?;
+        self.mgr.drop_actors(&actors).await?;
         Ok(Response::new(DropActorsResponse {
             request_id: req.request_id,
             status: None,
src/stream/src/executor/subtask.rs (21 changes: 19 additions & 2 deletions)

@@ -19,7 +19,8 @@ use tokio::sync::mpsc::error::SendError;
 use tokio_stream::wrappers::ReceiverStream;
 
 use super::actor::spawn_blocking_drop_stream;
-use super::{BoxedExecutor, Executor, ExecutorInfo, MessageStreamItem};
+use super::{BoxedExecutor, Executor, ExecutorInfo, Message, MessageStreamItem};
+use crate::task::ActorId;
 
 /// Handle used to drive the subtask.
 pub type SubtaskHandle = impl Future<Output = ()> + Send + 'static;
@@ -60,7 +61,7 @@ impl Executor for SubtaskRxExecutor {
 /// Used when there're multiple stateful executors in an actor. These subtasks can be concurrently
 /// executed to improve the I/O performance, while the computing resource can be still bounded to a
 /// single thread.
-pub fn wrap(input: BoxedExecutor) -> (SubtaskHandle, SubtaskRxExecutor) {
+pub fn wrap(input: BoxedExecutor, actor_id: ActorId) -> (SubtaskHandle, SubtaskRxExecutor) {
     let (tx, rx) = mpsc::channel(1);
     let rx_executor = SubtaskRxExecutor {
         info: ExecutorInfo {
@@ -72,7 +73,18 @@ pub fn wrap(input: BoxedExecutor) -> (SubtaskHandle, SubtaskRxExecutor) {

let handle = async move {
let mut input = input.execute();

while let Some(item) = input.next().await {
// Decide whether to stop the subtask. We explicitly do this instead of relying on the
// termination of the input stream, because we don't want to exhaust the stream, which
// causes the stream dropped in the scope of the current async task and blocks other
// actors. See `spawn_blocking_drop_stream` for more details.
let to_stop = match &item {
Ok(Message::Barrier(barrier)) => barrier.is_stop_or_update_drop_actor(actor_id),
Ok(_) => false,
Err(_) => true,
};

// It's possible that the downstream itself yields an error (e.g. from remote input) and
// finishes, so we may fail to send the message. In this case, we can simply ignore the
// send error and exit as well. If the message itself is another error, log it.
@@ -85,7 +97,12 @@ pub fn wrap(input: BoxedExecutor) -> (SubtaskHandle, SubtaskRxExecutor) {
                 }
                 break;
             }
+
+            if to_stop {
+                break;
+            }
         }
 
+        spawn_blocking_drop_stream(input).await;
     }
     .instrument_await("Subtask");
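
For intuition about the forwarding loop above, a self-contained toy model assuming only Tokio's `mpsc` (the item type and output are illustrative): the bounded channel of capacity 1 keeps at most one message in flight, and a failed `send` simply means the receiving executor has already gone away.

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // Capacity 1, like the subtask channel: at most one in-flight message.
        let (tx, mut rx) = mpsc::channel::<i32>(1);

        let producer = tokio::spawn(async move {
            for item in 0..3 {
                // If the receiver has been dropped, `send` fails; mirror the
                // real loop by ignoring the error and exiting.
                if tx.send(item).await.is_err() {
                    break;
                }
            }
        });

        while let Some(item) = rx.recv().await {
            println!("forwarded {item}");
        }
        producer.await.unwrap();
    }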
src/stream/src/task/stream_manager.rs (27 changes: 15 additions & 12 deletions)

@@ -310,18 +310,19 @@ impl LocalStreamManager {
         Ok(())
     }
 
-    pub async fn drop_actor(&self, actors: &[ActorId]) -> StreamResult<()> {
+    /// Drop the resources of the given actors.
+    pub async fn drop_actors(&self, actors: &[ActorId]) -> StreamResult<()> {
         let mut core = self.core.lock().await;
-        for id in actors {
-            core.drop_actor(*id);
+        for &id in actors {
+            core.drop_actor(id);
         }
         tracing::debug!(actors = ?actors, "drop actors");
         Ok(())
     }
 
-    /// Force stop all actors on this worker.
+    /// Force stop all actors on this worker, and then drop their resources.
     pub async fn stop_all_actors(&self) -> StreamResult<()> {
-        self.core.lock().await.drop_all_actors().await;
+        self.core.lock().await.stop_all_actors().await;
         // Clear shared buffer in storage to release memory
         self.clear_storage_buffer().await;
         self.clear_all_senders_and_collect_rx();
@@ -557,7 +558,7 @@ impl LocalStreamManagerCore {

         // If there're multiple stateful executors in this actor, we will wrap it into a subtask.
         let executor = if has_stateful && is_stateful {
-            let (subtask, executor) = subtask::wrap(executor);
+            let (subtask, executor) = subtask::wrap(executor, actor_context.id);
             subtasks.push(subtask);
             executor.boxed()
         } else {
@@ -781,14 +782,16 @@
             .inspect(|handle| handle.abort());
         self.context.actor_infos.write().remove(&actor_id);
         self.actors.remove(&actor_id);
-        // Task should have already stopped when this method is invoked.
-        self.handles
-            .remove(&actor_id)
-            .inspect(|handle| handle.abort());
+
+        // Task should have already stopped when this method is invoked. There might be some
+        // clean-up work left (like dropping in-memory data structures), but we don't have to wait
+        // for them to finish, in order to make this request non-blocking.
+        self.handles.remove(&actor_id);
     }

-    /// `drop_all_actors` is invoked by meta node via RPC for recovery purpose.
-    async fn drop_all_actors(&mut self) {
+    /// `stop_all_actors` is invoked by meta node via RPC for recovery purpose. Different from the
+    /// `drop_actor`, the execution of the actors will be aborted.
+    async fn stop_all_actors(&mut self) {
         for (actor_id, handle) in &self.handles {
             tracing::debug!("force stopping actor {}", actor_id);
             handle.abort();
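
To illustrate the distinction the renames encode, a toy sketch assuming only Tokio (the `Core` type and ids are hypothetical, not the crate's real types): `drop_actor` merely detaches the handle of a task that is expected to have stopped already, while `stop_all_actors` aborts tasks that may still be running.

    use std::collections::HashMap;
    use tokio::task::JoinHandle;

    struct Core {
        handles: HashMap<u32, JoinHandle<()>>,
    }

    impl Core {
        /// Non-blocking cleanup: the task is expected to have stopped already,
        /// so dropping the handle just detaches any remaining teardown work.
        fn drop_actor(&mut self, actor_id: u32) {
            self.handles.remove(&actor_id);
        }

        /// Forced stop: abort every task, then await the handles so the worker
        /// is known to be quiescent before recovery proceeds.
        async fn stop_all_actors(&mut self) {
            for (actor_id, handle) in &self.handles {
                println!("force stopping actor {actor_id}");
                handle.abort();
            }
            for (_, handle) in self.handles.drain() {
                // An aborted task yields a cancellation `JoinError`; ignore it.
                let _ = handle.await;
            }
        }
    }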
