diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 85ddd317f96d..69795d7000af 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -315,11 +315,11 @@ impl Cluster { pub fn start_session(&mut self) -> Session { let (query_tx, mut query_rx) = mpsc::channel::(0); - let join_handle = self.client.spawn(async move { - let mut session = RisingWave::connect("frontend".into(), "dev".into()).await?; + self.client.spawn(async move { + let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?; while let Some((sql, tx)) = query_rx.next().await { - let result = session + let result = client .run(&sql) .await .map(|output| match output { @@ -343,10 +343,7 @@ impl Cluster { Ok::<_, anyhow::Error>(()) }); - Session { - query_tx, - join_handle: Arc::new(join_handle), - } + Session { query_tx } } /// Run a SQL query on a **new** session of the client node. @@ -544,15 +541,11 @@ type SessionRequest = ( #[derive(Debug, Clone)] pub struct Session { query_tx: mpsc::Sender, - join_handle: Arc>>, } impl Session { /// Run the given SQL query on the session. pub async fn run(&mut self, sql: impl Into) -> Result { - if self.join_handle.is_finished() { - bail!("session is finished"); - } let (tx, rx) = oneshot::channel(); self.query_tx.send((sql.into(), tx)).await?; rx.await?