Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: get metasrv clusterinfo #3696

Merged
merged 10 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b97efbf92a0bf9abcfa1d8fe0ffe8741a2e7309e" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "8da84a04b137c4104262459807eab1c04b92f3cc" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
4 changes: 3 additions & 1 deletion src/common/meta/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ pub trait ClusterInfo {
}

/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have
/// a `cluster_id`, it serves multiple clusters.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
pub cluster_id: u64,
/// The role of the node. It can be [Role::Datanode], [Role::Frontend], or [Role::Metasrv].
/// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
pub role: Role,
/// The node id.
pub node_id: u64,
Expand Down
83 changes: 72 additions & 11 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use api::v1::meta::Role;
use cluster::Client as ClusterClient;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{
ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
};
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
Expand All @@ -46,8 +49,9 @@ use snafu::{OptionExt, ResultExt};
use store::Client as StoreClient;

pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
use crate::error;
use crate::error::{ConvertMetaResponseSnafu, Result};
use crate::error::{
ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, NotStartedSnafu, Result,
};

pub type Id = (u64, u64);

Expand Down Expand Up @@ -239,6 +243,57 @@ impl ProcedureExecutor for MetaClient {
}
}

#[async_trait::async_trait]
impl ClusterInfo for MetaClient {
    type Error = Error;

    /// Lists the cluster's nodes, optionally filtered by role.
    ///
    /// Metasrv nodes are obtained via the `get_metasrv_peers` RPC (they are
    /// not stored under the per-cluster node-info keys), while datanode and
    /// frontend nodes are read from the meta KV store by key prefix.
    async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
        let cluster_client = self.cluster_client()?;

        // Decide whether Metasrv peers must be included and which KV key
        // prefix (if any) covers the remaining roles.
        let (include_metasrv, nodes_key_prefix) = match role {
            None => (
                true,
                Some(NodeInfoKey::key_prefix_with_cluster_id(self.id.0)),
            ),
            Some(ClusterRole::Metasrv) => (true, None),
            Some(other) => (
                false,
                Some(NodeInfoKey::key_prefix_with_role(self.id.0, other)),
            ),
        };

        let mut nodes: Vec<NodeInfo> = Vec::new();

        if include_metasrv {
            // Metasrv does not provide this information.
            let last_activity_ts = -1;
            let (leader, followers) = cluster_client.get_metasrv_peers().await?;
            for peer in followers {
                nodes.push(NodeInfo {
                    peer,
                    last_activity_ts,
                    status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
                });
            }
            if let Some(peer) = leader {
                nodes.push(NodeInfo {
                    peer,
                    last_activity_ts,
                    status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
                });
            }
        }

        if let Some(prefix) = nodes_key_prefix {
            let req = RangeRequest::new().with_prefix(prefix);
            let res = cluster_client.range(req).await?;
            for kv in res.kvs {
                nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
            }
        }

        Ok(nodes)
    }
}

