feat: heartbeat task & peer lookup in proc (#4179)
* feat: heartbeat task

* feat: use real flow peer allocator when building

* feat: add peer lookup in ddl context

* fix: drop flow test

* refactor: per review (WIP)

* refactor: do not check if alive

* refactor: per review

* refactor: remove useless `reset`

* refactor: per bot advice

* refactor: alive peer

* chore: bot review
discord9 authored Jun 24, 2024
1 parent 77904ad commit a44fe62
Showing 20 changed files with 459 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions src/cmd/src/standalone.rs
@@ -31,6 +31,7 @@ use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::StandalonePeerLookupService;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef};
@@ -557,6 +558,7 @@ impl StartCommand {
table_metadata_allocator,
flow_metadata_manager,
flow_metadata_allocator,
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
},
procedure_manager,
true,
3 changes: 3 additions & 0 deletions src/common/meta/src/ddl.rs
@@ -26,6 +26,7 @@ use crate::key::flow::FlowMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
use crate::peer::PeerLookupServiceRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
@@ -118,4 +119,6 @@ pub struct DdlContext {
pub flow_metadata_manager: FlowMetadataManagerRef,
/// Allocator for flow metadata.
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
/// Looks up a peer by id.
pub peer_lookup_service: PeerLookupServiceRef,
}
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl/create_flow/metadata.rs
@@ -23,10 +23,11 @@ impl CreateFlowProcedure {
pub(crate) async fn allocate_flow_id(&mut self) -> Result<()> {
// TODO(weny, ruihang): Partitions are not supported yet; it is always 1 for now.
let partitions = 1;
let cluster_id = self.data.cluster_id;
let (flow_id, peers) = self
.context
.flow_metadata_allocator
.create(partitions)
.create(cluster_id, partitions)
.await?;
self.data.flow_id = Some(flow_id);
self.data.peers = peers;
16 changes: 11 additions & 5 deletions src/common/meta/src/ddl/drop_flow.rs
@@ -25,18 +25,17 @@ use common_procedure::{
use common_telemetry::info;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use strum::AsRefStr;

use super::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::error::{self, Result, UnexpectedSnafu};
use crate::flow_name::FlowName;
use crate::instruction::{CacheIdent, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::lock_key::{CatalogLock, FlowLock};
use crate::peer::Peer;
use crate::rpc::ddl::DropFlowTask;
use crate::{metrics, ClusterId};

@@ -103,10 +102,17 @@ impl DropFlowProcedure {
let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids;
let flow_id = self.data.task.flow_id;
let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len());
let cluster_id = self.data.cluster_id;

for flownode in flownode_ids.values() {
// TODO(weny): use the real peer.
let peer = Peer::new(*flownode, "");
let peer = self
.context
.peer_lookup_service
.flownode(cluster_id, *flownode)
.await?
.with_context(|| UnexpectedSnafu {
err_msg: "Attempted to drop flow on a node that could not be found. Consider verifying node availability.",
})?;
let requester = self.context.node_manager.flownode(&peer).await;
let request = FlowRequest {
body: Some(flow_request::Body::Drop(DropRequest {
26 changes: 22 additions & 4 deletions src/common/meta/src/ddl/flow_meta.rs
@@ -20,6 +20,7 @@ use crate::error::Result;
use crate::key::FlowId;
use crate::peer::Peer;
use crate::sequence::SequenceRef;
use crate::ClusterId;

/// The reference of [FlowMetadataAllocator].
pub type FlowMetadataAllocatorRef = Arc<FlowMetadataAllocator>;
@@ -42,16 +43,33 @@ impl FlowMetadataAllocator {
}
}

pub fn with_peer_allocator(
flow_id_sequence: SequenceRef,
peer_allocator: Arc<dyn PartitionPeerAllocator>,
) -> Self {
Self {
flow_id_sequence,
partition_peer_allocator: peer_allocator,
}
}

/// Allocates the [FlowId].
pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
let flow_id = self.flow_id_sequence.next().await? as FlowId;
Ok(flow_id)
}

/// Allocates the [FlowId] and [Peer]s.
pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
pub async fn create(
&self,
cluster_id: ClusterId,
partitions: usize,
) -> Result<(FlowId, Vec<Peer>)> {
let flow_id = self.allocate_flow_id().await?;
let peers = self.partition_peer_allocator.alloc(partitions).await?;
let peers = self
.partition_peer_allocator
.alloc(cluster_id, partitions)
.await?;

Ok((flow_id, peers))
}
@@ -61,7 +79,7 @@ impl FlowMetadataAllocator {
#[async_trait]
pub trait PartitionPeerAllocator: Send + Sync {
/// Allocates [Peer] nodes for storing partitions.
async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>>;
async fn alloc(&self, cluster_id: ClusterId, partitions: usize) -> Result<Vec<Peer>>;
}

/// [PartitionPeerAllocatorRef] allocates [Peer]s for partitions.
@@ -71,7 +89,7 @@ struct NoopPartitionPeerAllocator;

#[async_trait]
impl PartitionPeerAllocator for NoopPartitionPeerAllocator {
async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>> {
async fn alloc(&self, _cluster_id: ClusterId, partitions: usize) -> Result<Vec<Peer>> {
Ok(vec![Peer::default(); partitions])
}
}
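
With `cluster_id` now threaded through `PartitionPeerAllocator::alloc`, a cluster-aware allocator can be plugged in via the new `with_peer_allocator` constructor. Below is a minimal sketch of such an allocator; the `FixedListPeerAllocator` name, the round-robin policy, and the `common_meta` import paths are assumptions for illustration, not part of this change.

```rust
use std::sync::Arc;

use async_trait::async_trait;
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, PartitionPeerAllocator};
use common_meta::error::Result;
use common_meta::peer::Peer;
use common_meta::sequence::SequenceRef;
use common_meta::ClusterId;

/// Hypothetical allocator that hands out peers round-robin from a fixed list.
struct FixedListPeerAllocator {
    peers: Vec<Peer>,
}

#[async_trait]
impl PartitionPeerAllocator for FixedListPeerAllocator {
    async fn alloc(&self, _cluster_id: ClusterId, partitions: usize) -> Result<Vec<Peer>> {
        // One peer per partition; an empty peer list simply yields no peers.
        Ok(self.peers.iter().cloned().cycle().take(partitions).collect())
    }
}

/// Wires the custom allocator into the flow metadata allocator
/// using the constructor added in this diff.
fn build_allocator(flow_id_sequence: SequenceRef, peers: Vec<Peer>) -> FlowMetadataAllocator {
    FlowMetadataAllocator::with_peer_allocator(
        flow_id_sequence,
        Arc::new(FixedListPeerAllocator { peers }),
    )
}
```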
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl_manager.rs
@@ -810,7 +810,7 @@ mod tests {
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
use crate::peer::Peer;
use crate::peer::{Peer, StandalonePeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
@@ -855,6 +855,7 @@
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
},
procedure_manager.clone(),
true,
51 changes: 51 additions & 0 deletions src/common/meta/src/peer.rs
@@ -13,10 +13,14 @@
// limitations under the License.

use std::fmt::{Display, Formatter};
use std::sync::Arc;

use api::v1::meta::Peer as PbPeer;
use serde::{Deserialize, Serialize};

use crate::error::Error;
use crate::{ClusterId, DatanodeId, FlownodeId};

#[derive(Debug, Default, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct Peer {
/// Node identifier. Unique in a cluster.
@@ -64,3 +68,50 @@ impl Display for Peer {
write!(f, "peer-{}({})", self.id, self.addr)
}
}

/// Looks up a [Peer] given a node id.
#[async_trait::async_trait]
pub trait PeerLookupService {
async fn datanode(&self, cluster_id: ClusterId, id: DatanodeId) -> Result<Option<Peer>, Error>;
async fn flownode(&self, cluster_id: ClusterId, id: FlownodeId) -> Result<Option<Peer>, Error>;
}

pub type PeerLookupServiceRef = Arc<dyn PeerLookupService + Send + Sync>;

/// Always returns `Peer::new(0, "")` for any query.
pub struct StandalonePeerLookupService {
default_peer: Peer,
}

impl StandalonePeerLookupService {
pub fn new() -> Self {
Self {
default_peer: Peer::new(0, ""),
}
}
}

impl Default for StandalonePeerLookupService {
fn default() -> Self {
Self::new()
}
}

#[async_trait::async_trait]
impl PeerLookupService for StandalonePeerLookupService {
async fn datanode(
&self,
_cluster_id: ClusterId,
_id: DatanodeId,
) -> Result<Option<Peer>, Error> {
Ok(Some(self.default_peer.clone()))
}

async fn flownode(
&self,
_cluster_id: ClusterId,
_id: FlownodeId,
) -> Result<Option<Peer>, Error> {
Ok(Some(self.default_peer.clone()))
}
}
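
The new `PeerLookupService` trait is what DDL procedures (e.g. `drop_flow` above) use to resolve a node id into an addressable `Peer`, with `StandalonePeerLookupService` as the trivial standalone-mode implementation. As a rough sketch of what another implementation could look like, here is a hypothetical in-memory, map-backed lookup; the name and import paths are assumed, and the real distributed implementation would resolve peers from cluster metadata instead.

```rust
use std::collections::HashMap;

use common_meta::error::Error;
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::{ClusterId, DatanodeId, FlownodeId};

/// Hypothetical lookup backed by static maps, e.g. for tests.
struct MapPeerLookupService {
    datanodes: HashMap<DatanodeId, Peer>,
    flownodes: HashMap<FlownodeId, Peer>,
}

#[async_trait::async_trait]
impl PeerLookupService for MapPeerLookupService {
    async fn datanode(
        &self,
        _cluster_id: ClusterId,
        id: DatanodeId,
    ) -> Result<Option<Peer>, Error> {
        // `None` means the node is unknown; callers like drop_flow turn that
        // into an Unexpected error (see the drop_flow.rs hunk above).
        Ok(self.datanodes.get(&id).cloned())
    }

    async fn flownode(
        &self,
        _cluster_id: ClusterId,
        id: FlownodeId,
    ) -> Result<Option<Peer>, Error> {
        Ok(self.flownodes.get(&id).cloned())
    }
}
```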
3 changes: 2 additions & 1 deletion src/common/meta/src/test_util.rs
@@ -33,7 +33,7 @@ use crate::kv_backend::KvBackendRef;
use crate::node_manager::{
Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
};
use crate::peer::Peer;
use crate::peer::{Peer, StandalonePeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;
@@ -180,5 +180,6 @@ pub fn new_ddl_context_with_kv_backend(
table_metadata_manager,
flow_metadata_allocator,
flow_metadata_manager,
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
}
}
1 change: 1 addition & 0 deletions src/flow/Cargo.toml
@@ -38,6 +38,7 @@ greptime-proto.workspace = true
# otherwise it is the same with upstream repo
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
meta-client.workspace = true
minstant = "0.1.7"
nom = "7.1.3"
num-traits = "0.2"
6 changes: 4 additions & 2 deletions src/flow/src/adapter.rs
@@ -34,6 +34,7 @@ use greptime_proto::v1;
use itertools::Itertools;
use query::{QueryEngine, QueryEngineFactory};
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
@@ -78,8 +79,8 @@ pub type TableName = [String; 3];
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
/// rpc address
pub rpc_addr: String,
pub node_id: Option<u64>,
pub grpc: GrpcOptions,
}

/// Flownode Builder
@@ -497,6 +498,7 @@ impl FlownodeManager {
/// run in common_runtime background runtime
pub fn run_background(self: Arc<Self>) -> JoinHandle<()> {
info!("Starting flownode manager's background task");
// TODO(discord9): add heartbeat tasks here
common_runtime::spawn_bg(async move {
self.run().await;
})
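
`FlownodeOptions` replaces the bare `rpc_addr` string with an optional `node_id` plus a full `GrpcOptions` section, presumably so the upcoming heartbeat task can identify and address the node. A minimal sketch of constructing it, assuming the struct is reachable as `flow::adapter::FlownodeOptions` (path not confirmed by this diff):

```rust
use flow::adapter::FlownodeOptions;

fn example_flownode_options() -> FlownodeOptions {
    // `#[serde(default)]` on the struct means a config that only sets `node_id`
    // still deserializes; every other field (including `grpc`) falls back to Default.
    FlownodeOptions {
        node_id: Some(1),
        ..Default::default()
    }
}
```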