
feat(flow): mirror insert req to flow node #3858

Merged
merged 11 commits on May 6, 2024
10 changes: 5 additions & 5 deletions src/common/meta/src/key/flow.rs
@@ -26,10 +26,10 @@ use snafu::{ensure, OptionExt};
 use self::flow_info::FlowInfoValue;
 use crate::ensure_values;
 use crate::error::{self, Result};
-use crate::key::flow::flow_info::FlowInfoManager;
-use crate::key::flow::flow_name::FlowNameManager;
-use crate::key::flow::flownode_flow::FlownodeFlowManager;
-use crate::key::flow::table_flow::TableFlowManager;
+use crate::key::flow::flow_info::{FlowInfoManager, FlowInfoManagerRef};
+use crate::key::flow::flow_name::{FlowNameManager, FlowNameManagerRef};
+use crate::key::flow::flownode_flow::{FlownodeFlowManager, FlownodeFlowManagerRef};
+pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
 use crate::key::txn_helper::TxnOpGetResponseSet;
 use crate::key::{FlowId, MetaKey};
 use crate::kv_backend::txn::Txn;
@@ -306,7 +306,7 @@ mod tests {
         for table_id in [1024, 1025, 1026] {
             let nodes = flow_metadata_manager
                 .table_flow_manager()
-                .nodes(table_id)
+                .flows(table_id)
                 .try_collect::<Vec<_>>()
                 .await
                 .unwrap();
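Of the four new `*Ref` aliases, only the `table_flow` pair is re-exported with `pub use`, since `TableFlowManagerRef` is consumed outside `common_meta` by the frontend's `Inserter` (see the builder and operator diffs below). A minimal sketch of that wiring, assuming only the items shown in this PR; `make_table_flow_manager` is a hypothetical helper:

```rust
use std::sync::Arc;

use common_meta::key::flow::{TableFlowManager, TableFlowManagerRef};
use common_meta::kv_backend::KvBackendRef;

// Hypothetical helper mirroring what FrontendBuilder does below: wrap the
// manager in an Arc so the Inserter can hold a cheaply clonable shared handle.
fn make_table_flow_manager(kv_backend: KvBackendRef) -> TableFlowManagerRef {
    Arc::new(TableFlowManager::new(kv_backend))
}
```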
3 changes: 3 additions & 0 deletions src/common/meta/src/key/flow/flow_info.rs
@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::collections::{BTreeMap, HashMap};
+use std::sync::Arc;

 use lazy_static::lazy_static;
 use regex::Regex;
@@ -143,6 +144,8 @@ impl FlowInfoValue {
     }
 }

+pub type FlowInfoManagerRef = Arc<FlowInfoManager>;
+
 /// The manager of [FlowInfoKey].
 pub struct FlowInfoManager {
     kv_backend: KvBackendRef,
4 changes: 4 additions & 0 deletions src/common/meta/src/key/flow/flow_name.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::sync::Arc;
+
 use api::v1::flow::flow_server::Flow;
 use lazy_static::lazy_static;
 use regex::Regex;
@@ -141,6 +143,8 @@ impl FlowNameValue {
     }
 }

+pub type FlowNameManagerRef = Arc<FlowNameManager>;
+
 /// The manager of [FlowNameKey].
 pub struct FlowNameManager {
     kv_backend: KvBackendRef,
2 changes: 2 additions & 0 deletions src/common/meta/src/key/flow/flownode_flow.rs
@@ -156,6 +156,8 @@ impl<'a> MetaKey<'a, FlownodeFlowKeyInner> for FlownodeFlowKeyInner {
     }
 }

+pub type FlownodeFlowManagerRef = Arc<FlownodeFlowManager>;
+
 /// The manager of [FlownodeFlowKey].
 pub struct FlownodeFlowManager {
     kv_backend: KvBackendRef,
6 changes: 5 additions & 1 deletion src/common/meta/src/key/flow/table_flow.rs
@@ -176,6 +176,8 @@ pub fn table_flow_decoder(kv: KeyValue) -> Result<TableFlowKey> {
     TableFlowKey::from_bytes(&kv.key)
 }

+pub type TableFlowManagerRef = Arc<TableFlowManager>;
+
 /// The manager of [TableFlowKey].
 pub struct TableFlowManager {
     kv_backend: KvBackendRef,
@@ -188,7 +190,9 @@ impl TableFlowManager {
     }

     /// Retrieves all [TableFlowKey]s of the specified `table_id`.
-    pub fn nodes(&self, table_id: TableId) -> BoxStream<'static, Result<TableFlowKey>> {
+    ///
+    /// TODO(discord9): add cache for it since range request does not support cache.
+    pub fn flows(&self, table_id: TableId) -> BoxStream<'static, Result<TableFlowKey>> {
         let start_key = TableFlowKey::range_start_key(table_id);
         let req = RangeRequest::new().with_prefix(start_key);
         let stream = PaginationStream::new(
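Since `flows` returns a paginated `BoxStream` of keys rather than a materialized list, callers drain it with `TryStreamExt`, exactly as the new `mirror_flow_node_requests` does below. A sketch of that consuming side, assuming the signatures above; `flownodes_for` is a hypothetical helper and `TableFlowKey::flownode_id` is taken from its use later in this PR:

```rust
use common_meta::error::Result;
use common_meta::key::flow::TableFlowManager;
use futures_util::TryStreamExt;
use store_api::storage::TableId;

// Hypothetical helper: collect the flownode ids hosting flows that read from
// `table_id`, deduplicating adjacent repeats (keys arrive in range order, so
// equal flownode ids sit next to each other).
async fn flownodes_for(manager: &TableFlowManager, table_id: TableId) -> Result<Vec<u64>> {
    let mut ids = manager
        .flows(table_id)
        .map_ok(|key| key.flownode_id())
        .try_collect::<Vec<_>>()
        .await?;
    ids.dedup();
    Ok(ids)
}
```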
6 changes: 3 additions & 3 deletions src/common/meta/src/node_manager.rs
@@ -15,8 +15,8 @@
 use std::sync::Arc;

 use api::region::RegionResponse;
-use api::v1::flow::{FlowRequest, FlowResponse, InsertRequest};
-use api::v1::region::{QueryRequest, RegionRequest};
+use api::v1::flow::{FlowRequest, FlowResponse};
+use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
 pub use common_base::AffectedRows;
 use common_recordbatch::SendableRecordBatchStream;

@@ -40,7 +40,7 @@ pub type DatanodeRef = Arc<dyn Datanode>;
 pub trait Flownode: Send + Sync {
     async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;

-    async fn handle_insert(&self, request: InsertRequest) -> Result<FlowResponse>;
+    async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
 }

 pub type FlownodeRef = Arc<dyn Flownode>;
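`Flownode` now accepts the region-level `InsertRequests` batch instead of a single flow `InsertRequest`, so one mirrored call can carry every request bound for a flownode. A minimal sketch of an implementor, assuming the trait is annotated with `#[async_trait]` (which the `async fn` signatures imply) and that `Result` is this module's error alias; this is a no-op test double, not the real flownode client:

```rust
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use async_trait::async_trait;
use common_meta::error::Result;
use common_meta::node_manager::Flownode;

struct NoopFlownode;

#[async_trait]
impl Flownode for NoopFlownode {
    async fn handle(&self, _request: FlowRequest) -> Result<FlowResponse> {
        Ok(FlowResponse::default())
    }

    async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
        // A real flownode feeds each region insert into its flow engine;
        // here we only acknowledge the batch.
        let _count = requests.requests.len();
        Ok(FlowResponse::default())
    }
}
```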
4 changes: 4 additions & 0 deletions src/frontend/src/instance/builder.rs
@@ -18,6 +18,7 @@ use catalog::CatalogManagerRef;
 use common_base::Plugins;
 use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
 use common_meta::ddl::ProcedureExecutorRef;
+use common_meta::key::flow::TableFlowManager;
 use common_meta::key::TableMetadataManager;
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::node_manager::NodeManagerRef;
@@ -101,10 +102,13 @@ impl FrontendBuilder {
         let region_query_handler =
             FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone());

+        let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend.clone()));
+
         let inserter = Arc::new(Inserter::new(
             self.catalog_manager.clone(),
             partition_manager.clone(),
             node_manager.clone(),
+            table_flow_manager,
         ));
         let deleter = Arc::new(Deleter::new(
             self.catalog_manager.clone(),
115 changes: 109 additions & 6 deletions src/operator/src/insert.rs
@@ -15,6 +15,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;

+use api::region::RegionResponse;
 use api::v1::alter_expr::Kind;
 use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
 use api::v1::{
@@ -25,22 +26,25 @@ use catalog::CatalogManagerRef;
 use client::{OutputData, OutputMeta};
 use common_catalog::consts::default_engine;
 use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
+use common_meta::key::flow::TableFlowManagerRef;
 use common_meta::node_manager::{AffectedRows, NodeManagerRef};
 use common_meta::peer::Peer;
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
 use common_query::Output;
 use common_telemetry::tracing_context::TracingContext;
 use common_telemetry::{error, info};
 use datatypes::schema::Schema;
-use futures_util::future;
+use futures_util::{future, TryStreamExt};
 use meter_macros::write_meter;
 use partition::manager::PartitionRuleManagerRef;
 use session::context::QueryContextRef;
-use snafu::prelude::*;
+use snafu::ResultExt;
 use sql::statements::insert::Insert;
 use store_api::metric_engine_consts::{
     LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
 };
+use store_api::storage::{RegionId, TableId};
 use table::requests::InsertRequest as TableInsertRequest;
 use table::table_reference::TableReference;
 use table::TableRef;
@@ -58,6 +62,7 @@ pub struct Inserter {
     catalog_manager: CatalogManagerRef,
     partition_manager: PartitionRuleManagerRef,
     node_manager: NodeManagerRef,
+    table_flow_manager: TableFlowManagerRef,
 }

 pub type InserterRef = Arc<Inserter>;
@@ -67,11 +72,13 @@ impl Inserter {
         catalog_manager: CatalogManagerRef,
         partition_manager: PartitionRuleManagerRef,
         node_manager: NodeManagerRef,
+        table_flow_manager: TableFlowManagerRef,
     ) -> Self {
         Self {
             catalog_manager,
             partition_manager,
             node_manager,
+            table_flow_manager,
         }
     }

@@ -199,13 +206,34 @@
             ..Default::default()
         });

+        // spawn tasks that mirror insert requests to flownodes
+        let flow_tasks = self
+            .mirror_flow_node_requests(&requests)
+            .await?
+            .into_iter()
+            .map(|(peer, inserts)| {
+                let node_manager = self.node_manager.clone();
+                common_runtime::spawn_write(async move {
+                    node_manager
+                        .flownode(&peer)
+                        .await
+                        .handle_inserts(inserts)
+                        .await
+                        .map(|flow_response| RegionResponse {
+                            affected_rows: flow_response.affected_rows as AffectedRows,
+                            extension: flow_response.extension,
+                        })
+                        .context(RequestInsertsSnafu)
+                })
+            });
+
         let tasks = self
             .group_requests_by_peer(requests)
             .await?
             .into_iter()
             .map(|(peer, inserts)| {
-                let request = request_factory.build_insert(inserts);
                 let node_manager = self.node_manager.clone();
+                let request = request_factory.build_insert(inserts);
                 common_runtime::spawn_write(async move {
                     node_manager
                         .datanode(&peer)
@@ -214,7 +242,8 @@ impl Inserter {
                         .await
                         .context(RequestInsertsSnafu)
                 })
-            });
+            })
+            .chain(flow_tasks);
         let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;

         let affected_rows = results
@@ -228,19 +257,93 @@
         ))
     }

+    /// Mirror insert requests for source tables to their flownodes.
+    async fn mirror_flow_node_requests(
+        &self,
+        requests: &RegionInsertRequests,
+    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
+        // store partial source table requests used by flownodes (only store what's used)
+        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
+            HashMap::new();
+        for req in &requests.requests {
+            match src_table_reqs.get_mut(&RegionId::from_u64(req.region_id).table_id()) {
+                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
+                // already known not to be a source table
+                Some(None) => continue,
+                _ => {
+                    let table_id = RegionId::from_u64(req.region_id).table_id();
+                    let peers = self
+                        .table_flow_manager
+                        .flows(table_id)
+                        // TODO(discord9): determine where to store the flow node address in distributed mode
+                        .map_ok(|key| Peer::new(key.flownode_id(), ""))
+                        .try_collect::<Vec<_>>()
+                        .await
+                        .map(|mut v| {
+                            v.dedup();
+                            v
+                        })
+                        .context(RequestInsertsSnafu)?;
+
+                    if !peers.is_empty() {
+                        let mut reqs = RegionInsertRequests::default();
+                        reqs.requests.push(req.clone());
+                        src_table_reqs.insert(table_id, Some((peers, reqs)));
+                    } else {
+                        // insert an empty entry to avoid repeated queries
+                        src_table_reqs.insert(table_id, None);
+                    }
+                }
+            }
+        }
+
+        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
+
+        for (_table_id, (peers, reqs)) in src_table_reqs
+            .into_iter()
+            .filter_map(|(k, v)| v.map(|v| (k, v)))
+        {
+            for flownode in peers {
+                inserts
+                    .entry(flownode.clone())
+                    .or_default()
+                    .requests
+                    .extend(reqs.requests.clone());
+            }
+        }
+        Ok(inserts)
+    }
+
     async fn group_requests_by_peer(
         &self,
         requests: RegionInsertRequests,
     ) -> Result<HashMap<Peer, RegionInsertRequests>> {
-        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
+        // group by region id first to avoid calling `find_region_leader` repeatedly
+        // TODO(discord9): determine if an additional clone is worth it
+        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();

         for req in requests.requests {
-            let peer = self
-                .partition_manager
-                .find_region_leader(req.region_id.into())
-                .await
-                .context(FindRegionLeaderSnafu)?;
-            inserts.entry(peer).or_default().requests.push(req);
+            let region_id = RegionId::from_u64(req.region_id);
+            requests_per_region
+                .entry(region_id)
+                .or_default()
+                .requests
+                .push(req);
+        }
+
+        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
+
+        for (region_id, reqs) in requests_per_region {
+            let peer = self
+                .partition_manager
+                .find_region_leader(region_id)
+                .await
+                .context(FindRegionLeaderSnafu)?;
+            inserts
+                .entry(peer)
+                .or_default()
+                .requests
+                .extend(reqs.requests);
         }

         Ok(inserts)
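Both new paths rely on `RegionId`'s packed u64 layout to recover the table id from each region insert request, and the mirror map then fans a source table's requests out to every subscribed flownode. A quick sketch of that `RegionId` assumption (`RegionId::new(table_id, region_number)` follows the `store_api` convention; treat this as illustration, not a spec):

```rust
use store_api::storage::RegionId;

fn main() {
    // A RegionId packs (table_id, region_number) into one u64; the mirror
    // path calls RegionId::from_u64(req.region_id).table_id() to group
    // incoming requests by source table.
    let region_id = RegionId::new(1024, 1);
    assert_eq!(RegionId::from_u64(region_id.as_u64()).table_id(), 1024);
}
```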