From a44fe627ce6a16b703756e4ffb7e48559062b079 Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Mon, 24 Jun 2024 23:06:33 +0800
Subject: [PATCH] feat: heartbeat task&peer lookup in proc (#4179)

* feat: herat beat task
* feat: use real flow peer allocator when building
* feat: add peer look up in ddl context
* fix: drop flow test
* refactor: per review(WIP)
* refactor: not check if is alive
* refactor: per review
* refactor: remove useless `reset`
* refactor: per bot advices
* refactor: alive peer
* chore: bot review
---
 Cargo.lock                                   |   1 +
 src/cmd/src/standalone.rs                    |   2 +
 src/common/meta/src/ddl.rs                   |   3 +
 .../meta/src/ddl/create_flow/metadata.rs     |   3 +-
 src/common/meta/src/ddl/drop_flow.rs         |  16 +-
 src/common/meta/src/ddl/flow_meta.rs         |  26 ++-
 src/common/meta/src/ddl_manager.rs           |   3 +-
 src/common/meta/src/peer.rs                  |  51 +++++
 src/common/meta/src/test_util.rs             |   3 +-
 src/flow/Cargo.toml                          |   1 +
 src/flow/src/adapter.rs                      |   6 +-
 src/flow/src/heartbeat.rs                    | 198 ++++++++++++++++++
 src/flow/src/lib.rs                          |   3 +-
 src/meta-srv/src/flow_meta_alloc.rs          |  55 +++++
 src/meta-srv/src/lease.rs                    |  80 ++++++-
 src/meta-srv/src/lib.rs                      |   1 +
 src/meta-srv/src/metasrv.rs                  |   4 +-
 src/meta-srv/src/metasrv/builder.rs          |  25 ++-
 src/meta-srv/src/procedure/utils.rs          |   3 +-
 tests-integration/src/standalone.rs          |   2 +
 20 files changed, 459 insertions(+), 27 deletions(-)
 create mode 100644 src/flow/src/heartbeat.rs
 create mode 100644 src/meta-srv/src/flow_meta_alloc.rs

diff --git a/Cargo.lock b/Cargo.lock
index a6777c3c84a0..3c0924d682b5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3917,6 +3917,7 @@ dependencies = [
  "greptime-proto",
  "hydroflow",
  "itertools 0.10.5",
+ "meta-client",
  "minstant",
  "nom",
  "num-traits",
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index d4a9a7d46352..b48951062fc2 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -31,6 +31,7 @@ use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
 use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::node_manager::NodeManagerRef;
+use common_meta::peer::StandalonePeerLookupService;
 use common_meta::region_keeper::MemoryRegionKeeper;
 use common_meta::sequence::SequenceBuilder;
 use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef};
@@ -557,6 +558,7 @@ impl StartCommand {
                 table_metadata_allocator,
                 flow_metadata_manager,
                 flow_metadata_allocator,
+                peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
             },
             procedure_manager,
             true,
diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs
index c00b6df08e6b..3e9443d88d4b 100644
--- a/src/common/meta/src/ddl.rs
+++ b/src/common/meta/src/ddl.rs
@@ -26,6 +26,7 @@ use crate::key::flow::FlowMetadataManagerRef;
 use crate::key::table_route::PhysicalTableRouteValue;
 use crate::key::TableMetadataManagerRef;
 use crate::node_manager::NodeManagerRef;
+use crate::peer::PeerLookupServiceRef;
 use crate::region_keeper::MemoryRegionKeeperRef;
 use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
 use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
@@ -118,4 +119,6 @@ pub struct DdlContext {
     pub flow_metadata_manager: FlowMetadataManagerRef,
     /// Allocator for flow metadata.
     pub flow_metadata_allocator: FlowMetadataAllocatorRef,
+    /// look up peer by id.
+    pub peer_lookup_service: PeerLookupServiceRef,
 }
diff --git a/src/common/meta/src/ddl/create_flow/metadata.rs b/src/common/meta/src/ddl/create_flow/metadata.rs
index 1681479d9173..40cf99ccc4b2 100644
--- a/src/common/meta/src/ddl/create_flow/metadata.rs
+++ b/src/common/meta/src/ddl/create_flow/metadata.rs
@@ -23,10 +23,11 @@ impl CreateFlowProcedure {
     pub(crate) async fn allocate_flow_id(&mut self) -> Result<()> {
         //TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now.
         let partitions = 1;
+        let cluster_id = self.data.cluster_id;
         let (flow_id, peers) = self
             .context
             .flow_metadata_allocator
-            .create(partitions)
+            .create(cluster_id, partitions)
             .await?;
         self.data.flow_id = Some(flow_id);
         self.data.peers = peers;
diff --git a/src/common/meta/src/ddl/drop_flow.rs b/src/common/meta/src/ddl/drop_flow.rs
index db5fa7901cbf..51b10451bcd8 100644
--- a/src/common/meta/src/ddl/drop_flow.rs
+++ b/src/common/meta/src/ddl/drop_flow.rs
@@ -25,18 +25,17 @@ use common_procedure::{
 use common_telemetry::info;
 use futures::future::join_all;
 use serde::{Deserialize, Serialize};
-use snafu::{ensure, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use strum::AsRefStr;

 use super::utils::{add_peer_context_if_needed, handle_retry_error};
 use crate::cache_invalidator::Context;
 use crate::ddl::DdlContext;
-use crate::error::{self, Result};
+use crate::error::{self, Result, UnexpectedSnafu};
 use crate::flow_name::FlowName;
 use crate::instruction::{CacheIdent, DropFlow};
 use crate::key::flow::flow_info::FlowInfoValue;
 use crate::lock_key::{CatalogLock, FlowLock};
-use crate::peer::Peer;
 use crate::rpc::ddl::DropFlowTask;
 use crate::{metrics, ClusterId};
@@ -103,10 +102,17 @@ impl DropFlowProcedure {
         let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids;
         let flow_id = self.data.task.flow_id;
         let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len());
+        let cluster_id = self.data.cluster_id;

         for flownode in flownode_ids.values() {
-            // TODO(weny): use the real peer.
-            let peer = Peer::new(*flownode, "");
+            let peer = self
+                .context
+                .peer_lookup_service
+                .flownode(cluster_id, *flownode)
+                .await?
+                .with_context(|| UnexpectedSnafu {
+                    err_msg: "Attempted to drop flow on a node that could not be found. Consider verifying node availability.",
+                })?;
             let requester = self.context.node_manager.flownode(&peer).await;
             let request = FlowRequest {
                 body: Some(flow_request::Body::Drop(DropRequest {
diff --git a/src/common/meta/src/ddl/flow_meta.rs b/src/common/meta/src/ddl/flow_meta.rs
index d7aca9b84eaf..f92f4048822c 100644
--- a/src/common/meta/src/ddl/flow_meta.rs
+++ b/src/common/meta/src/ddl/flow_meta.rs
@@ -20,6 +20,7 @@ use crate::error::Result;
 use crate::key::FlowId;
 use crate::peer::Peer;
 use crate::sequence::SequenceRef;
+use crate::ClusterId;

 /// The reference of [FlowMetadataAllocator].
 pub type FlowMetadataAllocatorRef = Arc<FlowMetadataAllocator>;
@@ -42,6 +43,16 @@ impl FlowMetadataAllocator {
         }
     }

+    pub fn with_peer_allocator(
+        flow_id_sequence: SequenceRef,
+        peer_allocator: Arc<dyn PartitionPeerAllocator>,
+    ) -> Self {
+        Self {
+            flow_id_sequence,
+            partition_peer_allocator: peer_allocator,
+        }
+    }
+
     /// Allocates a the [FlowId].
     pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
         let flow_id = self.flow_id_sequence.next().await? as FlowId;
@@ -49,9 +60,16 @@ impl FlowMetadataAllocator {
     }

     /// Allocates the [FlowId] and [Peer]s.
-    pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
+    pub async fn create(
+        &self,
+        cluster_id: ClusterId,
+        partitions: usize,
+    ) -> Result<(FlowId, Vec<Peer>)> {
         let flow_id = self.allocate_flow_id().await?;
-        let peers = self.partition_peer_allocator.alloc(partitions).await?;
+        let peers = self
+            .partition_peer_allocator
+            .alloc(cluster_id, partitions)
+            .await?;

         Ok((flow_id, peers))
     }
@@ -61,7 +79,7 @@ impl FlowMetadataAllocator {
 #[async_trait]
 pub trait PartitionPeerAllocator: Send + Sync {
     /// Allocates [Peer] nodes for storing partitions.
-    async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>>;
+    async fn alloc(&self, cluster_id: ClusterId, partitions: usize) -> Result<Vec<Peer>>;
 }

 /// [PartitionPeerAllocatorRef] allocates [Peer]s for partitions.
 struct NoopPartitionPeerAllocator;

 #[async_trait]
 impl PartitionPeerAllocator for NoopPartitionPeerAllocator {
-    async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>> {
+    async fn alloc(&self, _cluster_id: ClusterId, partitions: usize) -> Result<Vec<Peer>> {
         Ok(vec![Peer::default(); partitions])
     }
 }
diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index ba6199c73854..1228de66dab7 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -810,7 +810,7 @@ mod tests {
     use crate::key::TableMetadataManager;
     use crate::kv_backend::memory::MemoryKvBackend;
     use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
-    use crate::peer::Peer;
+    use crate::peer::{Peer, StandalonePeerLookupService};
     use crate::region_keeper::MemoryRegionKeeper;
     use crate::sequence::SequenceBuilder;
     use crate::state_store::KvStateStore;
@@ -855,6 +855,7 @@ mod tests {
             flow_metadata_manager,
             flow_metadata_allocator,
             memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
+            peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
         },
         procedure_manager.clone(),
         true,
diff --git a/src/common/meta/src/peer.rs b/src/common/meta/src/peer.rs
index 6879e55dd48f..6151bc6d3c9b 100644
--- a/src/common/meta/src/peer.rs
+++ b/src/common/meta/src/peer.rs
@@ -13,10 +13,14 @@
 // limitations under the License.

 use std::fmt::{Display, Formatter};
+use std::sync::Arc;

 use api::v1::meta::Peer as PbPeer;
 use serde::{Deserialize, Serialize};

+use crate::error::Error;
+use crate::{ClusterId, DatanodeId, FlownodeId};
+
 #[derive(Debug, Default, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
 pub struct Peer {
     /// Node identifier. Unique in a cluster.
@@ -64,3 +68,50 @@ impl Display for Peer {
         write!(f, "peer-{}({})", self.id, self.addr)
     }
 }
+
+/// Can query a peer given a node id.
+#[async_trait::async_trait]
+pub trait PeerLookupService {
+    async fn datanode(&self, cluster_id: ClusterId, id: DatanodeId) -> Result<Option<Peer>, Error>;
+    async fn flownode(&self, cluster_id: ClusterId, id: FlownodeId) -> Result<Option<Peer>, Error>;
+}
+
+pub type PeerLookupServiceRef = Arc<dyn PeerLookupService + Send + Sync>;
+
+/// Always returns `Peer::new(0, "")` for any query.
+pub struct StandalonePeerLookupService {
+    default_peer: Peer,
+}
+
+impl StandalonePeerLookupService {
+    pub fn new() -> Self {
+        Self {
+            default_peer: Peer::new(0, ""),
+        }
+    }
+}
+
+impl Default for StandalonePeerLookupService {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[async_trait::async_trait]
+impl PeerLookupService for StandalonePeerLookupService {
+    async fn datanode(
+        &self,
+        _cluster_id: ClusterId,
+        _id: DatanodeId,
+    ) -> Result<Option<Peer>, Error> {
+        Ok(Some(self.default_peer.clone()))
+    }
+
+    async fn flownode(
+        &self,
+        _cluster_id: ClusterId,
+        _id: FlownodeId,
+    ) -> Result<Option<Peer>, Error> {
+        Ok(Some(self.default_peer.clone()))
+    }
+}
diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs
index 2abfb5f24e09..e544cececec4 100644
--- a/src/common/meta/src/test_util.rs
+++ b/src/common/meta/src/test_util.rs
@@ -33,7 +33,7 @@ use crate::kv_backend::KvBackendRef;
 use crate::node_manager::{
     Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
 };
-use crate::peer::Peer;
+use crate::peer::{Peer, StandalonePeerLookupService};
 use crate::region_keeper::MemoryRegionKeeper;
 use crate::sequence::SequenceBuilder;
 use crate::wal_options_allocator::WalOptionsAllocator;
@@ -180,5 +180,6 @@ pub fn new_ddl_context_with_kv_backend(
         table_metadata_manager,
         flow_metadata_allocator,
         flow_metadata_manager,
+        peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
     }
 }
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index 8283f0595f9b..285f8dbeec41 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -38,6 +38,7 @@ greptime-proto.workspace = true
 # otherwise it is the same with upstream repo
 hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
 itertools.workspace = true
+meta-client.workspace = true
 minstant = "0.1.7"
 nom = "7.1.3"
 num-traits = "0.2"
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 0fd6af199101..373f1fb35558 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -34,6 +34,7 @@ use greptime_proto::v1;
 use itertools::Itertools;
 use query::{QueryEngine, QueryEngineFactory};
 use serde::{Deserialize, Serialize};
+use servers::grpc::GrpcOptions;
 use session::context::QueryContext;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::{ConcreteDataType, RegionId};
@@ -78,8 +79,8 @@ pub type TableName = [String; 3];
 #[derive(Clone, Default, Debug, Serialize, Deserialize)]
 #[serde(default)]
 pub struct FlownodeOptions {
-    /// rpc address
-    pub rpc_addr: String,
+    pub node_id: Option<u64>,
+    pub grpc: GrpcOptions,
 }

 /// Flownode Builder
@@ -497,6 +498,7 @@ impl FlownodeManager {
     /// run in common_runtime background runtime
     pub fn run_background(self: Arc<Self>) -> JoinHandle<()> {
         info!("Starting flownode manager's background task");
+        // TODO(discord9): add heartbeat tasks here
         common_runtime::spawn_bg(async move {
             self.run().await;
         })
diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs
new file mode 100644
index 000000000000..4777fe9849f5
--- /dev/null
+++ b/src/flow/src/heartbeat.rs
@@ -0,0 +1,198 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Send heartbeat from flownode to metasrv
+
+use std::sync::Arc;
+
+use api::v1::meta::{HeartbeatRequest, Peer};
+use common_error::ext::BoxedError;
+use common_meta::heartbeat::handler::{
+    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
+};
+use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
+use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
+use common_telemetry::{debug, error, info};
+use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
+use servers::heartbeat_options::HeartbeatOptions;
+use snafu::ResultExt;
+use tokio::sync::mpsc;
+use tokio::time::{Duration, Instant};
+
+use crate::adapter::error::ExternalSnafu;
+use crate::{Error, FlownodeOptions};
+
+/// The flownode heartbeat task, which sends `[HeartbeatRequest]`s to the metasrv periodically in the background.
+#[derive(Clone)]
+pub struct HeartbeatTask {
+    node_id: u64,
+    server_addr: String,
+    meta_client: Arc<MetaClient>,
+    report_interval: Duration,
+    retry_interval: Duration,
+    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
+}
+
+impl HeartbeatTask {
+    pub fn new(
+        opts: &FlownodeOptions,
+        meta_client: Arc<MetaClient>,
+        heartbeat_opts: HeartbeatOptions,
+        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
+    ) -> Self {
+        Self {
+            node_id: opts.node_id.unwrap_or(0),
+            server_addr: opts.grpc.addr.clone(),
+            meta_client,
+            report_interval: heartbeat_opts.interval,
+            retry_interval: heartbeat_opts.retry_interval,
+            resp_handler_executor,
+        }
+    }
+
+    pub async fn start(&self) -> Result<(), Error> {
+        info!("Start to establish the heartbeat connection to metasrv.");
+        let (req_sender, resp_stream) = self
+            .meta_client
+            .heartbeat()
+            .await
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?;
+
+        info!("Flownode's heartbeat connection has been established with metasrv");
+
+        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
+        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
+
+        self.start_handle_resp_stream(resp_stream, mailbox);
+
+        self.start_heartbeat_report(req_sender, outgoing_rx);
+
+        Ok(())
+    }
+
+    fn create_heartbeat_request(
+        message: OutgoingMessage,
+        self_peer: &Option<Peer>,
+    ) -> Option<HeartbeatRequest> {
+        match outgoing_message_to_mailbox_message(message) {
+            Ok(message) => {
+                let req = HeartbeatRequest {
+                    mailbox_message: Some(message),
+                    peer: self_peer.clone(),
+                    ..Default::default()
+                };
+                Some(req)
+            }
+            Err(e) => {
+                error!(e; "Failed to encode mailbox messages");
+                None
+            }
+        }
+    }
+
+    fn start_heartbeat_report(
+        &self,
+        req_sender: HeartbeatSender,
+        mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
+    ) {
+        let report_interval = self.report_interval;
+        let self_peer = Some(Peer {
+            id: self.node_id,
+            addr: self.server_addr.clone(),
+        });
+
+        common_runtime::spawn_hb(async move {
+            // note that using interval will cause it to first immediately send
+            // a heartbeat
+            let mut interval = tokio::time::interval(report_interval);
+            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+            loop {
+                let req = tokio::select! {
+                    message = outgoing_rx.recv() => {
+                        if let Some(message) = message {
+                            Self::create_heartbeat_request(message, &self_peer)
+                        } else {
+                            // Receives None that means Sender was dropped, we need to break the current loop
+                            break
+                        }
+                    }
+                    _ = interval.tick() => {
+                        let req = HeartbeatRequest {
+                            peer: self_peer.clone(),
+                            ..Default::default()
+                        };
+                        Some(req)
+                    }
+                };
+
+                if let Some(req) = req {
+                    if let Err(e) = req_sender.send(req.clone()).await {
+                        error!(e; "Failed to send heartbeat to metasrv");
+                        break;
+                    } else {
+                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
+                    }
+                }
+            }
+        });
+    }
+
+    fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
+        let capture_self = self.clone();
+        let retry_interval = self.retry_interval;
+
+        let _handle = common_runtime::spawn_hb(async move {
+            loop {
+                match resp_stream.message().await {
+                    Ok(Some(resp)) => {
+                        debug!("Receiving heartbeat response: {:?}", resp);
+                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
+                        if let Err(e) = capture_self.handle_response(ctx).await {
+                            error!(e; "Error while handling heartbeat response");
+                        }
+                    }
+                    Ok(None) => break,
+                    Err(e) => {
+                        error!(e; "Occur error while reading heartbeat response");
+                        capture_self.start_with_retry(retry_interval).await;
+
+                        break;
+                    }
+                }
+            }
+        });
+    }
+
+    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<(), Error> {
+        self.resp_handler_executor
+            .handle(ctx)
+            .await
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)
+    }
+
+    async fn start_with_retry(&self, retry_interval: Duration) {
+        loop {
+            tokio::time::sleep(retry_interval).await;
+
+            info!("Try to re-establish the heartbeat connection to metasrv.");
+
+            if self.start().await.is_ok() {
+                break;
+            }
+        }
+    }
+}
diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs
index e0ebae2bd226..606ae65144ed 100644
--- a/src/flow/src/lib.rs
+++ b/src/flow/src/lib.rs
@@ -20,16 +20,17 @@
 #![feature(duration_abs_diff)]
 #![allow(dead_code)]
 #![allow(unused_imports)]
-#![warn(missing_docs)]
 #![warn(clippy::missing_docs_in_private_items)]
 #![warn(clippy::too_many_lines)]

 // allow unused for now because it should be use later
 mod adapter;
 mod compute;
 mod expr;
+mod heartbeat;
 mod plan;
 mod repr;
 mod transform;
 mod utils;

+pub use adapter::error::{Error, Result};
 pub use adapter::{FlownodeBuilder, FlownodeManager, FlownodeManagerRef, FlownodeOptions};
diff --git a/src/meta-srv/src/flow_meta_alloc.rs b/src/meta-srv/src/flow_meta_alloc.rs
new file mode 100644
index 000000000000..1fac6efab11a
--- /dev/null
+++ b/src/meta-srv/src/flow_meta_alloc.rs
@@ -0,0 +1,55 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_error::ext::BoxedError;
+use common_meta::ddl::flow_meta::PartitionPeerAllocator;
+use common_meta::peer::Peer;
+use common_meta::ClusterId;
+use snafu::ResultExt;
+
+use crate::metasrv::{SelectorContext, SelectorRef};
+use crate::selector::SelectorOptions;
+
+pub struct FlowPeerAllocator {
+    ctx: SelectorContext,
+    selector: SelectorRef,
+}
+
+impl FlowPeerAllocator {
+    pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
+        Self { ctx, selector }
+    }
+}
+
+#[async_trait::async_trait]
+impl PartitionPeerAllocator for FlowPeerAllocator {
+    async fn alloc(
+        &self,
+        cluster_id: ClusterId,
+        partitions: usize,
+    ) -> common_meta::error::Result<Vec<Peer>> {
+        self.selector
+            .select(
+                cluster_id,
+                &self.ctx,
+                SelectorOptions {
+                    min_required_items: partitions,
+                    allow_duplication: true,
+                },
+            )
+            .await
+            .map_err(BoxedError::new)
+            .context(common_meta::error::ExternalSnafu)
+    }
+}
diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs
index 70ebb10c90a2..ef28c2ed7431 100644
--- a/src/meta-srv/src/lease.rs
+++ b/src/meta-srv/src/lease.rs
@@ -15,10 +15,12 @@
 use std::collections::HashMap;
 use std::hash::Hash;

+use common_error::ext::BoxedError;
 use common_meta::kv_backend::KvBackend;
-use common_meta::peer::Peer;
-use common_meta::{util, ClusterId};
+use common_meta::peer::{Peer, PeerLookupService};
+use common_meta::{util, ClusterId, DatanodeId, FlownodeId};
 use common_time::util as time_util;
+use snafu::ResultExt;

 use crate::cluster::MetaPeerClientRef;
 use crate::error::{Error, Result};
@@ -31,9 +33,10 @@ fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
     }
 }

-pub async fn lookup_alive_datanode_peer(
+/// Looks up the [`Peer`] for the given [`ClusterId`] and [`DatanodeId`]; returns it only if it is still alive within the given `lease_secs`.
+pub async fn lookup_datanode_peer(
     cluster_id: ClusterId,
-    datanode_id: u64,
+    datanode_id: DatanodeId,
     meta_peer_client: &MetaPeerClientRef,
     lease_secs: u64,
 ) -> Result<Option<Peer>> {
@@ -47,7 +50,8 @@ pub async fn lookup_alive_datanode_peer(
         return Ok(None);
     };
     let lease_value: LeaseValue = kv.value.try_into()?;
-    if lease_filter(&lease_value) {
+    let is_alive = lease_filter(&lease_value);
+    if is_alive {
         Ok(Some(Peer {
             id: lease_key.node_id,
             addr: lease_value.node_addr,
@@ -57,6 +61,7 @@ pub async fn lookup_alive_datanode_peer(
     }
 }

+/// Find all alive datanodes
 pub async fn alive_datanodes(
     cluster_id: ClusterId,
     meta_peer_client: &MetaPeerClientRef,
@@ -71,6 +76,36 @@ pub async fn alive_datanodes(
         .await
 }

+/// Looks up the [`Peer`] for the given [`ClusterId`] and [`FlownodeId`]; returns it only if it is still alive within the given `lease_secs`.
+pub async fn lookup_flownode_peer(
+    cluster_id: ClusterId,
+    flownode_id: FlownodeId,
+    meta_peer_client: &MetaPeerClientRef,
+    lease_secs: u64,
+) -> Result<Option<Peer>> {
+    let lease_filter = build_lease_filter(lease_secs);
+    let lease_key = FlownodeLeaseKey {
+        cluster_id,
+        node_id: flownode_id,
+    };
+    let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
+    let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
+        return Ok(None);
+    };
+    let lease_value: LeaseValue = kv.value.try_into()?;
+
+    let is_alive = lease_filter(&lease_value);
+    if is_alive {
+        Ok(Some(Peer {
+            id: lease_key.node_id,
+            addr: lease_value.node_addr,
+        }))
+    } else {
+        Ok(None)
+    }
+}
+
+/// Find all alive flownodes
 pub async fn alive_flownodes(
     cluster_id: ClusterId,
     meta_peer_client: &MetaPeerClientRef,
@@ -114,3 +149,38 @@ where
     Ok(lease_kvs)
 }
+
+#[derive(Clone)]
+pub struct MetaPeerLookupService {
+    pub meta_peer_client: MetaPeerClientRef,
+}
+
+impl MetaPeerLookupService {
+    pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
+        Self { meta_peer_client }
+    }
+}
+
+#[async_trait::async_trait]
+impl PeerLookupService for MetaPeerLookupService {
+    async fn datanode(
+        &self,
+        cluster_id: ClusterId,
+        id: DatanodeId,
+    ) -> common_meta::error::Result<Option<Peer>> {
+        lookup_datanode_peer(cluster_id, id, &self.meta_peer_client, u64::MAX)
+            .await
+            .map_err(BoxedError::new)
+            .context(common_meta::error::ExternalSnafu)
+    }
+    async fn flownode(
+        &self,
+        cluster_id: ClusterId,
+        id: FlownodeId,
+    ) -> common_meta::error::Result<Option<Peer>> {
+        lookup_flownode_peer(cluster_id, id, &self.meta_peer_client, u64::MAX)
+            .await
+            .map_err(BoxedError::new)
+            .context(common_meta::error::ExternalSnafu)
+    }
+}
diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs
index 2994b1240331..c39cde24f3b0 100644
--- a/src/meta-srv/src/lib.rs
+++ b/src/meta-srv/src/lib.rs
@@ -24,6 +24,7 @@ pub mod cluster;
 pub mod election;
 pub mod error;
 mod failure_detector;
+pub mod flow_meta_alloc;
 pub mod handler;
 pub mod key;
 pub mod lease;
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 853ac3e50413..0dbc4235fb9d 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -50,7 +50,7 @@ use crate::error::{
 };
 use crate::failure_detector::PhiAccrualFailureDetectorOptions;
 use crate::handler::HeartbeatHandlerGroup;
-use crate::lease::lookup_alive_datanode_peer;
+use crate::lease::lookup_datanode_peer;
 use crate::lock::DistLockRef;
 use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
 use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
@@ -484,7 +484,7 @@ impl Metasrv {
         cluster_id: ClusterId,
         peer_id: u64,
     ) -> Result<Option<Peer>> {
-        lookup_alive_datanode_peer(
+        lookup_datanode_peer(
             cluster_id,
             peer_id,
             &self.meta_peer_client,
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 6f516751d5d3..8759e7e975e3 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -42,6 +42,7 @@ use super::{SelectTarget, FLOW_ID_SEQ};
 use crate::cache_invalidator::MetasrvCacheInvalidator;
 use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
 use crate::error::{self, Result};
+use crate::flow_meta_alloc::FlowPeerAllocator;
 use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
 use crate::handler::check_leader_handler::CheckLeaderHandler;
 use crate::handler::collect_cluster_info_handler::{
@@ -58,6 +59,7 @@ use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler;
 use crate::handler::region_lease_handler::RegionLeaseHandler;
 use crate::handler::response_header_handler::ResponseHeaderHandler;
 use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers};
+use crate::lease::MetaPeerLookupService;
 use crate::lock::memory::MemLock;
 use crate::lock::DistLockRef;
 use crate::metasrv::{
@@ -240,14 +242,26 @@ impl MetasrvBuilder {
                 peer_allocator,
             ))
         });
-        // TODO(weny): use the real allocator.
-        let flow_metadata_allocator =
-            Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(Arc::new(
+        let flow_metadata_allocator = {
+            // for now flownode just use round robin selector
+            let flow_selector = RoundRobinSelector::new(SelectTarget::Flownode);
+            let flow_selector_ctx = selector_ctx.clone();
+            let peer_allocator = Arc::new(FlowPeerAllocator::new(
+                flow_selector_ctx,
+                Arc::new(flow_selector),
+            ));
+            let seq = Arc::new(
                 SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                     .initial(MIN_USER_FLOW_ID as u64)
                     .step(10)
                     .build(),
-            )));
+            );
+
+            Arc::new(FlowMetadataAllocator::with_peer_allocator(
+                seq,
+                peer_allocator,
+            ))
+        };
         let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
         let node_manager = node_manager.unwrap_or_else(|| {
             let datanode_client_channel_config = ChannelConfig::new()
@@ -276,6 +290,9 @@ impl MetasrvBuilder {
                     table_metadata_allocator: table_metadata_allocator.clone(),
                     flow_metadata_manager: flow_metadata_manager.clone(),
                     flow_metadata_allocator: flow_metadata_allocator.clone(),
+                    peer_lookup_service: Arc::new(MetaPeerLookupService::new(
+                        meta_peer_client.clone(),
+                    )),
                 },
                 procedure_manager.clone(),
                 true,
diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs
index 8ef02b411d04..2812294781a6 100644
--- a/src/meta-srv/src/procedure/utils.rs
+++ b/src/meta-srv/src/procedure/utils.rs
@@ -119,7 +119,7 @@ pub mod test_data {
     use common_meta::key::TableMetadataManager;
     use common_meta::kv_backend::memory::MemoryKvBackend;
     use common_meta::node_manager::NodeManagerRef;
-    use common_meta::peer::Peer;
+    use common_meta::peer::{Peer, StandalonePeerLookupService};
     use common_meta::region_keeper::MemoryRegionKeeper;
     use common_meta::rpc::router::RegionRoute;
     use common_meta::sequence::SequenceBuilder;
@@ -225,6 +225,7 @@ pub mod test_data {
             flow_metadata_manager,
             flow_metadata_allocator,
             memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
+            peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
         }
     }
 }
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 35a14e261260..2e761d52a280 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -28,6 +28,7 @@ use common_meta::ddl_manager::DdlManager;
 use common_meta::key::flow::FlowMetadataManager;
 use common_meta::key::TableMetadataManager;
 use common_meta::kv_backend::KvBackendRef;
+use common_meta::peer::StandalonePeerLookupService;
 use common_meta::region_keeper::MemoryRegionKeeper;
 use common_meta::sequence::SequenceBuilder;
 use common_meta::wal_options_allocator::WalOptionsAllocator;
@@ -197,6 +198,7 @@ impl GreptimeDbStandaloneBuilder {
             table_metadata_allocator,
             flow_metadata_manager,
             flow_metadata_allocator,
+            peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
         },
         procedure_manager.clone(),
         register_procedure_loaders,
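
Illustrative sketch (not part of this patch): wiring the new `HeartbeatTask` into flownode startup is left as a TODO in `run_background` above. Assuming a configured `MetaClient` and a heartbeat-response handler executor are available at startup, the task could be constructed and started roughly as follows; the function name `start_flownode_heartbeat` is hypothetical.

```rust
// Sketch only, inside the flow crate. `opts`, `meta_client` and `executor` are
// assumed to be built elsewhere during flownode startup.
use std::sync::Arc;

use common_meta::heartbeat::handler::HeartbeatResponseHandlerExecutorRef;
use meta_client::client::MetaClient;
use servers::heartbeat_options::HeartbeatOptions;

use crate::heartbeat::HeartbeatTask;
use crate::{FlownodeOptions, Result};

async fn start_flownode_heartbeat(
    opts: &FlownodeOptions,
    meta_client: Arc<MetaClient>,
    executor: HeartbeatResponseHandlerExecutorRef,
) -> Result<()> {
    // Report/retry intervals would normally come from configuration; defaults
    // stand in for them here.
    let heartbeat_opts = HeartbeatOptions::default();
    let task = HeartbeatTask::new(opts, meta_client, heartbeat_opts, executor);
    // Establishes the heartbeat stream to metasrv and spawns the background
    // report and response-handling loops added by this patch.
    task.start().await
}
```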