Skip to content

Commit

Permalink
fix: fetch leader_id from curp server directly
Browse files Browse the repository at this point in the history
  • Loading branch information
Phoenix500526 committed Jun 2, 2023
1 parent 4e11068 commit d7bb73e
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 9 deletions.
32 changes: 28 additions & 4 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use crate::{

/// Protocol client
pub struct Client<C: Command> {
/// Self id
self_id: ServerId,
/// Current leader and term
state: RwLock<State>,
/// All servers's `Connect`
Expand Down Expand Up @@ -91,8 +93,13 @@ where
{
/// Create a new protocol client based on the addresses
#[inline]
pub async fn new(addrs: HashMap<ServerId, String>, timeout: ClientTimeout) -> Self {
pub async fn new(
self_id: ServerId,
addrs: HashMap<ServerId, String>,
timeout: ClientTimeout,
) -> Self {
Self {
self_id,
state: RwLock::new(State::new()),
connects: rpc::connect(addrs, None).await,
timeout,
Expand Down Expand Up @@ -500,10 +507,27 @@ where
}
}

/// Get the current leader.
/// Fetch the current leader id and term from the curp server where is on the same node.
#[inline]
pub fn leader(&self) -> Option<ServerId> {
self.state.read().leader.clone()
async fn fetch_local_leader_info(&self) -> Result<(Option<ServerId>, u64), ProposeError> {
let resp = self
.connects
.get(self.self_id.as_str())
.unwrap_or_else(|| unreachable!("self id {} not found", self.self_id.as_str()))
.fetch_leader(FetchLeaderRequest::new(), *self.timeout.retry_timeout())
.await?
.into_inner();

Ok((resp.leader_id, resp.term))
}

/// Fetch the current leader id without cache
#[inline]
pub async fn get_leader_id_from_curp(&self) -> ServerId {
if let Ok((Some(leader_id), _term)) = self.fetch_local_leader_info().await {
return leader_id;
}
self.fetch_leader().await
}
}

Expand Down
2 changes: 1 addition & 1 deletion curp/tests/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl CurpGroup {
}

pub async fn new_client(&self, timeout: ClientTimeout) -> Client<TestCommand> {
Client::<TestCommand>::new(self.all.clone(), timeout).await
Client::<TestCommand>::new("S0".to_owned(), self.all.clone(), timeout).await
}

pub fn exe_rxs(
Expand Down
5 changes: 3 additions & 2 deletions xline/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ impl Client {
use_curp_client: bool,
timeout: ClientTimeout,
) -> Result<Self, ClientError> {
let name = String::from("client");
let etcd_client =
EtcdClient::connect(all_members.values().cloned().collect_vec(), None).await?;
let curp_client = CurpClient::new(all_members, timeout).await;
let curp_client = CurpClient::new(name.clone(), all_members, timeout).await;
Ok(Self {
name: String::from("client"),
name,
curp_client,
etcd_client,
use_curp_client,
Expand Down
2 changes: 1 addition & 1 deletion xline/src/server/lease_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ where
if self.lease_storage.is_primary() {
break self.leader_keep_alive(request_stream).await;
}
let leader_id = self.client.get_leader_id().await;
let leader_id = self.client.get_leader_id_from_curp().await;
// Given that a candidate server may become a leader when it won the election or
// a follower when it lost the election. Therefore we need to double check here.
// We can directly invoke leader_keep_alive when a candidate becomes a leader.
Expand Down
7 changes: 6 additions & 1 deletion xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,12 @@ impl XlineServer {
key_pair,
)?;
let client = Arc::new(
Client::<Command>::new(self.cluster_info.all_members(), self.client_timeout).await,
Client::<Command>::new(
self.cluster_info.self_id().clone(),
self.cluster_info.all_members(),
self.client_timeout,
)
.await,
);
let index_barrier = Arc::new(IndexBarrier::new());
let id_barrier = Arc::new(IdBarrier::new());
Expand Down

0 comments on commit d7bb73e

Please sign in to comment.