Fix/issue 299 #302

Merged 3 commits on Jun 6, 2023
64 changes: 32 additions & 32 deletions curp/src/client.rs
@@ -3,7 +3,7 @@ use std::{cmp::Ordering, collections::HashMap, fmt::Debug, iter, marker::Phantom
use event_listener::Event;
use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
use parking_lot::RwLock;
use tokio::{sync::broadcast, time::timeout};
use tokio::time::timeout;
use tracing::{debug, instrument, warn};
use utils::{config::ClientTimeout, parking_lot_lock::RwLockMap};

@@ -19,6 +19,8 @@ use crate::{

/// Protocol client
pub struct Client<C: Command> {
/// Local server id; only used in an inner client.
local_server_id: Option<ServerId>,
/// Current leader and term
state: RwLock<State>,
/// All servers' `Connect`
@@ -48,30 +50,23 @@ struct State {
term: u64,
/// When a new leader is set, notify
leader_notify: Arc<Event>,
/// Send leader changes
leader_tx: broadcast::Sender<ServerId>,
}

impl State {
/// Create the initial client state
fn new() -> Self {
let (leader_tx, _) = broadcast::channel(1);
Self {
leader: None,
term: 0,
leader_notify: Arc::new(Event::new()),
leader_tx,
}
}

/// Set the leader and notify a waiter
/// Set the leader and notify all the waiters
fn set_leader(&mut self, id: ServerId) {
debug!("client update its leader to {id}");
if self.leader.as_ref().map_or(true, |prev_id| prev_id != &id) {
let _ignored = self.leader_tx.send(id.clone()).ok(); // it's ok to have no receiver
}
self.leader = Some(id);
self.leader_notify.notify(1);
self.leader_notify.notify(usize::MAX);
}

/// Update to the newest term and reset local cache
@@ -98,8 +93,13 @@ where
{
/// Create a new protocol client based on the addresses
#[inline]
pub async fn new(addrs: HashMap<ServerId, String>, timeout: ClientTimeout) -> Self {
pub async fn new(
self_id: Option<ServerId>,
addrs: HashMap<ServerId, String>,
timeout: ClientTimeout,
) -> Self {
Self {
local_server_id: self_id,
state: RwLock::new(State::new()),
connects: rpc::connect(addrs, None).await,
timeout,
@@ -507,16 +507,31 @@
}
}

/// Get the current leader.
/// Fetch the current leader id and term from the curp server that is on the same node.
/// Note that this method should not be invoked by an outside client.
#[inline]
pub fn leader(&self) -> Option<ServerId> {
self.state.read().leader.clone()
async fn fetch_local_leader_info(&self) -> Result<(Option<ServerId>, u64), ProposeError> {
if let Some(ref local_server) = self.local_server_id {
let resp = self
.connects
.get(local_server)
.unwrap_or_else(|| unreachable!("self id {} not found", local_server))
.fetch_leader(FetchLeaderRequest::new(), *self.timeout.retry_timeout())
.await?
.into_inner();
Ok((resp.leader_id, resp.term))
} else {
unreachable!("The outer client shouldn't invoke fetch_local_leader_info");
}
}

/// Get the receiver for leader changes
/// Fetch the current leader id, bypassing the local cache
#[inline]
pub fn leader_rx(&self) -> broadcast::Receiver<ServerId> {
self.state.read().leader_tx.subscribe()
pub async fn get_leader_id_from_curp(&self) -> ServerId {
if let Ok((Some(leader_id), _term)) = self.fetch_local_leader_info().await {
return leader_id;
}
self.fetch_leader().await
}
}
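Note on the fast path introduced here: `get_leader_id_from_curp` asks the curp server on the same node first and only falls back to a cluster-wide `fetch_leader` when no local answer is available. A minimal, self-contained sketch of that shape (the `leader_id` helper and its signatures are hypothetical, not part of the crate):

```rust
use std::future::{self, Future};

// Hypothetical helper mirroring the fast-path/fallback shape of
// `get_leader_id_from_curp`; not the crate's API.
async fn leader_id<L, F>(local: Option<L>, cluster_wide: F) -> String
where
    L: Future<Output = Option<String>>,
    F: Future<Output = String>,
{
    if let Some(fut) = local {
        // Inner clients ask the colocated curp server first.
        if let Some(id) = fut.await {
            return id;
        }
    }
    // Outer clients, or a local miss, fall back to a cluster-wide fetch.
    cluster_wide.await
}

#[tokio::main]
async fn main() {
    // Outer client: no local server, so the cluster-wide path is taken.
    let id = leader_id(
        None::<future::Ready<Option<String>>>,
        async { "S1".to_owned() },
    )
    .await;
    assert_eq!(id, "S1");

    // Inner client: the local fast path answers without a remote fetch.
    let id = leader_id(
        Some(future::ready(Some("S2".to_owned()))),
        async { unreachable!("local fast path answered") },
    )
    .await;
    assert_eq!(id, "S2");
}
```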

@@ -536,21 +551,6 @@ fn superquorum(nodes: usize) -> usize {
mod tests {
use super::*;

#[allow(clippy::unwrap_used)]
#[tokio::test]
async fn will_get_notify_on_leader_changes() {
let mut state = State::new();
let mut rx = state.leader_tx.subscribe();

state.set_leader("S1".to_owned());
assert_eq!(rx.recv().await.unwrap().as_str(), "S1");

state.set_leader("S2".to_owned());
state.set_leader("S3".to_owned());
assert!(rx.recv().await.is_err());
assert_eq!(rx.recv().await.unwrap().as_str(), "S3");
}

#[test]
fn superquorum_should_work() {
assert_eq!(superquorum(11), 9);
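For context on the `set_leader` change above: `event_listener`'s `notify(n)` wakes at most `n` currently registered listeners, so `notify(1)` could leave all but one task blocked on a leader change. A runnable sketch of the notify-all behavior (assuming the event_listener 2.x API used by this crate):

```rust
use std::{sync::Arc, time::Duration};

use event_listener::Event;

#[tokio::main]
async fn main() {
    let event = Arc::new(Event::new());

    // Spawn several tasks that block on the next leader change.
    let mut waiters = Vec::new();
    for i in 0..3 {
        let event = Arc::clone(&event);
        waiters.push(tokio::spawn(async move {
            let listener = event.listen();
            listener.await;
            println!("waiter {i} observed the leader change");
        }));
    }

    // Give the waiters time to register their listeners;
    // `notify` only wakes listeners that already exist.
    tokio::time::sleep(Duration::from_millis(50)).await;

    // `event.notify(1)` would release only one of the three waiters here.
    event.notify(usize::MAX);

    for w in waiters {
        w.await.expect("waiter task panicked");
    }
}
```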
9 changes: 8 additions & 1 deletion curp/src/server/curp_node.rs
@@ -13,7 +13,7 @@ use tokio::{
time::MissedTickBehavior,
};
use tracing::{debug, error, info, warn};
use utils::config::CurpConfig;
use utils::config::{ClientTimeout, CurpConfig};

use super::{
cmd_board::{CmdBoardRef, CommandBoard},
@@ -24,6 +24,7 @@ use super::{
storage::{StorageApi, StorageError},
};
use crate::{
client::Client,
cmd::{Command, CommandExecutor},
error::ProposeError,
log_entry::LogEntry,
@@ -275,6 +276,12 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {

/// Spawned tasks
impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
/// Get the curp inner client from `CurpNode`
#[inline]
pub(crate) async fn inner_client(&self, client_timeout: ClientTimeout) -> Client<C> {
self.curp.inner_client(client_timeout).await
}

/// Tick periodically
async fn election_task(
curp: Arc<RawCurp<C, RC>>,
12 changes: 11 additions & 1 deletion curp/src/server/mod.rs
@@ -5,10 +5,14 @@ use tokio::{net::TcpListener, sync::broadcast};
use tokio_stream::wrappers::TcpListenerStream;
use tower::filter::FilterLayer;
use tracing::{info, instrument};
use utils::{config::CurpConfig, tracing::Extract};
use utils::{
config::{ClientTimeout, CurpConfig},
tracing::Extract,
};

use self::curp_node::{CurpError, CurpNode};
use crate::{
client::Client,
cmd::{Command, CommandExecutor},
error::ServerError,
members::ClusterMember,
@@ -295,6 +299,12 @@ impl<C: Command + 'static, RC: RoleChange + 'static> Rpc<C, RC> {
pub fn leader_rx(&self) -> broadcast::Receiver<Option<ServerId>> {
self.inner.leader_rx()
}

/// Get an inner client
#[inline]
pub async fn inner_client(&self, client_timeout: ClientTimeout) -> Client<C> {
self.inner.inner_client(client_timeout).await
}
}

impl From<CurpError> for tonic::Status {
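With this accessor, a server embedder derives its client from the running curp server instead of constructing one by hand. A hedged sketch of the intended call site; the `curp::cmd::Command` and `curp::role_change::RoleChange` import paths are assumptions inferred from this diff, not verified public exports:

```rust
use curp::{client::Client, cmd::Command, role_change::RoleChange, server::Rpc};
use utils::config::ClientTimeout;

// Sketch only: derive the inner client from an already-constructed `Rpc`.
// Per this diff, the returned `Client` carries `Some(self_id)`, which
// enables the local fast path in `fetch_local_leader_info`.
async fn derive_inner_client<C, RC>(rpc: &Rpc<C, RC>, timeout: ClientTimeout) -> Client<C>
where
    C: Command + 'static,
    RC: RoleChange + 'static,
{
    rpc.inner_client(timeout).await
}
```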
13 changes: 12 additions & 1 deletion curp/src/server/raw_curp/mod.rs
@@ -29,7 +29,7 @@ use tracing::{
log::{log_enabled, Level},
};
use utils::{
config::CurpConfig,
config::{ClientTimeout, CurpConfig},
parking_lot_lock::{MutexMap, RwLockMap},
};

@@ -39,6 +39,7 @@ use self::{
};
use super::cmd_worker::CEEventTxApi;
use crate::{
client::Client,
cmd::{Command, ProposeId},
error::ProposeError,
log_entry::LogEntry,
@@ -615,6 +616,16 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
raw_curp
}

/// Get the curp inner client
pub(crate) async fn inner_client(&self, timeout: ClientTimeout) -> Client<C> {
Client::<C>::new(
Some(self.id().clone()),
self.ctx.cluster_info.all_members(),
timeout,
)
.await
}

/// Create a new `RawCurp`
/// `is_leader` will only take effect when all servers start from a fresh state
#[allow(clippy::too_many_arguments)] // only called once
2 changes: 1 addition & 1 deletion curp/tests/common/curp_group.rs
@@ -221,7 +221,7 @@ impl CurpGroup {
}

pub async fn new_client(&self, timeout: ClientTimeout) -> Client<TestCommand> {
Client::<TestCommand>::new(self.all.clone(), timeout).await
Client::<TestCommand>::new(None, self.all.clone(), timeout).await
}

pub fn exe_rxs(
5 changes: 3 additions & 2 deletions xline/src/client/mod.rs
@@ -68,11 +68,12 @@ impl Client {
use_curp_client: bool,
timeout: ClientTimeout,
) -> Result<Self, ClientError> {
let name = String::from("client");
let etcd_client =
EtcdClient::connect(all_members.values().cloned().collect_vec(), None).await?;
let curp_client = CurpClient::new(all_members, timeout).await;
let curp_client = CurpClient::new(None, all_members, timeout).await;
Ok(Self {
name: String::from("client"),
name,
curp_client,
etcd_client,
use_curp_client,
4 changes: 2 additions & 2 deletions xline/src/server/lease_server.rs
@@ -259,7 +259,7 @@
if self.lease_storage.is_primary() {
break self.leader_keep_alive(request_stream).await;
}
let leader_id = self.client.get_leader_id().await;
let leader_id = self.client.get_leader_id_from_curp().await;
// A candidate server may become a leader if it wins the election, or a
// follower if it loses it, so we need to double-check here.
// We can directly invoke leader_keep_alive when a candidate becomes a leader.
Expand Down Expand Up @@ -349,7 +349,7 @@ where
};
return Ok(tonic::Response::new(res));
}
let leader_id = self.client.get_leader_id().await;
let leader_id = self.client.get_leader_id_from_curp().await;
let leader_addr = self.cluster_info.address(&leader_id).unwrap_or_else(|| {
unreachable!(
"The address of leader {} not found in all_members {:?}",
Expand Down
32 changes: 16 additions & 16 deletions xline/src/server/xline_server.rs
@@ -2,9 +2,7 @@ use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};

use anyhow::Result;
use clippy_utilities::{Cast, OverflowArithmetic};
use curp::{
client::Client, members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator,
};
use curp::{members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator};
use event_listener::Event;
use jsonwebtoken::{DecodingKey, EncodingKey};
use tokio::net::TcpListener;
@@ -276,9 +274,7 @@ impl XlineServer {
Arc::clone(&auth_revision_gen),
key_pair,
)?;
let client = Arc::new(
Client::<Command>::new(self.cluster_info.all_members(), self.client_timeout).await,
);

let index_barrier = Arc::new(IndexBarrier::new());
let id_barrier = Arc::new(IdBarrier::new());

@@ -300,6 +296,19 @@
_ => unimplemented!(),
};

let curp_server = CurpServer::new(
Arc::clone(&self.cluster_info),
self.is_leader,
ce,
snapshot_allocator,
state,
Arc::clone(&self.curp_cfg),
None,
)
.await;

let client = Arc::new(curp_server.inner_client(self.client_timeout).await);

Ok((
KvServer::new(
kv_storage,
@@ -326,16 +335,7 @@
AuthServer::new(auth_storage, client, self.cluster_info.self_id().clone()),
WatchServer::new(watcher, Arc::clone(&header_gen)),
MaintenanceServer::new(persistent, header_gen),
CurpServer::new(
Arc::clone(&self.cluster_info),
self.is_leader,
ce,
snapshot_allocator,
state,
Arc::clone(&self.curp_cfg),
None,
)
.await,
curp_server,
))
}
}