diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs
index 44fae286815f..33db460271be 100644
--- a/src/common/meta/src/key/flow.rs
+++ b/src/common/meta/src/key/flow.rs
@@ -26,10 +26,10 @@ use snafu::{ensure, OptionExt};
 use self::flow_info::FlowInfoValue;
 use crate::ensure_values;
 use crate::error::{self, Result};
-use crate::key::flow::flow_info::FlowInfoManager;
-use crate::key::flow::flow_name::FlowNameManager;
-use crate::key::flow::flownode_flow::FlownodeFlowManager;
-use crate::key::flow::table_flow::TableFlowManager;
+use crate::key::flow::flow_info::{FlowInfoManager, FlowInfoManagerRef};
+use crate::key::flow::flow_name::{FlowNameManager, FlowNameManagerRef};
+use crate::key::flow::flownode_flow::{FlownodeFlowManager, FlownodeFlowManagerRef};
+pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
 use crate::key::txn_helper::TxnOpGetResponseSet;
 use crate::key::{FlowId, MetaKey};
 use crate::kv_backend::txn::Txn;
@@ -306,7 +306,7 @@ mod tests {
         for table_id in [1024, 1025, 1026] {
             let nodes = flow_metadata_manager
                 .table_flow_manager()
-                .nodes(table_id)
+                .flows(table_id)
                 .try_collect::<Vec<_>>()
                 .await
                 .unwrap();
diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs
index 20d0e4598780..ffa0cbad273c 100644
--- a/src/common/meta/src/key/flow/flow_info.rs
+++ b/src/common/meta/src/key/flow/flow_info.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::collections::{BTreeMap, HashMap};
+use std::sync::Arc;
 
 use lazy_static::lazy_static;
 use regex::Regex;
@@ -143,6 +144,8 @@ impl FlowInfoValue {
     }
 }
 
+pub type FlowInfoManagerRef = Arc<FlowInfoManager>;
+
 /// The manager of [FlowInfoKey].
 pub struct FlowInfoManager {
     kv_backend: KvBackendRef,
diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs
index 24608498c4f1..311c187dd61e 100644
--- a/src/common/meta/src/key/flow/flow_name.rs
+++ b/src/common/meta/src/key/flow/flow_name.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use api::v1::flow::flow_server::Flow;
 use lazy_static::lazy_static;
 use regex::Regex;
@@ -141,6 +143,8 @@ impl FlowNameValue {
     }
 }
 
+pub type FlowNameManagerRef = Arc<FlowNameManager>;
+
 /// The manager of [FlowNameKey].
 pub struct FlowNameManager {
     kv_backend: KvBackendRef,
diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs
index cffafaa870c7..8bc33c3ef965 100644
--- a/src/common/meta/src/key/flow/flownode_flow.rs
+++ b/src/common/meta/src/key/flow/flownode_flow.rs
@@ -156,6 +156,8 @@ impl<'a> MetaKey<'a, FlownodeFlowKeyInner> for FlownodeFlowKeyInner {
     }
 }
 
+pub type FlownodeFlowManagerRef = Arc<FlownodeFlowManager>;
+
 /// The manager of [FlownodeFlowKey].
 pub struct FlownodeFlowManager {
     kv_backend: KvBackendRef,
diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs
index 9fdde5c95c8c..d9aa9cff0b56 100644
--- a/src/common/meta/src/key/flow/table_flow.rs
+++ b/src/common/meta/src/key/flow/table_flow.rs
@@ -176,6 +176,8 @@ pub fn table_flow_decoder(kv: KeyValue) -> Result<TableFlowKey> {
     TableFlowKey::from_bytes(&kv.key)
 }
 
+pub type TableFlowManagerRef = Arc<TableFlowManager>;
+
 /// The manager of [TableFlowKey].
 pub struct TableFlowManager {
     kv_backend: KvBackendRef,
@@ -188,7 +190,9 @@ impl TableFlowManager {
     }
 
     /// Retrieves all [TableFlowKey]s of the specified `table_id`.
-    pub fn nodes(&self, table_id: TableId) -> BoxStream<'static, Result<TableFlowKey>> {
+    ///
+    /// TODO(discord9): add a cache for this, since range requests do not support caching.
+    pub fn flows(&self, table_id: TableId) -> BoxStream<'static, Result<TableFlowKey>> {
         let start_key = TableFlowKey::range_start_key(table_id);
         let req = RangeRequest::new().with_prefix(start_key);
         let stream = PaginationStream::new(
diff --git a/src/common/meta/src/node_manager.rs b/src/common/meta/src/node_manager.rs
index f990bef330ee..3d6bca6416b5 100644
--- a/src/common/meta/src/node_manager.rs
+++ b/src/common/meta/src/node_manager.rs
@@ -15,8 +15,8 @@
 use std::sync::Arc;
 
 use api::region::RegionResponse;
-use api::v1::flow::{FlowRequest, FlowResponse, InsertRequest};
-use api::v1::region::{QueryRequest, RegionRequest};
+use api::v1::flow::{FlowRequest, FlowResponse};
+use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
 pub use common_base::AffectedRows;
 use common_recordbatch::SendableRecordBatchStream;
 
@@ -40,7 +40,7 @@ pub type DatanodeRef = Arc<dyn Datanode>;
 pub trait Flownode: Send + Sync {
     async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
 
-    async fn handle_insert(&self, request: InsertRequest) -> Result<FlowResponse>;
+    async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
 }
 
 pub type FlownodeRef = Arc<dyn Flownode>;
diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs
index 719c5f33cc64..45a749a43635 100644
--- a/src/frontend/src/instance/builder.rs
+++ b/src/frontend/src/instance/builder.rs
@@ -18,6 +18,7 @@ use catalog::CatalogManagerRef;
 use common_base::Plugins;
 use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
 use common_meta::ddl::ProcedureExecutorRef;
+use common_meta::key::flow::TableFlowManager;
 use common_meta::key::TableMetadataManager;
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::node_manager::NodeManagerRef;
@@ -101,10 +102,13 @@ impl FrontendBuilder {
         let region_query_handler =
             FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone());
 
+        let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend.clone()));
+
         let inserter = Arc::new(Inserter::new(
             self.catalog_manager.clone(),
             partition_manager.clone(),
             node_manager.clone(),
+            table_flow_manager,
         ));
         let deleter = Arc::new(Deleter::new(
             self.catalog_manager.clone(),
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index 5b2ac304d96e..63727b663f0b 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -15,6 +15,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use api::region::RegionResponse;
 use api::v1::alter_expr::Kind;
 use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
 use api::v1::{
@@ -25,6 +26,7 @@ use catalog::CatalogManagerRef;
 use client::{OutputData, OutputMeta};
 use common_catalog::consts::default_engine;
 use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
+use common_meta::key::flow::TableFlowManagerRef;
 use common_meta::node_manager::{AffectedRows, NodeManagerRef};
 use common_meta::peer::Peer;
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
@@ -32,15 +34,17 @@ use common_query::Output;
 use common_telemetry::tracing_context::TracingContext;
 use common_telemetry::{error, info};
 use datatypes::schema::Schema;
-use futures_util::future;
+use futures_util::{future, TryStreamExt};
 use meter_macros::write_meter;
 use partition::manager::PartitionRuleManagerRef;
 use session::context::QueryContextRef;
 use snafu::prelude::*;
+use snafu::ResultExt;
 use sql::statements::insert::Insert;
 use store_api::metric_engine_consts::{
     LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
 };
+use store_api::storage::{RegionId, TableId};
 use table::requests::InsertRequest as TableInsertRequest;
 use table::table_reference::TableReference;
 use table::TableRef;
@@ -58,6 +62,7 @@ pub struct Inserter {
     catalog_manager: CatalogManagerRef,
     partition_manager: PartitionRuleManagerRef,
     node_manager: NodeManagerRef,
+    table_flow_manager: TableFlowManagerRef,
 }
 
 pub type InserterRef = Arc<Inserter>;
@@ -67,11 +72,13 @@ impl Inserter {
         catalog_manager: CatalogManagerRef,
         partition_manager: PartitionRuleManagerRef,
         node_manager: NodeManagerRef,
+        table_flow_manager: TableFlowManagerRef,
     ) -> Self {
         Self {
             catalog_manager,
             partition_manager,
             node_manager,
+            table_flow_manager,
         }
     }
 
@@ -199,13 +206,34 @@
             ..Default::default()
         });
 
+        // Spawn tasks that mirror the insert requests to the flownodes of the source tables.
+        let flow_tasks = self
+            .mirror_flow_node_requests(&requests)
+            .await?
+            .into_iter()
+            .map(|(peer, inserts)| {
+                let node_manager = self.node_manager.clone();
+                common_runtime::spawn_write(async move {
+                    node_manager
+                        .flownode(&peer)
+                        .await
+                        .handle_inserts(inserts)
+                        .await
+                        .map(|flow_response| RegionResponse {
+                            affected_rows: flow_response.affected_rows as AffectedRows,
+                            extension: flow_response.extension,
+                        })
+                        .context(RequestInsertsSnafu)
+                })
+            });
+
         let tasks = self
             .group_requests_by_peer(requests)
             .await?
             .into_iter()
             .map(|(peer, inserts)| {
-                let request = request_factory.build_insert(inserts);
                 let node_manager = self.node_manager.clone();
+                let request = request_factory.build_insert(inserts);
                 common_runtime::spawn_write(async move {
                     node_manager
                         .datanode(&peer)
@@ -214,7 +242,8 @@
                         .await
                         .context(RequestInsertsSnafu)
                 })
-            });
+            })
+            .chain(flow_tasks);
 
         let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
         let affected_rows = results
@@ -228,19 +257,93 @@
         ))
     }
 
+    /// Mirrors insert requests of flow source tables to the flownodes.
+    async fn mirror_flow_node_requests(
+        &self,
+        requests: &RegionInsertRequests,
+    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
+        // Only keep requests for tables that are used as flow source tables.
+        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
+            HashMap::new();
+        for req in &requests.requests {
+            match src_table_reqs.get_mut(&RegionId::from_u64(req.region_id).table_id()) {
+                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
+                // Already known not to be a source table.
+                Some(None) => continue,
+                _ => {
+                    let table_id = RegionId::from_u64(req.region_id).table_id();
+                    let peers = self
+                        .table_flow_manager
+                        .flows(table_id)
+                        // TODO(discord9): determine where to store the flownode address in distributed mode
+                        .map_ok(|key| Peer::new(key.flownode_id(), ""))
+                        .try_collect::<Vec<_>>()
+                        .await
+                        .map(|mut v| {
+                            v.dedup();
+                            v
+                        })
+                        .context(RequestInsertsSnafu)?;
+
+                    if !peers.is_empty() {
+                        let mut reqs = RegionInsertRequests::default();
+                        reqs.requests.push(req.clone());
+                        src_table_reqs.insert(table_id, Some((peers, reqs)));
+                    } else {
+                        // Insert an empty entry to avoid querying the same table again.
+                        src_table_reqs.insert(table_id, None);
+                    }
+                }
+            }
+        }
+
+        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
+
+        for (_table_id, (peers, reqs)) in src_table_reqs
+            .into_iter()
+            .filter_map(|(k, v)| v.map(|v| (k, v)))
+        {
+            for flownode in peers {
+                inserts
+                    .entry(flownode.clone())
+                    .or_default()
+                    .requests
+                    .extend(reqs.requests.clone());
+            }
+        }
+        Ok(inserts)
+    }
+
     async fn group_requests_by_peer(
         &self,
         requests: RegionInsertRequests,
     ) -> Result<HashMap<Peer, RegionInsertRequests>> {
-        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
+        // Group by region id first to reduce repeated calls to `find_region_leader`.
+        // TODO(discord9): determine if the additional clone is worth it.
+        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
         for req in requests.requests {
+            let region_id = RegionId::from_u64(req.region_id);
+            requests_per_region
+                .entry(region_id)
+                .or_default()
+                .requests
+                .push(req);
+        }
+
+        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
+
+        for (region_id, reqs) in requests_per_region {
             let peer = self
                 .partition_manager
-                .find_region_leader(req.region_id.into())
+                .find_region_leader(region_id)
                 .await
                 .context(FindRegionLeaderSnafu)?;
-            inserts.entry(peer).or_default().requests.push(req);
+            inserts
+                .entry(peer)
+                .or_default()
+                .requests
+                .extend(reqs.requests);
         }
 
         Ok(inserts)
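
Note: the mirroring added in `mirror_flow_node_requests` fans each source table's requests out to every flownode that hosts a flow on that table. Below is a simplified, self-contained sketch of that fan-out step, using stand-in `Peer` and `Requests` types rather than the real `common_meta::peer::Peer` and `RegionInsertRequests`:

use std::collections::HashMap;

// Stand-in types for illustration only.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct Peer(u64);

#[derive(Clone, Debug, Default)]
struct Requests(Vec<&'static str>);

// Fan each table's requests out to every flownode hosting a flow on it,
// mirroring the final grouping loop of `mirror_flow_node_requests`.
fn group_by_flownode(src_table_reqs: Vec<(Vec<Peer>, Requests)>) -> HashMap<Peer, Requests> {
    let mut inserts: HashMap<Peer, Requests> = HashMap::new();
    for (peers, reqs) in src_table_reqs {
        for peer in peers {
            inserts
                .entry(peer)
                .or_default()
                .0
                .extend(reqs.0.iter().copied());
        }
    }
    inserts
}

fn main() {
    let grouped = group_by_flownode(vec![
        (vec![Peer(1), Peer(2)], Requests(vec!["table-1 rows"])),
        (vec![Peer(1)], Requests(vec!["table-2 rows"])),
    ]);
    // Peer(1) receives rows for both tables; Peer(2) only for table-1.
    println!("{grouped:?}");
}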