impl MetaClient {
pub fn new(id: Id) -> Self {
Self {
Expand Down Expand Up @@ -405,40 +460,46 @@ impl MetaClient {
) -> Result<SubmitDdlTaskResponse> {
let res = self
.procedure_client()?
.submit_ddl_task(req.try_into().context(error::ConvertMetaRequestSnafu)?)
.submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
.await?
.try_into()
.context(error::ConvertMetaResponseSnafu)?;
.context(ConvertMetaResponseSnafu)?;

Ok(res)
}

#[inline]
pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
self.heartbeat.clone().context(error::NotStartedSnafu {
self.heartbeat.clone().context(NotStartedSnafu {
name: "heartbeat_client",
})
}

#[inline]
pub fn store_client(&self) -> Result<StoreClient> {
self.store.clone().context(error::NotStartedSnafu {
self.store.clone().context(NotStartedSnafu {
name: "store_client",
})
}

#[inline]
pub fn lock_client(&self) -> Result<LockClient> {
self.lock.clone().context(error::NotStartedSnafu {
self.lock.clone().context(NotStartedSnafu {
name: "lock_client",
})
}

/// Returns the procedure client, or a `NotStarted` error when the meta
/// client was built without the procedure channel.
#[inline]
pub fn procedure_client(&self) -> Result<ProcedureClient> {
    self.procedure.clone().context(NotStartedSnafu {
        name: "procedure_client",
    })
}

/// Returns the cluster client, or a `NotStarted` error when the meta
/// client was built without the cluster channel.
pub fn cluster_client(&self) -> Result<ClusterClient> {
    self.cluster.clone().context(NotStartedSnafu {
        name: "cluster_client",
    })
}

#[inline]
Expand All @@ -460,7 +521,7 @@ mod tests {
use meta_srv::Result as MetaResult;

use super::*;
use crate::mocks;
use crate::{error, mocks};

const TEST_KEY_PREFIX: &str = "__unit_test__meta__";

Expand Down
51 changes: 28 additions & 23 deletions src/meta-client/src/client/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ use std::sync::Arc;

use api::greptime_proto::v1;
use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{ResponseHeader, Role};
use api::v1::meta::{MetasrvPeersRequest, ResponseHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::cluster;
use common_meta::cluster::{ClusterInfo, NodeInfo, NodeInfoKey};
use common_meta::peer::Peer;
use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse, RangeRequest, RangeResponse};
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -72,27 +71,10 @@ impl Client {
let inner = self.inner.read().await;
inner.batch_get(req).await
}
}

#[async_trait::async_trait]
impl ClusterInfo for Client {
type Error = Error;

async fn list_nodes(&self, role: Option<cluster::Role>) -> Result<Vec<NodeInfo>> {
let cluster_id = self.inner.read().await.id.0;
let key_prefix = match role {
None => NodeInfoKey::key_prefix_with_cluster_id(cluster_id),
Some(role) => NodeInfoKey::key_prefix_with_role(cluster_id, role),
};

let req = RangeRequest::new().with_prefix(key_prefix);

let res = self.range(req).await?;

fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
res.kvs
.into_iter()
.map(|kv| NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu))
.collect::<Result<Vec<_>>>()
/// Fetches the current Metasrv peers — the leader (if known) and the
/// follower peers — by delegating to the inner client under a read lock.
pub async fn get_metasrv_peers(&self) -> Result<(Option<Peer>, Vec<Peer>)> {
    let inner = self.inner.read().await;
    inner.get_metasrv_peers().await
}
}

Expand Down Expand Up @@ -239,4 +221,27 @@ impl Inner {
.try_into()
.context(ConvertMetaResponseSnafu)
}

/// Calls the `metasrv_peers` RPC (with retry) and splits the response into
/// the optional leader peer and the follower peers.
async fn get_metasrv_peers(&self) -> Result<(Option<Peer>, Vec<Peer>)> {
    self.with_retry(
        "get_metasrv_peers",
        move |mut client| {
            // A fresh request is built per attempt since tonic requests
            // cannot be reused across retries.
            let inner_req = tonic::Request::new(MetasrvPeersRequest::default());

            async move {
                client
                    .metasrv_peers(inner_req)
                    .await
                    .map(|res| res.into_inner())
            }
        },
        // The retry policy inspects the response header.
        |res| &res.header,
    )
    .await
    .map(|res| {
        // Convert the proto peers into `common_meta` peers.
        let leader = res.leader.map(|x| x.into());
        let peers = res.followers.into_iter().map(|x| x.into()).collect();
        (leader, peers)
    })
}
}
8 changes: 8 additions & 0 deletions src/meta-srv/src/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use etcd_client::LeaderKey;
use tokio::sync::broadcast::Receiver;

use crate::error::Result;
use crate::metasrv::LeaderValue;

pub const ELECTION_KEY: &str = "__metasrv_election";
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

#[derive(Debug, Clone)]
pub enum LeaderChangeMessage {
Expand Down Expand Up @@ -65,6 +67,12 @@ pub trait Election: Send + Sync {
/// note: a new leader will only return true on the first call.
fn in_infancy(&self) -> bool;

/// Registers a candidate for the election.
async fn register_candidate(&self) -> Result<()>;

/// Gets all candidates in the election.
async fn all_candidates(&self) -> Result<Vec<LeaderValue>>;

/// Campaign waits to acquire leadership in an election.
///
/// Multiple sessions can participate in the election,
Expand Down
78 changes: 68 additions & 10 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use std::time::Duration;

use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, info, warn};
use etcd_client::Client;
use etcd_client::{Client, GetOptions, PutOptions};
use snafu::{OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;

use crate::election::{Election, LeaderChangeMessage, ELECTION_KEY};
use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue};
Expand Down Expand Up @@ -105,11 +105,15 @@ impl EtcdElection {
}

fn election_key(&self) -> String {
if self.store_key_prefix.is_empty() {
ELECTION_KEY.to_string()
} else {
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
}
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
}

/// Builds the root prefix under which all candidate keys are stored,
/// namespaced by this instance's store key prefix.
fn candidate_root(&self) -> String {
    format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
}

/// Builds this node's candidate key: the candidate root followed by its
/// leader value.
fn candidate_key(&self) -> String {
    format!("{}{}", self.candidate_root(), self.leader_value)
}
}

Expand All @@ -127,6 +131,61 @@ impl Election for EtcdElection {
.is_ok()
}

async fn register_candidate(&self) -> Result<()> {
const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

let mut lease_client = self.client.lease_client();
let res = lease_client
.grant(CANDIDATE_LEASE_SECS as i64, None)
.await
.context(error::EtcdFailedSnafu)?;
let lease_id = res.id();

// The register info: key is the candidate key, value is its leader value.
let key = self.candidate_key().into_bytes();
let value = self.leader_value.clone().into_bytes();
// Puts with the lease id
self.client
.kv_client()
.put(key, value, Some(PutOptions::new().with_lease(lease_id)))
.await
.context(error::EtcdFailedSnafu)?;

let (mut keeper, mut receiver) = lease_client
.keep_alive(lease_id)
.await
.context(error::EtcdFailedSnafu)?;

let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));

loop {
let _ = keep_alive_interval.tick().await;
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;

if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
if res.ttl() <= 0 {
// Failed to keep alive, just break the loop.
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
break;
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Ok(())
}

/// Lists every registered election candidate by scanning all keys under the
/// candidate root prefix in etcd.
async fn all_candidates(&self) -> Result<Vec<LeaderValue>> {
    let prefix = self.candidate_root().into_bytes();
    let response = self
        .client
        .kv_client()
        .get(prefix, Some(GetOptions::new().with_prefix()))
        .await
        .context(error::EtcdFailedSnafu)?;

    // Each stored value is a candidate's leader value.
    let mut candidates = Vec::with_capacity(response.kvs().len());
    for kv in response.kvs() {
        candidates.push(kv.value().into());
    }
    Ok(candidates)
}

async fn campaign(&self) -> Result<()> {
let mut lease_client = self.client.lease_client();
let mut election_client = self.client.election_client();
Expand Down Expand Up @@ -201,7 +260,7 @@ impl Election for EtcdElection {

async fn leader(&self) -> Result<LeaderValue> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(LeaderValue(self.leader_value.clone()))
Ok(self.leader_value.as_bytes().into())
} else {
let res = self
.client
Expand All @@ -210,8 +269,7 @@ impl Election for EtcdElection {
.await
.context(error::EtcdFailedSnafu)?;
let leader_value = res.kv().context(error::NoLeaderSnafu)?.value();
let leader_value = String::from_utf8_lossy(leader_value).to_string();
Ok(LeaderValue(leader_value))
Ok(leader_value.into())
}
}

Expand Down
Loading