From 6f0779b67dabf862d458043c9950d568b4aa78f9 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 9 Apr 2024 21:59:14 +0800 Subject: [PATCH 01/10] feat: add doc for MetasrvOptions --- src/meta-srv/src/metasrv.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 4b84fcb9e38c..909eacc69d7c 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -64,21 +64,38 @@ pub const METASRV_HOME: &str = "/tmp/metasrv"; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct MetasrvOptions { + /// The address the server listens on. pub bind_addr: String, + /// The address the server advertises to the clients. pub server_addr: String, + /// The address of the store, e.g., etcd. pub store_addr: String, + /// The type of selector. pub selector: SelectorType, + /// Whether to use the memory store. pub use_memory_store: bool, + /// Whether to enable region failover. pub enable_region_failover: bool, + /// The HTTP server options. pub http: HttpOptions, + /// The logging options. pub logging: LoggingOptions, + /// The procedure options. pub procedure: ProcedureConfig, + /// The failure detector options. pub failure_detector: PhiAccrualFailureDetectorOptions, + /// The datanode options. pub datanode: DatanodeOptions, + /// Whether to enable telemetry. pub enable_telemetry: bool, + /// The data home directory. pub data_home: String, + /// The WAL options. pub wal: MetasrvWalConfig, + /// The metrics export options. pub export_metrics: ExportMetricsOption, + /// The store key prefix. If it is not empty, all keys in the store will be prefixed with it. + /// This is useful when multiple metasrv clusters share the same store. pub store_key_prefix: String, /// The max operations per txn /// From 3c03ebd75af10c751d013ea3ded640da078914ab Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 10 Apr 2024 17:03:52 +0800 Subject: [PATCH 02/10] feat: register candidate before election --- src/common/meta/src/cluster.rs | 12 +++++ src/meta-srv/src/election.rs | 8 ++++ src/meta-srv/src/election/etcd.rs | 75 ++++++++++++++++++++++++++++++- src/meta-srv/src/metasrv.rs | 39 +++++++++++----- 4 files changed, 121 insertions(+), 13 deletions(-) diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index 5a96c095927f..bdc9bc8ae49b 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -27,6 +27,7 @@ use crate::error::{ use crate::peer::Peer; const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info"; +const CLUSTER_METASRV_INFO_PREFIX: &str = "__meta_cluster_metasrv_info"; lazy_static! { static ref CLUSTER_NODE_INFO_PREFIX_PATTERN: Regex = Regex::new(&format!( @@ -35,6 +36,17 @@ lazy_static! { .unwrap(); } +pub fn metasrv_node_key(target: &str) -> Vec { + format!("{}-{}", CLUSTER_METASRV_INFO_PREFIX, target).into_bytes() +} + +pub fn metasrv_node_info(target: &str, is_leader: bool) -> (Vec, NodeStatus) { + ( + metasrv_node_key(target), + NodeStatus::Metasrv(MetasrvStatus { is_leader }), + ) +} + /// [ClusterInfo] provides information about the cluster. #[async_trait::async_trait] pub trait ClusterInfo { diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index fb5c9296e2d1..6562150bfaa6 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -21,8 +21,10 @@ use etcd_client::LeaderKey; use tokio::sync::broadcast::Receiver; use crate::error::Result; +use crate::metasrv::LeaderValue; pub const ELECTION_KEY: &str = "__metasrv_election"; +pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/"; #[derive(Debug, Clone)] pub enum LeaderChangeMessage { @@ -65,6 +67,12 @@ pub trait Election: Send + Sync { /// note: a new leader will only return true on the first call. fn in_infancy(&self) -> bool; + /// Registers a candidate for the election. + async fn register_candidate(&self) -> Result<()>; + + /// Gets all candidates in the election. + async fn all_candidates(&self) -> Result>; + /// Campaign waits to acquire leadership in an election. /// /// Multiple sessions can participate in the election, diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 80d3b51152fd..cf2276249ab3 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -18,13 +18,13 @@ use std::time::Duration; use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; use common_telemetry::{error, info, warn}; -use etcd_client::Client; +use etcd_client::{Client, GetOptions, PutOptions}; use snafu::{OptionExt, ResultExt}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; -use crate::election::{Election, LeaderChangeMessage, ELECTION_KEY}; +use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY}; use crate::error; use crate::error::Result; use crate::metasrv::{ElectionRef, LeaderValue}; @@ -111,6 +111,18 @@ impl EtcdElection { format!("{}{}", self.store_key_prefix, ELECTION_KEY) } } + + fn candidate_root(&self) -> String { + if self.store_key_prefix.is_empty() { + CANDIDATES_ROOT.to_string() + } else { + format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT) + } + } + + fn candidate_key(&self) -> String { + format!("{}{}", self.candidate_root(), self.leader_value) + } } #[async_trait::async_trait] @@ -127,6 +139,65 @@ impl Election for EtcdElection { .is_ok() } + async fn register_candidate(&self) -> Result<()> { + const CANDIDATE_LEASE_SECS: u64 = 600; + const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2; + + let mut lease_client = self.client.lease_client(); + let res = lease_client + .grant(CANDIDATE_LEASE_SECS as i64, None) + .await + .context(error::EtcdFailedSnafu)?; + let lease_id = res.id(); + + // The register info: key is the candidate key, value is its leader value. + let key = self.candidate_key().into_bytes(); + let value = self.leader_value.clone().into_bytes(); + // Puts with the lease id + self.client + .kv_client() + .put(key, value, Some(PutOptions::new().with_lease(lease_id))) + .await + .context(error::EtcdFailedSnafu)?; + + let (mut keeper, mut receiver) = lease_client + .keep_alive(lease_id) + .await + .context(error::EtcdFailedSnafu)?; + + let mut keep_alive_interval = + tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS)); + + loop { + let _ = keep_alive_interval.tick().await; + keeper.keep_alive().await.context(error::EtcdFailedSnafu)?; + + if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? { + if res.ttl() <= 0 { + // Failed to keep alive, just break the loop. + break; + } + } + } + + Ok(()) + } + + async fn all_candidates(&self) -> Result> { + let key = self.candidate_root().into_bytes(); + let res = self + .client + .kv_client() + .get(key, Some(GetOptions::new().with_prefix())) + .await + .context(error::EtcdFailedSnafu)?; + + res.kvs() + .iter() + .map(|kv| Ok(LeaderValue(String::from_utf8_lossy(kv.value()).to_string()))) + .collect() + } + async fn campaign(&self) -> Result<()> { let mut lease_client = self.client.lease_client(); let mut election_client = self.client.election_client(); diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 909eacc69d7c..b8fc6265bf36 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -358,18 +358,35 @@ impl Metasrv { state_handler.on_become_follower().await; }); - let election = election.clone(); - let started = self.started.clone(); - let _handle = common_runtime::spawn_bg(async move { - while started.load(Ordering::Relaxed) { - let res = election.campaign().await; - if let Err(e) = res { - warn!("Metasrv election error: {}", e); + // Register candidate and keep lease in background. + { + let election = election.clone(); + let started = self.started.clone(); + let _handle = common_runtime::spawn_bg(async move { + while started.load(Ordering::Relaxed) { + let res = election.register_candidate().await; + if let Err(e) = res { + warn!("Metasrv register candidate error: {}", e); + } } - info!("Metasrv re-initiate election"); - } - info!("Metasrv stopped"); - }); + }); + } + + // Campaign + { + let election = election.clone(); + let started = self.started.clone(); + let _handle = common_runtime::spawn_bg(async move { + while started.load(Ordering::Relaxed) { + let res = election.campaign().await; + if let Err(e) = res { + warn!("Metasrv election error: {}", e); + } + info!("Metasrv re-initiate election"); + } + info!("Metasrv stopped"); + }); + } } else { if let Err(e) = self.wal_options_allocator.start().await { error!(e; "Failed to start wal options allocator"); From 6a8308e1fab94b887290ba0905a1444e0963cdff Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Thu, 11 Apr 2024 21:45:17 +0800 Subject: [PATCH 03/10] feat: get all peers metasrv --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/common/meta/src/cluster.rs | 16 +----- src/meta-client/src/client.rs | 80 ++++++++++++++++++++++++--- src/meta-client/src/client/cluster.rs | 51 +++++++++-------- src/meta-srv/src/service/cluster.rs | 59 +++++++++++++++++++- 6 files changed, 160 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d321df6a510c..f7ccb1ab93a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3775,7 +3775,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b97efbf92a0bf9abcfa1d8fe0ffe8741a2e7309e#b97efbf92a0bf9abcfa1d8fe0ffe8741a2e7309e" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3edba4e6342d8926e427a1a2f12973420d06991c#3edba4e6342d8926e427a1a2f12973420d06991c" dependencies = [ "prost 0.12.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 0ecd7d2cafcd..30788ba32ae1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b97efbf92a0bf9abcfa1d8fe0ffe8741a2e7309e" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3edba4e6342d8926e427a1a2f12973420d06991c" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index bdc9bc8ae49b..9fde3f1cfb45 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -27,7 +27,6 @@ use crate::error::{ use crate::peer::Peer; const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info"; -const CLUSTER_METASRV_INFO_PREFIX: &str = "__meta_cluster_metasrv_info"; lazy_static! { static ref CLUSTER_NODE_INFO_PREFIX_PATTERN: Regex = Regex::new(&format!( @@ -36,17 +35,6 @@ lazy_static! { .unwrap(); } -pub fn metasrv_node_key(target: &str) -> Vec { - format!("{}-{}", CLUSTER_METASRV_INFO_PREFIX, target).into_bytes() -} - -pub fn metasrv_node_info(target: &str, is_leader: bool) -> (Vec, NodeStatus) { - ( - metasrv_node_key(target), - NodeStatus::Metasrv(MetasrvStatus { is_leader }), - ) -} - /// [ClusterInfo] provides information about the cluster. #[async_trait::async_trait] pub trait ClusterInfo { @@ -62,11 +50,13 @@ pub trait ClusterInfo { } /// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`. +/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have +/// a `cluster_id`, it serves multiple clusters. #[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] pub struct NodeInfoKey { /// The cluster id. pub cluster_id: u64, - /// The role of the node. It can be [Role::Datanode], [Role::Frontend], or [Role::Metasrv]. + /// The role of the node. It can be [Role::Datanode] or [Role::Frontend]. pub role: Role, /// The node id. pub node_id: u64, diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index b37e74fe8831..41267b29c97a 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -26,6 +26,9 @@ use api::v1::meta::Role; use cluster::Client as ClusterClient; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::cluster::{ + ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole, +}; use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; @@ -46,8 +49,9 @@ use snafu::{OptionExt, ResultExt}; use store::Client as StoreClient; pub use self::heartbeat::{HeartbeatSender, HeartbeatStream}; -use crate::error; -use crate::error::{ConvertMetaResponseSnafu, Result}; +use crate::error::{ + ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, NotStartedSnafu, Result, +}; pub type Id = (u64, u64); @@ -239,6 +243,57 @@ impl ProcedureExecutor for MetaClient { } } +#[async_trait::async_trait] +impl ClusterInfo for MetaClient { + type Error = Error; + + async fn list_nodes(&self, role: Option) -> Result> { + let cluster_client = self.cluster_client()?; + + let (get_metasrv_nodes, nodes_key_prefix) = match role { + None => ( + true, + Some(NodeInfoKey::key_prefix_with_cluster_id(self.id.0)), + ), + Some(ClusterRole::Metasrv) => (true, None), + Some(role) => ( + false, + Some(NodeInfoKey::key_prefix_with_role(self.id.0, role)), + ), + }; + + let mut nodes = if get_metasrv_nodes { + let last_activity_ts = -1; // Metasrv does not provide this information. + let (leader, followers) = cluster_client.get_metasrv_peers().await?; + followers + .into_iter() + .map(|peer| NodeInfo { + peer, + last_activity_ts, + status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }), + }) + .chain(leader.into_iter().map(|leader| NodeInfo { + peer: leader, + last_activity_ts, + status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }), + })) + .collect::>() + } else { + Vec::new() + }; + + if let Some(prefix) = nodes_key_prefix { + let req = RangeRequest::new().with_prefix(prefix); + let res = cluster_client.range(req).await?; + for kv in res.kvs { + nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?); + } + } + + Ok(nodes) + } +} + impl MetaClient { pub fn new(id: Id) -> Self { Self { @@ -405,31 +460,31 @@ impl MetaClient { ) -> Result { let res = self .procedure_client()? - .submit_ddl_task(req.try_into().context(error::ConvertMetaRequestSnafu)?) + .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?) .await? .try_into() - .context(error::ConvertMetaResponseSnafu)?; + .context(ConvertMetaResponseSnafu)?; Ok(res) } #[inline] pub fn heartbeat_client(&self) -> Result { - self.heartbeat.clone().context(error::NotStartedSnafu { + self.heartbeat.clone().context(NotStartedSnafu { name: "heartbeat_client", }) } #[inline] pub fn store_client(&self) -> Result { - self.store.clone().context(error::NotStartedSnafu { + self.store.clone().context(NotStartedSnafu { name: "store_client", }) } #[inline] pub fn lock_client(&self) -> Result { - self.lock.clone().context(error::NotStartedSnafu { + self.lock.clone().context(NotStartedSnafu { name: "lock_client", }) } @@ -438,7 +493,14 @@ impl MetaClient { pub fn procedure_client(&self) -> Result { self.procedure .clone() - .context(error::NotStartedSnafu { name: "ddl_client" }) + .context(NotStartedSnafu { name: "ddl_client" }) + } + + #[inline] + pub fn cluster_client(&self) -> Result { + self.cluster.clone().context(NotStartedSnafu { + name: "cluster_client", + }) } #[inline] @@ -460,7 +522,7 @@ mod tests { use meta_srv::Result as MetaResult; use super::*; - use crate::mocks; + use crate::{error, mocks}; const TEST_KEY_PREFIX: &str = "__unit_test__meta__"; diff --git a/src/meta-client/src/client/cluster.rs b/src/meta-client/src/client/cluster.rs index 3e9568d06a87..9ece92177aae 100644 --- a/src/meta-client/src/client/cluster.rs +++ b/src/meta-client/src/client/cluster.rs @@ -17,10 +17,9 @@ use std::sync::Arc; use api::greptime_proto::v1; use api::v1::meta::cluster_client::ClusterClient; -use api::v1::meta::{ResponseHeader, Role}; +use api::v1::meta::{MetasrvPeersRequest, ResponseHeader, Role}; use common_grpc::channel_manager::ChannelManager; -use common_meta::cluster; -use common_meta::cluster::{ClusterInfo, NodeInfo, NodeInfoKey}; +use common_meta::peer::Peer; use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse, RangeRequest, RangeResponse}; use common_telemetry::{info, warn}; use snafu::{ensure, ResultExt}; @@ -72,27 +71,10 @@ impl Client { let inner = self.inner.read().await; inner.batch_get(req).await } -} - -#[async_trait::async_trait] -impl ClusterInfo for Client { - type Error = Error; - - async fn list_nodes(&self, role: Option) -> Result> { - let cluster_id = self.inner.read().await.id.0; - let key_prefix = match role { - None => NodeInfoKey::key_prefix_with_cluster_id(cluster_id), - Some(role) => NodeInfoKey::key_prefix_with_role(cluster_id, role), - }; - - let req = RangeRequest::new().with_prefix(key_prefix); - - let res = self.range(req).await?; - res.kvs - .into_iter() - .map(|kv| NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)) - .collect::>>() + pub async fn get_metasrv_peers(&self) -> Result<(Option, Vec)> { + let inner = self.inner.read().await; + inner.get_metasrv_peers().await } } @@ -239,4 +221,27 @@ impl Inner { .try_into() .context(ConvertMetaResponseSnafu) } + + async fn get_metasrv_peers(&self) -> Result<(Option, Vec)> { + self.with_retry( + "get_metasrv_peers", + move |mut client| { + let inner_req = tonic::Request::new(MetasrvPeersRequest::default()); + + async move { + client + .metasrv_peers(inner_req) + .await + .map(|res| res.into_inner()) + } + }, + |res| &res.header, + ) + .await + .map(|res| { + let leader = res.leader.map(|x| x.into()); + let peers = res.followers.into_iter().map(|x| x.into()).collect(); + (leader, peers) + }) + } } diff --git a/src/meta-srv/src/service/cluster.rs b/src/meta-srv/src/service/cluster.rs index 049b34269d26..69056ffe1bb2 100644 --- a/src/meta-srv/src/service/cluster.rs +++ b/src/meta-srv/src/service/cluster.rs @@ -14,7 +14,8 @@ use api::v1::meta::{ cluster_server, BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse, - Error, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader, + Error, MetasrvPeersRequest, MetasrvPeersResponse, Peer, RangeRequest as PbRangeRequest, + RangeResponse as PbRangeResponse, ResponseHeader, }; use common_telemetry::warn; use snafu::ResultExt; @@ -34,7 +35,7 @@ impl cluster_server::Cluster for Metasrv { ..Default::default() }; - warn!("The current meta is not leader, but a batch_get request have reached the meta. Detail: {:?}.", req); + warn!("The current meta is not leader, but a `batch_get` request have reached the meta. Detail: {:?}.", req); return Ok(Response::new(resp)); } @@ -57,7 +58,7 @@ impl cluster_server::Cluster for Metasrv { ..Default::default() }; - warn!("The current meta is not leader, but a range request have reached the meta. Detail: {:?}.", req); + warn!("The current meta is not leader, but a `range` request have reached the meta. Detail: {:?}.", req); return Ok(Response::new(resp)); } @@ -71,6 +72,58 @@ impl cluster_server::Cluster for Metasrv { let resp = res.to_proto_resp(ResponseHeader::success(0)); Ok(Response::new(resp)) } + + async fn metasrv_peers( + &self, + req: Request, + ) -> GrpcResult { + if !self.is_leader() { + let is_not_leader = ResponseHeader::failed(0, Error::is_not_leader()); + let resp = MetasrvPeersResponse { + header: Some(is_not_leader), + ..Default::default() + }; + + warn!("The current meta is not leader, but a `metasrv_peers` request have reached the meta. Detail: {:?}.", req); + return Ok(Response::new(resp)); + } + + let (leader, followers) = match self.election() { + Some(election) => { + let leader = election.leader().await?; + let peers = election.all_candidates().await?; + let followers = peers + .into_iter() + .filter(|peer| peer.0 != leader.0) + .map(|peer| Peer { + addr: peer.0, + ..Default::default() + }) + .collect(); + ( + Some(Peer { + addr: leader.0, + ..Default::default() + }), + followers, + ) + } + None => ( + Some(Peer { + addr: self.options().server_addr.clone(), + ..Default::default() + }), + vec![], + ), + }; + + let resp = MetasrvPeersResponse { + header: Some(ResponseHeader::success(0)), + leader, + followers, + }; + Ok(Response::new(resp)) + } } impl Metasrv { From daa2879312b56342662edf895b87fc90313435c5 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Fri, 12 Apr 2024 15:43:17 +0800 Subject: [PATCH 04/10] chore: simply code --- src/meta-srv/src/election/etcd.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index cf2276249ab3..0b205f21b642 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -105,19 +105,11 @@ impl EtcdElection { } fn election_key(&self) -> String { - if self.store_key_prefix.is_empty() { - ELECTION_KEY.to_string() - } else { - format!("{}{}", self.store_key_prefix, ELECTION_KEY) - } + format!("{}{}", self.store_key_prefix, ELECTION_KEY) } fn candidate_root(&self) -> String { - if self.store_key_prefix.is_empty() { - CANDIDATES_ROOT.to_string() - } else { - format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT) - } + format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT) } fn candidate_key(&self) -> String { From b968d2596153807aea9e5a268d3ed9f6c57efa71 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Fri, 12 Apr 2024 17:17:17 +0800 Subject: [PATCH 05/10] chore: proto rev --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f7ccb1ab93a1..1a6a468e7054 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3775,7 +3775,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3edba4e6342d8926e427a1a2f12973420d06991c#3edba4e6342d8926e427a1a2f12973420d06991c" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=8da84a04b137c4104262459807eab1c04b92f3cc#8da84a04b137c4104262459807eab1c04b92f3cc" dependencies = [ "prost 0.12.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 30788ba32ae1..bbba87e3ae81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3edba4e6342d8926e427a1a2f12973420d06991c" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "8da84a04b137c4104262459807eab1c04b92f3cc" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" From 1b510533feab6a0ed09a38983acdb8becb452676 Mon Sep 17 00:00:00 2001 From: Jeremyhi Date: Mon, 15 Apr 2024 14:34:23 +0800 Subject: [PATCH 06/10] Update src/common/meta/src/cluster.rs Co-authored-by: dennis zhuang --- src/common/meta/src/cluster.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index 9fde3f1cfb45..ba3aecea2678 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -56,7 +56,7 @@ pub trait ClusterInfo { pub struct NodeInfoKey { /// The cluster id. pub cluster_id: u64, - /// The role of the node. It can be [Role::Datanode] or [Role::Frontend]. + /// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`. pub role: Role, /// The node id. pub node_id: u64, From f511b94667536df3067cbf4ad3e5cb2bd83e8772 Mon Sep 17 00:00:00 2001 From: Jeremyhi Date: Mon, 15 Apr 2024 14:36:21 +0800 Subject: [PATCH 07/10] Update src/meta-client/src/client.rs Co-authored-by: dennis zhuang --- src/meta-client/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 41267b29c97a..58befdd1f811 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -493,7 +493,7 @@ impl MetaClient { pub fn procedure_client(&self) -> Result { self.procedure .clone() - .context(NotStartedSnafu { name: "ddl_client" }) + .context(NotStartedSnafu { name: "procedure_client" }) } #[inline] From e2d15435f195d95a04e6fb8856343d27f7d486d7 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 15 Apr 2024 15:47:47 +0800 Subject: [PATCH 08/10] fmt Signed-off-by: tison --- src/meta-client/src/client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 58befdd1f811..6f190b999a21 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -491,9 +491,9 @@ impl MetaClient { #[inline] pub fn procedure_client(&self) -> Result { - self.procedure - .clone() - .context(NotStartedSnafu { name: "procedure_client" }) + self.procedure.clone().context(NotStartedSnafu { + name: "procedure_client", + }) } #[inline] From 1962dc2a158aa872a926b2fca6e503f789a2a172 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 15 Apr 2024 15:48:36 +0800 Subject: [PATCH 09/10] Apply suggestions from code review Co-authored-by: dennis zhuang --- src/meta-client/src/client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 6f190b999a21..fed97c568193 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -496,7 +496,6 @@ impl MetaClient { }) } - #[inline] pub fn cluster_client(&self) -> Result { self.cluster.clone().context(NotStartedSnafu { name: "cluster_client", From 434780418d174becd0e6a3a3fd3077a9c863bbfb Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 15 Apr 2024 15:54:53 +0800 Subject: [PATCH 10/10] impl> From for LeaderValue Signed-off-by: tison --- src/meta-srv/src/election/etcd.rs | 11 +++-------- src/meta-srv/src/metasrv.rs | 7 +++++++ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 0b205f21b642..695808a78728 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -183,11 +183,7 @@ impl Election for EtcdElection { .get(key, Some(GetOptions::new().with_prefix())) .await .context(error::EtcdFailedSnafu)?; - - res.kvs() - .iter() - .map(|kv| Ok(LeaderValue(String::from_utf8_lossy(kv.value()).to_string()))) - .collect() + res.kvs().iter().map(|kv| Ok(kv.value().into())).collect() } async fn campaign(&self) -> Result<()> { @@ -264,7 +260,7 @@ impl Election for EtcdElection { async fn leader(&self) -> Result { if self.is_leader.load(Ordering::Relaxed) { - Ok(LeaderValue(self.leader_value.clone())) + Ok(self.leader_value.as_bytes().into()) } else { let res = self .client @@ -273,8 +269,7 @@ impl Election for EtcdElection { .await .context(error::EtcdFailedSnafu)?; let leader_value = res.kv().context(error::NoLeaderSnafu)?.value(); - let leader_value = String::from_utf8_lossy(leader_value).to_string(); - Ok(LeaderValue(leader_value)) + Ok(leader_value.into()) } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index b8fc6265bf36..dd0fbb1fde1a 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -208,6 +208,13 @@ impl Context { pub struct LeaderValue(pub String); +impl> From for LeaderValue { + fn from(value: T) -> Self { + let string = String::from_utf8_lossy(value.as_ref()); + Self(string.to_string()) + } +} + #[derive(Clone)] pub struct SelectorContext { pub server_addr: String,