Skip to content

Commit

Permalink
chore: Remove unnecessary spawn (risingwavelabs#8722)
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 authored Mar 23, 2023
1 parent a1d084d commit 1b008f4
Showing 1 changed file with 8 additions and 10 deletions.
18 changes: 8 additions & 10 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@

use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use minitrace::prelude::*;
use parking_lot::Mutex;
use risingwave_common::array::DataChunk;
Expand Down Expand Up @@ -370,7 +371,6 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {

// Clone `self` to make compiler happy because of the move block.
let t_1 = self.clone();
let t_2 = self.clone();
// Spawn task for real execution.
let fut = async move {
trace!("Executing plan [{:?}]", task_id);
Expand All @@ -394,9 +394,9 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {

if let Some(task_metrics) = task_metrics {
let monitor = TaskMonitor::new();
let join_handle = t_2.runtime.spawn(monitor.instrument(task(task_id.clone())));
if let Err(join_error) = join_handle.await && join_error.is_panic() {
error!("Batch task {:?} panic!", task_id);
let instrumented_task = AssertUnwindSafe(monitor.instrument(task(task_id.clone())));
if let Err(error) = instrumented_task.catch_unwind().await {
error!("Batch task {:?} panic: {:?}", task_id, error);
}
let cumulative = monitor.cumulative();
let labels = &task_metrics.task_labels();
Expand Down Expand Up @@ -425,11 +425,9 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
.task_slow_poll_duration
.with_label_values(labels)
.set(cumulative.total_slow_poll_duration.as_secs_f64());
} else {
let join_handle = t_2.runtime.spawn(task(task_id.clone()));
if let Err(join_error) = join_handle.await && join_error.is_panic() {
error!("Batch task {:?} panic!", task_id);
}
} else if let Err(error) = AssertUnwindSafe(task(task_id.clone())).catch_unwind().await
{
error!("Batch task {:?} panic: {:?}", task_id, error);
}
};

Expand Down

0 comments on commit 1b008f4

Please sign in to comment.