From e971f6957f814dd09054680089135380be262e8d Mon Sep 17 00:00:00 2001
From: discord9
Date: Tue, 14 May 2024 17:56:19 +0800
Subject: [PATCH] refactor: error handling

---
 Cargo.lock                     |  13 ++
 src/flow/Cargo.toml            |   1 +
 src/flow/src/adapter.rs        |   7 +-
 src/flow/src/adapter/error.rs  |  29 +++-
 src/flow/src/adapter/worker.rs | 251 ++++++++++++++++-----------------
 5 files changed, 165 insertions(+), 136 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0b183afc2600..de280f7f2004 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3528,6 +3528,18 @@ version = "0.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
 
+[[package]]
+name = "enum-as-inner"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ffccbb6966c05b32ef8fbac435df276c4ae4d3dc55a8cd0eb9745e6c12f546a"
+dependencies = [
+ "heck 0.4.1",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.61",
+]
+
 [[package]]
 name = "enum-iterator"
 version = "1.5.0"
@@ -3816,6 +3828,7 @@ dependencies = [
  "datafusion-common 37.0.0",
  "datafusion-expr 37.0.0",
  "datatypes",
+ "enum-as-inner",
  "enum_dispatch",
  "futures",
  "greptime-proto",
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index 510c92d6e8cf..8f9c2ba8af97 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -27,6 +27,7 @@ futures = "0.3"
 # it is the same with upstream repo
 async-trait.workspace = true
 common-meta.workspace = true
+enum-as-inner = "0.6.0"
 greptime-proto.workspace = true
 hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
 itertools.workspace = true
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index aef545438918..8179ca5807f9 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -18,13 +18,8 @@
 pub(crate) mod error;
 pub(crate) mod node_context;
 
-pub(crate) use node_context::FlownodeContext;
+pub(crate) use node_context::{FlowId, FlownodeContext, TableName};
 
 mod worker;
 
 pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
-
-// TODO: refactor common types for flow to a separate module
-/// FlowId is a unique identifier for a flow task
-pub type FlowId = u64;
-pub type TableName = [String; 3];
diff --git a/src/flow/src/adapter/error.rs b/src/flow/src/adapter/error.rs
index 3cc74b900d41..2406dc5ea79d 100644
--- a/src/flow/src/adapter/error.rs
+++ b/src/flow/src/adapter/error.rs
@@ -24,6 +24,7 @@ use datatypes::value::Value;
 use servers::define_into_tonic_status;
 use snafu::{Location, Snafu};
 
+use crate::adapter::FlowId;
 use crate::expr::EvalError;
 
 /// This error is used to represent all possible errors that can occur in the flow module.
@@ -39,7 +40,11 @@ pub enum Error {
     },
 
     #[snafu(display("Internal error"))]
-    Internal { location: Location, reason: String },
+    Internal {
+        reason: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 
     /// TODO(discord9): add detailed location of column
     #[snafu(display("Failed to eval stream"))]
@@ -71,6 +76,20 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Flow not found, id={id}"))]
+    FlowNotFound {
+        id: FlowId,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Flow already exists, id={id}"))]
+    FlowAlreadyExist {
+        id: FlowId,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Failed to join task"))]
     JoinTask {
         #[snafu(source)]
@@ -168,10 +187,12 @@ impl ErrorExt for Error {
             Self::Eval { .. } | &Self::JoinTask { .. } | &Self::Datafusion { .. } => {
                 StatusCode::Internal
             }
-            &Self::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
-            Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } => {
-                StatusCode::TableNotFound
+            &Self::TableAlreadyExist { .. } | Self::FlowAlreadyExist { .. } => {
+                StatusCode::TableAlreadyExists
             }
+            Self::TableNotFound { .. }
+            | Self::TableNotFoundMeta { .. }
+            | Self::FlowNotFound { .. } => StatusCode::TableNotFound,
             Self::InvalidQueryPlan { .. }
             | Self::InvalidQuerySubstrait { .. }
             | Self::InvalidQueryProst { .. }
diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs
index ca5f06a98dba..8e7a78146783 100644
--- a/src/flow/src/adapter/worker.rs
+++ b/src/flow/src/adapter/worker.rs
@@ -15,16 +15,17 @@
 //! For single-thread flow worker
 
 use std::collections::{BTreeMap, VecDeque};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
+use enum_as_inner::EnumAsInner;
 use hydroflow::scheduled::graph::Hydroflow;
-use snafu::ResultExt;
+use snafu::{ensure, ResultExt};
 use tokio::sync::{broadcast, mpsc, Mutex};
 
-use crate::adapter::error::{Error, EvalSnafu};
+use crate::adapter::error::{Error, EvalSnafu, FlowAlreadyExistSnafu, InternalSnafu};
 use crate::adapter::FlowId;
 use crate::compute::{Context, DataflowState, ErrCollector};
-use crate::expr::error::InternalSnafu;
 use crate::expr::GlobalId;
 use crate::plan::TypedPlan;
 use crate::repr::{self, DiffRow};
@@ -45,7 +46,6 @@ pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
 }
 
 /// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
-
 pub(crate) struct ActiveDataflowState<'subgraph> {
     df: Hydroflow<'subgraph>,
     state: DataflowState,
@@ -109,50 +109,35 @@ impl WorkerHandle {
     /// create task, return task id
     ///
     #[allow(clippy::too_many_arguments)]
-    pub async fn create_flow(
-        &self,
-        task_id: FlowId,
-        plan: TypedPlan,
-        sink_id: GlobalId,
-        sink_sender: mpsc::UnboundedSender<DiffRow>,
-        source_ids: &[GlobalId],
-        src_recvs: Vec<broadcast::Receiver<DiffRow>>,
-        expire_when: Option<repr::Timestamp>,
-        create_if_not_exist: bool,
-        err_collector: ErrCollector,
-    ) -> Result<Option<FlowId>, Error> {
-        let req = Request::Create {
-            task_id,
-            plan,
-            sink_id,
-            sink_sender,
-            source_ids: source_ids.to_vec(),
-            src_recvs,
-            expire_when,
-            create_if_not_exist,
-            err_collector,
-        };
+    pub async fn create_flow(&self, create_reqs: Request) -> Result<Option<FlowId>, Error> {
+        if !matches!(create_reqs, Request::Create { .. }) {
+            return InternalSnafu {
+                reason: format!(
+                    "Flow Node/Worker itc failed, expect Request::Create, found {create_reqs:?}"
+                ),
+            }
+            .fail();
+        }
 
-        let ret = self.itc_client.lock().await.call_blocking(req).await?;
-        if let Response::Create {
-            result: task_create_result,
-        } = ret
-        {
-            task_create_result
-        } else {
+        let ret = self
+            .itc_client
+            .lock()
+            .await
+            .call_blocking(create_reqs)
+            .await?;
+        ret.into_create().map_err(|ret| {
             InternalSnafu {
                 reason: format!(
                     "Flow Node/Worker itc failed, expect Response::Create, found {ret:?}"
                 ),
             }
-            .fail()
-            .with_context(|_| EvalSnafu {})
-        }
+            .build()
+        })?
     }
 
     /// remove task, return task id
-    pub async fn remove_flow(&self, task_id: FlowId) -> Result<bool, Error> {
-        let req = Request::Remove { task_id };
+    pub async fn remove_flow(&self, flow_id: FlowId) -> Result<bool, Error> {
+        let req = Request::Remove { flow_id };
         let ret = self.itc_client.lock().await.call_blocking(req).await?;
         if let Response::Remove { result } = ret {
             Ok(result)
@@ -161,30 +146,23 @@ impl WorkerHandle {
                 reason: format!("Flow Node/Worker failed, expect Response::Remove, found {ret:?}"),
             }
             .fail()
-            .with_context(|_| EvalSnafu {})
         }
     }
 
     /// trigger running the worker, will not block, and will run the worker parallelly
     ///
     /// will set the current timestamp to `now` for all dataflows before running them
-    pub async fn run_available(&self, now: repr::Timestamp) {
+    pub async fn run_available(&self, now: repr::Timestamp) -> Result<(), Error> {
         self.itc_client
             .lock()
             .await
             .call_non_blocking(Request::RunAvail { now })
-            .await;
+            .await
     }
 
-    pub async fn contains_flow(&self, task_id: FlowId) -> Result<bool, Error> {
-        let req = Request::ContainTask { task_id };
-        let ret = self
-            .itc_client
-            .lock()
-            .await
-            .call_blocking(req)
-            .await
-            .unwrap();
+    pub async fn contains_flow(&self, flow_id: FlowId) -> Result<bool, Error> {
+        let req = Request::ContainTask { flow_id };
+        let ret = self.itc_client.lock().await.call_blocking(req).await?;
         if let Response::ContainTask {
             result: task_contain_result,
         } = ret
         {
@@ -197,17 +175,16 @@ impl WorkerHandle {
                 ),
             }
             .fail()
-            .with_context(|_| EvalSnafu {})
         }
     }
 
     /// shutdown the worker
-    pub async fn shutdown(&self) {
+    pub async fn shutdown(&self) -> Result<(), Error> {
         self.itc_client
             .lock()
             .await
             .call_non_blocking(Request::Shutdown)
-            .await;
+            .await
     }
 }
@@ -223,7 +200,7 @@ impl<'s> Worker<'s> {
     #[allow(clippy::too_many_arguments)]
     pub fn create_flow(
         &mut self,
-        task_id: FlowId,
+        flow_id: FlowId,
         plan: TypedPlan,
         sink_id: GlobalId,
         sink_sender: mpsc::UnboundedSender<DiffRow>,
@@ -235,12 +212,12 @@ impl<'s> Worker<'s> {
         err_collector: ErrCollector,
     ) -> Result<Option<FlowId>, Error> {
         let _ = expire_when;
-        if create_if_not_exist {
-            // check if the task already exists
-            if self.task_states.contains_key(&task_id) {
-                return Ok(None);
-            }
-        }
+        let already_exist = self.task_states.contains_key(&flow_id);
+        match (already_exist, create_if_not_exist) {
+            (true, true) => return Ok(None),
+            (true, false) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
+            (false, _) => (),
+        };
 
         let mut cur_task_state = ActiveDataflowState::<'s> {
             err_collector,
@@ -257,24 +234,37 @@ impl<'s> Worker<'s> {
             let rendered = ctx.render_plan(plan.plan)?;
             ctx.render_unbounded_sink(rendered, sink_sender);
         }
-        self.task_states.insert(task_id, cur_task_state);
-        Ok(Some(task_id))
+        self.task_states.insert(flow_id, cur_task_state);
+        Ok(Some(flow_id))
     }
 
     /// remove task, return true if a task is removed
-    pub fn remove_flow(&mut self, task_id: FlowId) -> bool {
-        self.task_states.remove(&task_id).is_some()
+    pub fn remove_flow(&mut self, flow_id: FlowId) -> bool {
+        self.task_states.remove(&flow_id).is_some()
     }
 
     /// Run the worker, blocking, until shutdown signal is received
     pub fn run(&mut self) {
         loop {
-            let (req_id, req) = self.itc_server.blocking_lock().blocking_recv().unwrap();
+            let (req_id, req) = if let Some(ret) = self.itc_server.blocking_lock().blocking_recv() {
+                ret
+            } else {
+                common_telemetry::error!(
+                    "Worker's itc server has been closed unexpectedly, shutting down worker now."
+                );
+                break;
+            };
             let ret = self.handle_req(req_id, req);
             match ret {
                 Ok(Some((id, resp))) => {
-                    self.itc_server.blocking_lock().resp(id, resp);
+                    if let Err(err) = self.itc_server.blocking_lock().resp(id, resp) {
+                        common_telemetry::error!(
+                            "Worker's itc server has been closed unexpectedly, shutting down worker: {}",
+                            err
+                        );
+                        break;
+                    };
                 }
                 Ok(None) => continue,
                 Err(()) => {
@@ -287,16 +277,18 @@ impl<'s> Worker<'s> {
     /// run with tick acquired from tick manager(usually means system time)
     /// TODO(discord9): better tick management
     pub fn run_tick(&mut self, now: repr::Timestamp) {
-        for (_task_id, task_state) in self.task_states.iter_mut() {
+        for (_flow_id, task_state) in self.task_states.iter_mut() {
             task_state.set_current_ts(now);
             task_state.run_available();
         }
     }
     /// handle request, return response if any, Err if receive shutdown signal
+    ///
+    /// return `Err(())` if receive shutdown request
     fn handle_req(&mut self, req_id: usize, req: Request) -> Result<Option<(usize, Response)>, ()> {
         let ret = match req {
             Request::Create {
-                task_id,
+                flow_id,
                 plan,
                 sink_id,
                 sink_sender,
@@ -307,7 +299,7 @@ impl<'s> Worker<'s> {
                 err_collector,
             } => {
                 let task_create_result = self.create_flow(
-                    task_id,
+                    flow_id,
                     plan,
                     sink_id,
                     sink_sender,
@@ -324,16 +316,16 @@ impl<'s> Worker<'s> {
                     },
                 ))
             }
-            Request::Remove { task_id } => {
-                let ret = self.remove_flow(task_id);
+            Request::Remove { flow_id } => {
+                let ret = self.remove_flow(flow_id);
                 Some((req_id, Response::Remove { result: ret }))
             }
             Request::RunAvail { now } => {
                 self.run_tick(now);
                 None
             }
-            Request::ContainTask { task_id } => {
-                let ret = self.task_states.contains_key(&task_id);
+            Request::ContainTask { flow_id } => {
+                let ret = self.task_states.contains_key(&flow_id);
                 Some((req_id, Response::ContainTask { result: ret }))
             }
             Request::Shutdown => return Err(()),
@@ -342,10 +334,10 @@ impl<'s> Worker<'s> {
     }
 }
 
-#[derive(Debug)]
-enum Request {
+#[derive(Debug, EnumAsInner)]
+pub enum Request {
     Create {
-        task_id: FlowId,
+        flow_id: FlowId,
         plan: TypedPlan,
         sink_id: GlobalId,
         sink_sender: mpsc::UnboundedSender<DiffRow>,
@@ -356,19 +348,19 @@
         err_collector: ErrCollector,
     },
     Remove {
-        task_id: FlowId,
+        flow_id: FlowId,
     },
     /// Trigger the worker to run, useful after input buffer is full
     RunAvail {
         now: repr::Timestamp,
     },
     ContainTask {
-        task_id: FlowId,
+        flow_id: FlowId,
     },
     Shutdown,
 }
 
-#[derive(Debug)]
+#[derive(Debug, EnumAsInner)]
 enum Response {
     Create {
         result: Result<Option<FlowId>, Error>,
     },
     Remove {
         result: bool,
     },
     ContainTask {
         result: bool,
     },
 }
 
 fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) {
     let (arg_send, arg_recv) = mpsc::unbounded_channel();
     let (ret_send, ret_recv) = mpsc::unbounded_channel();
     let client = InterThreadCallClient {
-        call_id: Arc::new(Mutex::new(0)),
+        call_id: AtomicUsize::new(0),
         arg_sender: arg_send,
         ret_recv,
     };
@@ -396,49 +388,49 @@ fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer)
     };
     (client, server)
 }
-
+type ReqId = usize;
 #[derive(Debug)]
 struct InterThreadCallClient {
-    call_id: Arc<Mutex<usize>>,
-    arg_sender: mpsc::UnboundedSender<(usize, Request)>,
-    ret_recv: mpsc::UnboundedReceiver<(usize, Response)>,
+    call_id: AtomicUsize,
+    arg_sender: mpsc::UnboundedSender<(ReqId, Request)>,
+    ret_recv: mpsc::UnboundedReceiver<(ReqId, Response)>,
 }
 
 impl InterThreadCallClient {
     /// call without expecting responses or blocking
-    async fn call_non_blocking(&self, req: Request) {
-        let call_id = {
-            let mut call_id = self.call_id.lock().await;
-            *call_id += 1;
-            *call_id
-        };
-        self.arg_sender.send((call_id, req)).unwrap();
+    async fn call_non_blocking(&self, req: Request) -> Result<(), Error> {
+        // TODO(discord9): relax memory order later
+        let call_id = self.call_id.fetch_add(1, Ordering::SeqCst);
+        self.arg_sender
+            .send((call_id, req))
+            .map_err(from_send_error)
     }
 
     /// call blocking, and return the result
     async fn call_blocking(&mut self, req: Request) -> Result<Response, Error> {
-        let call_id = {
-            let mut call_id = self.call_id.lock().await;
-            *call_id += 1;
-            *call_id
-        };
-        self.arg_sender.send((call_id, req)).unwrap();
-        // TODO(discord9): better inter thread call impl
-        let (ret_call_id, ret) = self.ret_recv.recv().await.unwrap();
-        if ret_call_id != call_id {
-            return InternalSnafu {
+        // TODO(discord9): relax memory order later
+        let call_id = self.call_id.fetch_add(1, Ordering::SeqCst);
+        self.arg_sender
+            .send((call_id, req))
+            .map_err(from_send_error)?;
+        // TODO(discord9): better inter thread call impl, i.e. support multiple clients (also consider if it's necessary),
+        // since one node manager might manage multiple workers, but one worker should only belong to one node manager
+        let (ret_call_id, ret) = self.ret_recv.recv().await.ok_or_else(|| InternalSnafu {
+            reason: "InterThreadCallClient call_blocking failed, ret_recv has been closed and there are no remaining messages in the channel's buffer",
+        }.build())?;
+        ensure!(
+            ret_call_id == call_id,
+            InternalSnafu {
                 reason: "call id mismatch, worker/worker handler should be in sync",
             }
-            .fail()
-            .with_context(|_| EvalSnafu {});
-        }
+        );
         Ok(ret)
     }
 }
 
 #[derive(Debug)]
 struct InterThreadCallServer {
-    pub arg_recv: mpsc::UnboundedReceiver<(usize, Request)>,
-    pub ret_sender: mpsc::UnboundedSender<(usize, Response)>,
+    pub arg_recv: mpsc::UnboundedReceiver<(ReqId, Request)>,
+    pub ret_sender: mpsc::UnboundedSender<(ReqId, Response)>,
 }
 
 impl InterThreadCallServer {
@@ -451,9 +443,18 @@ impl InterThreadCallServer {
     }
 
     /// Send response back to the client
-    pub fn resp(&self, call_id: usize, resp: Response) {
-        self.ret_sender.send((call_id, resp)).unwrap();
+    pub fn resp(&self, call_id: usize, resp: Response) -> Result<(), Error> {
+        self.ret_sender
+            .send((call_id, resp))
+            .map_err(from_send_error)
+    }
+}
+
+fn from_send_error<T>(err: mpsc::error::SendError<T>) -> Error {
+    InternalSnafu {
+        reason: format!("InterThreadCallServer resp failed: {}", err),
     }
+    .build()
 }
 
 #[cfg(test)]
@@ -476,7 +477,7 @@ mod test {
         let src_ids = vec![GlobalId::User(1)];
         let (tx, rx) = broadcast::channel::<DiffRow>(1024);
         let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<DiffRow>();
-        let (task_id, plan) = (
+        let (flow_id, plan) = (
             1,
             TypedPlan {
                 plan: Plan::Get {
@@ -485,24 +486,22 @@ mod test {
                 typ: RelationType::new(vec![]),
             },
         );
-        handle
-            .create_flow(
-                task_id,
-                plan,
-                GlobalId::User(1),
-                sink_tx,
-                &src_ids,
-                vec![rx],
-                None,
-                true,
-                ErrCollector::default(),
-            )
-            .await
-            .unwrap();
+        let create_reqs = Request::Create {
+            flow_id,
+            plan,
+            sink_id: GlobalId::User(1),
+            sink_sender: sink_tx,
+            source_ids: src_ids,
+            src_recvs: vec![rx],
+            expire_when: None,
+            create_if_not_exist: true,
+            err_collector: ErrCollector::default(),
+        };
+        handle.create_flow(create_reqs).await.unwrap();
         tx.send((Row::empty(), 0, 0)).unwrap();
-        handle.run_available(0).await;
+        handle.run_available(0).await.unwrap();
         assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty());
-        handle.shutdown().await;
+        handle.shutdown().await.unwrap();
         worker_thread_handle.join().unwrap();
     }
 }
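
Note: the refactor above leans on two small patterns: the `#[derive(EnumAsInner)]` derive, whose generated `into_*` accessors replace the hand-written `if let Response::Create { .. }` blocks, and an `AtomicUsize` call id that pairs each request with its response without taking a `Mutex`. Below is a minimal, self-contained sketch of the derive's behavior; the types here are illustrative stand-ins, not the flow crate's own.

    use enum_as_inner::EnumAsInner;

    #[derive(Debug, EnumAsInner)]
    enum Response {
        Create { result: Option<u64> },
        Remove { result: bool },
    }

    fn expect_create(ret: Response) -> Result<Option<u64>, String> {
        // For a single-field variant, the derived `into_create` returns
        // Ok(field) when the variant matches and Err(self) otherwise, so a
        // mismatched response can still be formatted into the error message.
        ret.into_create()
            .map_err(|ret| format!("expected Response::Create, found {ret:?}"))
    }

    fn main() {
        assert_eq!(
            expect_create(Response::Create { result: Some(1) }).unwrap(),
            Some(1)
        );
        assert!(expect_create(Response::Remove { result: true }).is_err());
    }

Returning `Err(self)` rather than discarding the value is what lets `WorkerHandle::create_flow` embed the unexpected response in its "expect Response::Create, found {ret:?}" error text.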