refactor: add a ClusterMember structure to wrap all_members info
Phoenix500526 committed May 29, 2023
1 parent 86f4584 commit 5654da3
Showing 17 changed files with 344 additions and 215 deletions.
2 changes: 1 addition & 1 deletion benchmark/src/runner.rs
@@ -168,7 +168,7 @@ impl CommandRunner {
let mut clients = Vec::with_capacity(self.args.clients);
for _ in 0..self.args.clients {
let client = Client::new(
-self.args.endpoints.clone().into(),
+self.args.endpoints.clone(),
self.args.use_curp,
ClientTimeout::new(
Duration::from_secs(10),
2 changes: 1 addition & 1 deletion curp/src/client.rs
@@ -98,7 +98,7 @@ where
{
/// Create a new protocol client based on the addresses
#[inline]
-pub async fn new(addrs: Arc<HashMap<ServerId, String>>, timeout: ClientTimeout) -> Self {
+pub async fn new(addrs: HashMap<ServerId, String>, timeout: ClientTimeout) -> Self {
Self {
state: RwLock::new(State::new()),
connects: rpc::connect(addrs, None).await,
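A note on the `Client::new` change above: the endpoint map is now taken by value, so call sites drop the `Arc` wrapper (and the `.into()` conversion seen in `benchmark/src/runner.rs`). A minimal sketch of the new call shape, with made-up endpoints and an assumed `timeout` already in scope:

use std::collections::HashMap;

let addrs: HashMap<ServerId, String> =
    [("S1".to_owned(), "127.0.0.1:8080".to_owned())].into_iter().collect();
// The map is moved into the client; no shared ownership is needed at the call site.
let client = Client::new(addrs, timeout).await;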
3 changes: 3 additions & 0 deletions curp/src/lib.rs
@@ -159,6 +159,9 @@ pub mod client;
/// Server side, handling request and sync requests to the log
pub mod server;

+/// Cluster members information
+pub mod members;

/// Error types
pub mod error;

200 changes: 200 additions & 0 deletions curp/src/members.rs
@@ -0,0 +1,200 @@
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::Hasher,
time::SystemTime,
};

use itertools::Itertools;

use crate::ServerId;

/// cluster members information
#[derive(Debug)]
pub struct ClusterMember {
/// current server id
id: String,
/// all members information, including the current server
all_members: HashMap<ServerId, String>,
}

impl ClusterMember {
/// Construct a new `ClusterMember`
///
/// # Panics
///
/// panic if `all_members` is empty
#[inline]
#[must_use]
pub fn new(all_members: HashMap<ServerId, String>, id: String) -> Self {
assert!(!all_members.is_empty());
Self { id, all_members }
}

/// get server address via server id
#[must_use]
#[inline]
pub fn get_address(&self, id: &str) -> Option<&String> {
self.all_members.get(id)
}

/// get the current server address
#[must_use]
#[inline]
pub fn get_self_url(&self) -> &str {
self.all_members.get(self.id.as_str()).unwrap_or_else(|| {
unreachable!(
"The address of {} not found in all_members {:?}",
self.id, self.all_members
)
})
}

/// get the current server id
#[must_use]
#[inline]
pub fn get_self_id(&self) -> &ServerId {
&self.id
}

/// get peers id
#[must_use]
#[inline]
pub fn get_peers_id(&self) -> Vec<ServerId> {
self.all_members
.keys()
.filter(|id| id.as_str() != self.id.as_str())
.cloned()
.collect()
}

/// get peers urls
#[must_use]
#[inline]
pub fn get_peers_urls(&self) -> Vec<String> {
self.all_members
.iter()
.filter_map(|(id, url)| (id.as_str() != self.id.as_str()).then_some(url))
.cloned()
.collect()
}

/// calculate the member id
#[must_use]
#[inline]
pub fn calc_member_id(&self, cluster_name: &str) -> u64 {
let ts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_else(|e| unreachable!("SystemTime before UNIX EPOCH! {e}"))
.as_secs();
let mut hasher = DefaultHasher::new();
hasher.write(self.get_self_url().as_bytes());
hasher.write(cluster_name.as_bytes());
hasher.write_u64(ts);
hasher.finish()
}

/// calculate the cluster id
#[must_use]
#[inline]
pub fn calc_cluster_id(&self, cluster_name: &str) -> u64 {
let mut hasher = DefaultHasher::new();
let member_urls = self
.all_members
.values()
.sorted()
.map(String::as_str)
.collect::<Vec<_>>();

for url in member_urls {
hasher.write(url.as_bytes());
}
hasher.write(cluster_name.as_bytes());
hasher.finish()
}

/// get peers
#[must_use]
#[inline]
pub fn get_peers(&self) -> HashMap<ServerId, String> {
self.all_members
.iter()
.filter_map(|(id, url)| {
(id.as_str() != self.id.as_str()).then(|| (id.clone(), url.clone()))
})
.collect()
}

/// get all members
#[must_use]
#[inline]
pub fn get_all_members(&self) -> HashMap<ServerId, String> {
self.all_members.clone()
}

/// peers count
#[must_use]
#[inline]
#[allow(clippy::integer_arithmetic)] // Cannot underflow: `new` asserts that `all_members` is non-empty
pub fn peers_len(&self) -> usize {
self.all_members.len() - 1
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_calculate_id() {
let all_members: HashMap<ServerId, String> = vec![
("S1".to_owned(), "S1".to_owned()),
("S2".to_owned(), "S2".to_owned()),
("S3".to_owned(), "S3".to_owned()),
]
.into_iter()
.collect();

let node1 = ClusterMember::new(all_members.clone(), "S1".to_owned());
let node2 = ClusterMember::new(all_members.clone(), "S2".to_owned());
let node3 = ClusterMember::new(all_members, "S3".to_owned());

assert_ne!(node1.calc_member_id(""), node2.calc_member_id(""));
assert_ne!(node1.calc_member_id(""), node3.calc_member_id(""));
assert_ne!(node3.calc_member_id(""), node2.calc_member_id(""));

assert_eq!(node1.calc_cluster_id(""), node2.calc_cluster_id(""));
assert_eq!(node3.calc_cluster_id(""), node2.calc_cluster_id(""));
}

#[test]
fn test_get_peers() {
let all_members: HashMap<ServerId, String> = vec![
("S1".to_owned(), "S1".to_owned()),
("S2".to_owned(), "S2".to_owned()),
("S3".to_owned(), "S3".to_owned()),
]
.into_iter()
.collect();

let node1 = ClusterMember::new(all_members, "S1".to_owned());
let peers = node1.get_peers();
let node1_id = node1.get_self_id();
let node1_url = node1.get_self_url();
assert!(!peers.contains_key(node1.get_self_id()));
assert_eq!(peers.len(), 2);
assert_eq!(node1.peers_len(), peers.len());

let peer_urls = node1.get_peers_urls();
let peer_ids = node1.get_peers_id();
assert_eq!(peer_ids.len(), peer_urls.len());

assert!(peer_urls
.iter()
.find(|url| url.as_str() == node1_url)
.is_none());
assert!(peer_ids
.iter()
.find(|id| id.as_str() == node1_id.as_str())
.is_none());
}
}
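For orientation, a minimal usage sketch of the `ClusterMember` API added above (the ids and addresses are made-up values; note that `calc_member_id` mixes in the current timestamp, so it changes between runs, while `calc_cluster_id` is deterministic for a fixed member set and cluster name):

use std::collections::HashMap;

// ServerId is a String alias in this crate.
let all_members: HashMap<String, String> = [
    ("S1".to_owned(), "127.0.0.1:8080".to_owned()),
    ("S2".to_owned(), "127.0.0.1:8081".to_owned()),
    ("S3".to_owned(), "127.0.0.1:8082".to_owned()),
]
.into_iter()
.collect();

let member = ClusterMember::new(all_members, "S1".to_owned());
assert_eq!(member.get_self_url(), "127.0.0.1:8080");
assert_eq!(member.peers_len(), 2);
// get_peers() excludes the current server.
assert!(!member.get_peers().contains_key("S1"));
let _cluster_id = member.calc_cluster_id("test-cluster");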
18 changes: 7 additions & 11 deletions curp/src/rpc/connect.rs
@@ -38,22 +38,18 @@ pub trait TxFilter: Send + Sync + Debug {

/// Convert a vec of addr string to a vec of `Connect`
pub(crate) async fn connect(
-addrs: Arc<HashMap<ServerId, String>>,
+addrs: HashMap<ServerId, String>,
tx_filter: Option<Box<dyn TxFilter>>,
) -> HashMap<ServerId, Arc<dyn ConnectApi>> {
-futures::future::join_all(addrs.iter().map(|(id, addr)| async move {
+futures::future::join_all(addrs.into_iter().map(|(id, mut addr)| async move {
// Addrs must start with "http" to communicate with the server
-let addr = if addr.starts_with("http://") {
-addr.clone()
-} else {
-let mut prefix = "http://".to_owned();
-prefix.push_str(addr.as_str());
-prefix
-};
+if !addr.starts_with("http://") {
+addr.insert_str(0, "http://");
+}
(
-id.clone(),
+id,
addr.clone(),
-ProtocolClient::connect(addr).await,
+ProtocolClient::connect(addr.clone()).await,
)
}))
.await
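The rewritten prefix handling in `connect` mutates the owned `String` in place instead of building a new one. A standalone sketch of the same idiom, standard library only (the sample addresses are made up):

fn normalize(mut addr: String) -> String {
    // Prepend the scheme only when it is missing, mirroring the diff above.
    if !addr.starts_with("http://") {
        addr.insert_str(0, "http://");
    }
    addr
}

assert_eq!(normalize("127.0.0.1:8080".to_owned()), "http://127.0.0.1:8080");
assert_eq!(normalize("http://127.0.0.1:8080".to_owned()), "http://127.0.0.1:8080");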
23 changes: 12 additions & 11 deletions curp/src/server/curp_node.rs
@@ -27,6 +27,7 @@ use crate::{
cmd::{Command, CommandExecutor},
error::ProposeError,
log_entry::LogEntry,
+members::ClusterMember,
role_change::RoleChange,
rpc::{
self, connect::ConnectApi, AppendEntriesRequest, AppendEntriesResponse, FetchLeaderRequest,
@@ -407,19 +408,20 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
#[inline]
#[allow(clippy::too_many_arguments)] // TODO: remove this clippy lint when tx_filter is removed.(issue #143)
pub(super) async fn new<CE: CommandExecutor<C> + 'static>(
-id: ServerId,
+cluster_info: Arc<ClusterMember>,
is_leader: bool,
-others: Arc<HashMap<ServerId, String>>,
cmd_executor: Arc<CE>,
snapshot_allocator: impl SnapshotAllocator + 'static,
role_change: RC,
curp_cfg: Arc<CurpConfig>,
tx_filter: Option<Box<dyn TxFilter>>,
) -> Result<Self, CurpError> {
-let sync_events = others
-.keys()
+let sync_events = cluster_info
+.get_peers_id()
+.iter()
.map(|server_id| (server_id.clone(), Arc::new(Event::new())))
.collect();

let (log_tx, log_rx) = mpsc::unbounded_channel();
let shutdown_trigger = Arc::new(Event::new());
let cmd_board = Arc::new(RwLock::new(CommandBoard::new()));
@@ -435,10 +437,10 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {

// create curp state machine
let (voted_for, entries) = storage.recover().await?;
+let id = cluster_info.get_self_id();
let curp = if voted_for.is_none() && entries.is_empty() {
Arc::new(RawCurp::new(
-id,
-others.keys().cloned().collect(),
+Arc::clone(&cluster_info),
is_leader,
Arc::clone(&cmd_board),
Arc::clone(&spec_pool),
@@ -457,8 +459,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
entries.last()
);
Arc::new(RawCurp::recover_from(
-id,
-others.keys().cloned().collect(),
+Arc::clone(&cluster_info),
is_leader,
Arc::clone(&cmd_board),
Arc::clone(&spec_pool),
@@ -491,7 +492,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
Self::run_bg_tasks(
Arc::clone(&curp),
Arc::clone(&storage),
-others,
+cluster_info,
Arc::clone(&shutdown_trigger),
tx_filter,
log_rx,
@@ -513,12 +514,12 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
async fn run_bg_tasks(
curp: Arc<RawCurp<C, RC>>,
storage: Arc<impl StorageApi<Command = C> + 'static>,
-others: Arc<HashMap<ServerId, String>>,
+cluster_info: Arc<ClusterMember>,
shutdown_trigger: Arc<Event>,
tx_filter: Option<Box<dyn TxFilter>>,
log_rx: tokio::sync::mpsc::UnboundedReceiver<LogEntry<C>>,
) {
-let connects = rpc::connect(others, tx_filter).await;
+let connects = rpc::connect(cluster_info.get_peers(), tx_filter).await;
let election_task = tokio::spawn(Self::election_task(Arc::clone(&curp), connects.clone()));
let sync_task_daemons = connects
.into_iter()
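Taken together, the `CurpNode` changes replace the separate `id` and `others` arguments with a single shared `Arc<ClusterMember>`. A hedged sketch of the new wiring, assuming the remaining arguments keep the meanings shown in the diff (the variable names here are illustrative):

let cluster_info = Arc::new(ClusterMember::new(all_members, self_id));
// One ClusterMember is now shared by CurpNode, RawCurp, and rpc::connect,
// instead of threading `id` and `others` through every call site.
let node = CurpNode::new(
    Arc::clone(&cluster_info),
    is_leader,
    cmd_executor,
    snapshot_allocator,
    role_change,
    curp_cfg,
    tx_filter,
)
.await?;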
(Diff truncated: the remaining 11 of the 17 changed files did not load on this page.)
