Skip to content

Commit

Permalink
refactor: share all_members information between different servers
Browse files Browse the repository at this point in the history
  • Loading branch information
Phoenix500526 committed May 27, 2023
1 parent c7d0089 commit 86f4584
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 44 deletions.
2 changes: 1 addition & 1 deletion benchmark/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl CommandRunner {
let mut clients = Vec::with_capacity(self.args.clients);
for _ in 0..self.args.clients {
let client = Client::new(
self.args.endpoints.clone(),
self.args.endpoints.clone().into(),
self.args.use_curp,
ClientTimeout::new(
Duration::from_secs(10),
Expand Down
2 changes: 1 addition & 1 deletion curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
{
/// Create a new protocol client based on the addresses
#[inline]
pub async fn new(addrs: HashMap<ServerId, String>, timeout: ClientTimeout) -> Self {
pub async fn new(addrs: Arc<HashMap<ServerId, String>>, timeout: ClientTimeout) -> Self {
Self {
state: RwLock::new(State::new()),
connects: rpc::connect(addrs, None).await,
Expand Down
18 changes: 11 additions & 7 deletions curp/src/rpc/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,22 @@ pub trait TxFilter: Send + Sync + Debug {

/// Convert a vec of addr string to a vec of `Connect`
pub(crate) async fn connect(
addrs: HashMap<ServerId, String>,
addrs: Arc<HashMap<ServerId, String>>,
tx_filter: Option<Box<dyn TxFilter>>,
) -> HashMap<ServerId, Arc<dyn ConnectApi>> {
futures::future::join_all(addrs.into_iter().map(|(id, mut addr)| async move {
futures::future::join_all(addrs.iter().map(|(id, addr)| async move {
// Addrs must start with "http" to communicate with the server
if !addr.starts_with("http://") {
addr.insert_str(0, "http://");
}
let addr = if addr.starts_with("http://") {
addr.clone()
} else {
let mut prefix = "http://".to_owned();
prefix.push_str(addr.as_str());
prefix
};
(
id,
id.clone(),
addr.clone(),
ProtocolClient::connect(addr.clone()).await,
ProtocolClient::connect(addr).await,
)
}))
.await
Expand Down
8 changes: 4 additions & 4 deletions curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
pub(super) async fn new<CE: CommandExecutor<C> + 'static>(
id: ServerId,
is_leader: bool,
others: HashMap<ServerId, String>,
others: Arc<HashMap<ServerId, String>>,
cmd_executor: Arc<CE>,
snapshot_allocator: impl SnapshotAllocator + 'static,
role_change: RC,
Expand Down Expand Up @@ -491,7 +491,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
Self::run_bg_tasks(
Arc::clone(&curp),
Arc::clone(&storage),
others.clone(),
others,
Arc::clone(&shutdown_trigger),
tx_filter,
log_rx,
Expand All @@ -513,12 +513,12 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
async fn run_bg_tasks(
curp: Arc<RawCurp<C, RC>>,
storage: Arc<impl StorageApi<Command = C> + 'static>,
others: HashMap<ServerId, String>,
others: Arc<HashMap<ServerId, String>>,
shutdown_trigger: Arc<Event>,
tx_filter: Option<Box<dyn TxFilter>>,
log_rx: tokio::sync::mpsc::UnboundedReceiver<LogEntry<C>>,
) {
let connects = rpc::connect(others.clone(), tx_filter).await;
let connects = rpc::connect(others, tx_filter).await;
let election_task = tokio::spawn(Self::election_task(Arc::clone(&curp), connects.clone()));
let sync_task_daemons = connects
.into_iter()
Expand Down
6 changes: 3 additions & 3 deletions curp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl<C: Command + 'static, RC: RoleChange + 'static> Rpc<C, RC> {
pub async fn new<CE: CommandExecutor<C> + 'static>(
id: ServerId,
is_leader: bool,
others: HashMap<ServerId, String>,
others: Arc<HashMap<ServerId, String>>,
executor: CE,
snapshot_allocator: impl SnapshotAllocator + 'static,
role_change: RC,
Expand Down Expand Up @@ -182,7 +182,7 @@ impl<C: Command + 'static, RC: RoleChange + 'static> Rpc<C, RC> {
pub async fn run<CE, U, UE>(
id: ServerId,
is_leader: bool,
others: HashMap<ServerId, String>,
others: Arc<HashMap<ServerId, String>>,
server_port: Option<u16>,
executor: CE,
snapshot_allocator: impl SnapshotAllocator + 'static,
Expand Down Expand Up @@ -248,7 +248,7 @@ impl<C: Command + 'static, RC: RoleChange + 'static> Rpc<C, RC> {
pub async fn run_from_listener<CE, U, UE>(
id: ServerId,
is_leader: bool,
others: HashMap<ServerId, String>,
others: Arc<HashMap<ServerId, String>>,
listener: TcpListener,
executor: CE,
snapshot_allocator: impl SnapshotAllocator + 'static,
Expand Down
6 changes: 3 additions & 3 deletions curp/tests/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl CurpGroup {
handle.spawn(Rpc::run_from_listener(
id_c,
i == 0,
others.into_iter().collect(),
Arc::new(others.into_iter().collect()),
listener,
ce,
MemorySnapshotAllocator,
Expand Down Expand Up @@ -225,7 +225,7 @@ impl CurpGroup {
.iter()
.map(|(id, node)| (id.clone(), node.addr.clone()))
.collect();
Client::<TestCommand>::new(addrs, timeout).await
Client::<TestCommand>::new(Arc::new(addrs), timeout).await
}

pub fn exe_rxs(
Expand Down Expand Up @@ -315,7 +315,7 @@ impl CurpGroup {
handle.spawn(Rpc::run_from_listener(
id_c,
is_leader,
others.into_iter().collect(),
Arc::new(others.into_iter().collect()),
listener,
ce,
MemorySnapshotAllocator,
Expand Down
4 changes: 2 additions & 2 deletions xline/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, fmt::Debug};
use std::{collections::HashMap, fmt::Debug, sync::Arc};

use curp::{client::Client as CurpClient, cmd::ProposeId};
use etcd_client::{
Expand Down Expand Up @@ -64,7 +64,7 @@ impl Client {
/// If `EtcdClient::connect` fails.
#[inline]
pub async fn new(
all_members: HashMap<String, String>,
all_members: Arc<HashMap<String, String>>,
use_curp_client: bool,
timeout: ClientTimeout,
) -> Result<Self, ClientError> {
Expand Down
2 changes: 1 addition & 1 deletion xline/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ async fn main() -> Result<()> {
let db_proxy = DB::open(storage_config)?;
let server = XlineServer::new(
cluster_config.name().clone(),
cluster_config.members().clone(),
cluster_config.members().clone().into(),
*cluster_config.is_leader(),
cluster_config.curp_config().clone(),
*cluster_config.client_timeout(),
Expand Down
4 changes: 2 additions & 2 deletions xline/src/server/lease_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ where
/// Id generator
id_gen: Arc<IdGenerator>,
/// Address of all members
all_members: HashMap<String, String>,
all_members: Arc<HashMap<String, String>>,
}

impl<S> LeaseServer<S>
Expand All @@ -57,7 +57,7 @@ where
client: Arc<Client<Command>>,
name: String,
id_gen: Arc<IdGenerator>,
all_members: HashMap<String, String>,
all_members: Arc<HashMap<String, String>>,
) -> Arc<Self> {
let lease_server = Arc::new(Self {
lease_storage,
Expand Down
23 changes: 13 additions & 10 deletions xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct XlineServer {
/// is leader
is_leader: bool,
/// all Members
all_members: HashMap<ServerId, String>,
all_members: Arc<HashMap<ServerId, String>>,
/// Curp server timeout
curp_cfg: Arc<CurpConfig>,
/// Client timeout
Expand All @@ -81,7 +81,7 @@ impl XlineServer {
#[must_use]
pub fn new(
name: String,
all_members: HashMap<ServerId, String>,
all_members: Arc<HashMap<ServerId, String>>,
is_leader: bool,
curp_config: CurpConfig,
client_timeout: ClientTimeout,
Expand Down Expand Up @@ -316,8 +316,9 @@ impl XlineServer {
&auth_revision_gen,
key_pair,
)?;
let client =
Arc::new(Client::<Command>::new(self.all_members.clone(), self.client_timeout).await);
let client = Arc::new(
Client::<Command>::new(Arc::clone(&self.all_members), self.client_timeout).await,
);
let index_barrier = Arc::new(IndexBarrier::new());
let id_barrier = Arc::new(IdBarrier::new());

Expand Down Expand Up @@ -355,7 +356,7 @@ impl XlineServer {
Arc::clone(&client),
self.id.clone(),
Arc::clone(&id_gen),
self.all_members.clone(),
Arc::clone(&self.all_members),
),
AuthServer::new(
Arc::clone(&auth_storage),
Expand Down Expand Up @@ -387,11 +388,13 @@ impl XlineServer {
storage_cfg: &StorageConfig,
curp_cfg: Arc<CurpConfig>,
) -> CurpServer<S> {
let others = all_members
.iter()
.filter(|&(key, _value)| id.as_str() != key.as_str())
.map(|(server_id, addr)| (server_id.clone(), addr.clone()))
.collect();
let others = Arc::new(
all_members
.iter()
.filter(|&(key, _value)| id.as_str() != key.as_str())
.map(|(server_id, addr)| (server_id.clone(), addr.clone()))
.collect(),
);

match *storage_cfg {
StorageConfig::Memory => {
Expand Down
27 changes: 17 additions & 10 deletions xline/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::{BTreeMap, HashMap},
path::PathBuf,
sync::Arc,
};

use jsonwebtoken::{DecodingKey, EncodingKey};
Expand All @@ -18,7 +19,7 @@ pub struct Cluster {
/// listeners of members
listeners: BTreeMap<usize, TcpListener>,
/// address of members
all_members: HashMap<String, String>,
all_members: Arc<HashMap<String, String>>,
/// Client of cluster
client: Option<Client>,
/// Stop sender
Expand All @@ -36,10 +37,12 @@ impl Cluster {
for i in 0..size {
listeners.insert(i, TcpListener::bind("0.0.0.0:0").await.unwrap());
}
let all_members: HashMap<String, String> = listeners
.iter()
.map(|(i, l)| (format!("server{}", i), l.local_addr().unwrap().to_string()))
.collect();
let all_members = Arc::new(
listeners
.iter()
.map(|(i, l)| (format!("server{}", i), l.local_addr().unwrap().to_string()))
.collect(),
);

Self {
listeners,
Expand Down Expand Up @@ -107,11 +110,15 @@ impl Cluster {
/// Create or get the client with the specified index
pub(crate) async fn client(&mut self) -> &mut Client {
if self.client.is_none() {
let client = Client::new(self.all_members.clone(), true, ClientTimeout::default())
.await
.unwrap_or_else(|e| {
panic!("Client connect error: {:?}", e);
});
let client = Client::new(
Arc::clone(&self.all_members),
true,
ClientTimeout::default(),
)
.await
.unwrap_or_else(|e| {
panic!("Client connect error: {:?}", e);
});
self.client = Some(client);
}
self.client.as_mut().unwrap()
Expand Down

0 comments on commit 86f4584

Please sign in to comment.