feat(flow): mirror insert req to flow node #3858

Merged: 11 commits, May 6, 2024
8 changes: 4 additions & 4 deletions src/common/meta/src/key/flow.rs
@@ -26,10 +26,10 @@ use snafu::{ensure, OptionExt};
use self::flow_info::FlowInfoValue;
use crate::ensure_values;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoManager;
use crate::key::flow::flow_name::FlowNameManager;
use crate::key::flow::flownode_flow::FlownodeFlowManager;
use crate::key::flow::table_flow::TableFlowManager;
use crate::key::flow::flow_info::{FlowInfoManager, FlowInfoManagerRef};
use crate::key::flow::flow_name::{FlowNameManager, FlowNameManagerRef};
use crate::key::flow::flownode_flow::{FlownodeFlowManager, FlownodeFlowManagerRef};
pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{FlowId, MetaKey};
use crate::kv_backend::txn::Txn;
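The switch from plain `use` to importing the `*ManagerRef` aliases, plus the `pub use` re-export of `TableFlowManagerRef`, is what lets other crates hold a shared handle to the table-flow metadata; `src/operator/src/insert.rs` below imports it as `common_meta::key::flow::TableFlowManagerRef`. A minimal sketch of the alias pattern itself, using a placeholder struct rather than the real KV-backed manager:

```rust
use std::sync::Arc;

// Placeholder standing in for the real KV-backed manager type.
pub struct TableFlowManager;

// The alias style added in this PR: a cheaply clonable, shared handle.
pub type TableFlowManagerRef = Arc<TableFlowManager>;

fn main() {
    let manager: TableFlowManagerRef = Arc::new(TableFlowManager);
    // Cloning only bumps the Arc's reference count; both handles share state.
    let for_inserter = manager.clone();
    let _ = (manager, for_inserter);
}
```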
3 changes: 3 additions & 0 deletions src/common/meta/src/key/flow/flow_info.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use lazy_static::lazy_static;
use regex::Regex;
@@ -143,6 +144,8 @@ impl FlowInfoValue {
}
}

pub type FlowInfoManagerRef = Arc<FlowInfoManager>;

/// The manager of [FlowInfoKey].
pub struct FlowInfoManager {
kv_backend: KvBackendRef,
4 changes: 4 additions & 0 deletions src/common/meta/src/key/flow/flow_name.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::flow::flow_server::Flow;
use lazy_static::lazy_static;
use regex::Regex;
@@ -141,6 +143,8 @@ impl FlowNameValue {
}
}

pub type FlowNameManagerRef = Arc<FlowNameManager>;

/// The manager of [FlowNameKey].
pub struct FlowNameManager {
kv_backend: KvBackendRef,
2 changes: 2 additions & 0 deletions src/common/meta/src/key/flow/flownode_flow.rs
@@ -156,6 +156,8 @@ impl<'a> MetaKey<'a, FlownodeFlowKeyInner> for FlownodeFlowKeyInner {
}
}

pub type FlownodeFlowManagerRef = Arc<FlownodeFlowManager>;

/// The manager of [FlownodeFlowKey].
pub struct FlownodeFlowManager {
kv_backend: KvBackendRef,
2 changes: 2 additions & 0 deletions src/common/meta/src/key/flow/table_flow.rs
@@ -176,6 +176,8 @@ pub fn table_flow_decoder(kv: KeyValue) -> Result<TableFlowKey> {
TableFlowKey::from_bytes(&kv.key)
}

pub type TableFlowManagerRef = Arc<TableFlowManager>;

/// The manager of [TableFlowKey].
pub struct TableFlowManager {
kv_backend: KvBackendRef,
6 changes: 3 additions & 3 deletions src/common/meta/src/node_manager.rs
@@ -15,8 +15,8 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse, InsertRequest};
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

@@ -40,7 +40,7 @@ pub type DatanodeRef = Arc<dyn Datanode>;
pub trait Flownode: Send + Sync {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;

async fn handle_insert(&self, request: InsertRequest) -> Result<FlowResponse>;
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
}

pub type FlownodeRef = Arc<dyn Flownode>;
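With the trait changed from `handle_insert(InsertRequest)` to `handle_inserts(InsertRequests)`, a flownode now receives the same batched region inserts that datanodes receive. A sketch of what an implementor could look like; `MockFlownode` is made up, and the `#[async_trait]` attribute and `common_meta::error::Result` error type are assumptions inferred from the trait-object usage (`Arc<dyn Flownode>`) in this file:

```rust
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use async_trait::async_trait;
use common_meta::error::Result;
use common_meta::node_manager::Flownode;

/// Hypothetical in-memory flownode, only to illustrate the new trait surface.
struct MockFlownode;

#[async_trait]
impl Flownode for MockFlownode {
    async fn handle(&self, _request: FlowRequest) -> Result<FlowResponse> {
        // A real flownode would create/flush/drop flows here.
        Ok(FlowResponse::default())
    }

    async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
        // Mirrored inserts arrive batched per flownode; a real implementation
        // would feed each region request into its dataflow engine.
        let _batch_len = requests.requests.len();
        Ok(FlowResponse::default())
    }
}
```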
4 changes: 4 additions & 0 deletions src/frontend/src/instance/builder.rs
@@ -18,6 +18,7 @@ use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::TableFlowManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
@@ -101,10 +102,13 @@ impl FrontendBuilder {
let region_query_handler =
FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone());

let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend.clone()));

let inserter = Arc::new(Inserter::new(
self.catalog_manager.clone(),
partition_manager.clone(),
node_manager.clone(),
table_flow_manager,
));
let deleter = Arc::new(Deleter::new(
self.catalog_manager.clone(),
76 changes: 63 additions & 13 deletions src/operator/src/insert.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::alter_expr::Kind;
use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
use api::v1::{
@@ -25,14 +26,15 @@ use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::default_engine;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_meta::key::flow::TableFlowManagerRef;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info};
use datatypes::schema::Schema;
use futures_util::future;
use futures_util::{future, TryStreamExt};
use meter_macros::write_meter;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
@@ -41,6 +43,7 @@ use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::storage::RegionId;
use table::requests::InsertRequest as TableInsertRequest;
use table::table_reference::TableReference;
use table::TableRef;
@@ -58,6 +61,7 @@ pub struct Inserter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
table_flow_manager: TableFlowManagerRef,
}

pub type InserterRef = Arc<Inserter>;
@@ -67,11 +71,13 @@ impl Inserter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
table_flow_manager: TableFlowManagerRef,
) -> Self {
Self {
catalog_manager,
partition_manager,
node_manager,
table_flow_manager,
}
}

@@ -186,6 +192,13 @@ impl Inserter {
}
}

/// The kind of peer an insert request is routed to: a datanode holding the
/// region, or a flownode that mirrors the table's inserts.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
enum PeerTyped {
Datanode(Peer),
Flownode(Peer),
}

impl Inserter {
async fn do_request(
&self,
@@ -204,16 +217,32 @@
.await?
.into_iter()
.map(|(peer, inserts)| {
let request = request_factory.build_insert(inserts);
let node_manager = self.node_manager.clone();
common_runtime::spawn_write(async move {
node_manager
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestInsertsSnafu)
})
match peer {
PeerTyped::Datanode(peer) => {
let request = request_factory.build_insert(inserts);
common_runtime::spawn_write(async move {
node_manager
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestInsertsSnafu)
})
}
PeerTyped::Flownode(peer) => common_runtime::spawn_write(async move {
node_manager
.flownode(&peer)
.await
.handle_inserts(inserts)
.await
.map(|flow_response| RegionResponse {
affected_rows: flow_response.affected_rows as AffectedRows,
extension: flow_response.extension,
})
.context(RequestInsertsSnafu)
}),
}
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
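In the flownode arm above, the `FlowResponse` is reshaped into the `RegionResponse` that the datanode arm already yields, so `try_join_all` can aggregate both kinds of task uniformly. The same conversion pulled out as a standalone helper (a sketch of the closure above, not a function that exists in the codebase):

```rust
use api::region::RegionResponse;
use api::v1::flow::FlowResponse;
use common_meta::node_manager::AffectedRows;

/// Mirrors the `.map(|flow_response| ...)` adaptation in the flownode arm:
/// carry over the affected-row count and the extension map unchanged.
fn flow_to_region_response(flow: FlowResponse) -> RegionResponse {
    RegionResponse {
        affected_rows: flow.affected_rows as AffectedRows,
        extension: flow.extension,
    }
}
```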

@@ -231,16 +260,37 @@ impl Inserter {
async fn group_requests_by_peer(
&self,
requests: RegionInsertRequests,
) -> Result<HashMap<Peer, RegionInsertRequests>> {
let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
) -> Result<HashMap<PeerTyped, RegionInsertRequests>> {
let mut inserts: HashMap<PeerTyped, RegionInsertRequests> = HashMap::new();

for req in requests.requests {
// TODO(discord9): impl proper FlowNodeId to Peer Conversion
if let Ok(flownodes) = self
.table_flow_manager
.nodes(RegionId::from_u64(req.region_id).table_id())
.map_ok(|key| Peer::new(key.flownode_id(), ""))
.try_collect::<Vec<_>>()
.await
{
for flownode in flownodes {
inserts
.entry(PeerTyped::Flownode(flownode))
.or_default()
.requests
.push(req.clone());
}
}

let peer = self
.partition_manager
.find_region_leader(req.region_id.into())
.await
.context(FindRegionLeaderSnafu)?;
inserts.entry(peer).or_default().requests.push(req);
inserts
.entry(PeerTyped::Datanode(peer))
.or_default()
.requests
.push(req);
}

Ok(inserts)
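Taken together, `group_requests_by_peer` now buckets every region insert twice: it is cloned into the bucket of each flownode registered for the request's table (looked up through `TableFlowManager::nodes` on the table id carried in the region id), and then pushed to its datanode region leader as before. Note that a failed flownode lookup is skipped via `if let Ok(...)`, so mirroring never blocks the primary write. A condensed sketch of the strategy with simplified placeholder types (plain node ids instead of `Peer`, closures instead of the partition and table-flow managers):

```rust
use std::collections::HashMap;

type TableId = u32;
type RegionId = u64;

#[derive(Clone)]
struct InsertReq {
    region_id: RegionId,
    rows: Vec<String>, // placeholder payload
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
enum PeerTyped {
    Datanode(u64),
    Flownode(u64),
}

/// Every request goes to its datanode leader and is additionally cloned to
/// each flownode subscribed to the owning table.
fn group_by_peer(
    requests: Vec<InsertReq>,
    leader_of: impl Fn(RegionId) -> u64,
    flownodes_of: impl Fn(TableId) -> Vec<u64>,
) -> HashMap<PeerTyped, Vec<InsertReq>> {
    let mut buckets: HashMap<PeerTyped, Vec<InsertReq>> = HashMap::new();
    for req in requests {
        // The table id occupies the high 32 bits of a region id.
        let table_id = (req.region_id >> 32) as TableId;
        for node in flownodes_of(table_id) {
            buckets
                .entry(PeerTyped::Flownode(node))
                .or_default()
                .push(req.clone());
        }
        let leader = leader_of(req.region_id);
        buckets
            .entry(PeerTyped::Datanode(leader))
            .or_default()
            .push(req);
    }
    buckets
}

fn main() {
    let reqs = vec![InsertReq {
        region_id: (42u64 << 32) | 1,
        rows: vec!["a".into()],
    }];
    let grouped = group_by_peer(reqs, |_region| 0, |_table| vec![7]);
    assert_eq!(grouped.len(), 2); // one datanode bucket, one flownode bucket
}
```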