Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor propose id into <client_id>#<seq_num> #466

Merged
merged 1 commit into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion curp-external-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ mockall = "0.11.3"
prost = "0.11"
serde = { version = "1.0.130", features = ["derive", "rc"] }
thiserror = "1.0.31"
uuid = { version = "1.1.2", features = ["v4"] }
17 changes: 13 additions & 4 deletions curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use async_trait::async_trait;
use engine::Snapshot;
use prost::DecodeError;
use serde::{de::DeserializeOwned, Serialize};
use uuid::Uuid;

use crate::LogIndex;

Expand Down Expand Up @@ -79,11 +78,21 @@ pub trait Command:
/// Command Id wrapper, abstracting underlying implementation
pub type ProposeId = String;

/// Generate propose id with the given prefix
/// Generate propose id with client id and seq num
#[inline]
#[must_use]
pub fn generate_propose_id(prefix: &str) -> ProposeId {
format!("{}-{}", prefix, Uuid::new_v4())
pub fn generate_propose_id(client_id: u64, seq_num: u64) -> ProposeId {
format!("{client_id}#{seq_num}")
}

/// Parse propose id to (`client_id`, `seq_num`)
#[inline]
#[must_use]
pub fn parse_propose_id(id: &ProposeId) -> Option<(u64, u64)> {
let mut iter = id.split('#');
let client_id = iter.next()?.parse().ok()?;
let seq_num: u64 = iter.next()?.parse().ok()?;
Some((client_id, seq_num))
}

/// Check conflict of two keys
Expand Down
14 changes: 9 additions & 5 deletions curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ use crate::{META_TABLE, REVISION_TABLE, TEST_TABLE};
pub(crate) const APPLIED_INDEX_KEY: &str = "applied_index";
pub(crate) const LAST_REVISION_KEY: &str = "last_revision";

/// Test client id
pub const TEST_CLIENT_ID: &str = "test_client_id";

static NEXT_ID: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(1));

pub fn next_id() -> u64 {
NEXT_ID.fetch_add(1, Ordering::SeqCst)
pub fn next_id() -> String {
let seq_num = NEXT_ID.fetch_add(1, Ordering::Relaxed);
format!("{TEST_CLIENT_ID}#{seq_num}")
}

#[derive(Error, Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -66,7 +70,7 @@ pub struct TestCommand {
impl Default for TestCommand {
fn default() -> Self {
Self {
id: next_id().to_string(),
id: next_id(),
keys: vec![1],
exe_dur: Duration::ZERO,
as_dur: Duration::ZERO,
Expand Down Expand Up @@ -114,7 +118,7 @@ impl PbCodec for TestCommandResult {
impl TestCommand {
pub fn new_get(keys: Vec<u32>) -> Self {
Self {
id: next_id().to_string(),
id: next_id(),
keys,
exe_dur: Duration::ZERO,
as_dur: Duration::ZERO,
Expand All @@ -126,7 +130,7 @@ impl TestCommand {

pub fn new_put(keys: Vec<u32>, value: u32) -> Self {
Self {
id: next_id().to_string(),
id: next_id(),
keys,
exe_dur: Duration::ZERO,
as_dur: Duration::ZERO,
Expand Down
1 change: 1 addition & 0 deletions curp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ madsim = { version = "0.2.22", features = ["rpc", "macros"] }
opentelemetry = "0.18.0"
parking_lot = "0.12.1"
prost = "0.11"
rand = "0.8.5"
serde = { version = "1.0.130", features = ["derive", "rc"] }
thiserror = "1.0.31"
tokio = { version = "0.2.23", package = "madsim-tokio", features = [
Expand Down
29 changes: 28 additions & 1 deletion curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
time::Duration,
};

use curp_external_api::cmd::PbSerializeError;
use curp_external_api::cmd::{generate_propose_id, PbSerializeError};
use dashmap::DashMap;
use event_listener::Event;
use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
Expand Down Expand Up @@ -772,6 +772,33 @@ where
fn all_connects(&self) -> Vec<Arc<dyn ConnectApi>> {
self.connects.iter().map(|c| Arc::clone(&c)).collect()
}

/// Get the client id
///
/// # Errors
///
/// `ProposeError::Timeout` if timeout
#[allow(clippy::unused_async)] // TODO: grant a client id from server
async fn get_client_id(&self) -> Result<u64, ProposeError> {
Ok(rand::random())
}

/// New a seq num and record it
#[allow(clippy::unused_self)] // TODO: implement request tracker
fn new_seq_num(&self) -> u64 {
0
}

/// Generate a propose id
///
/// # Errors
/// `ProposeError::Timeout` if timeout
#[inline]
pub async fn gen_propose_id(&self) -> Result<ProposeId, CommandProposeError<C>> {
let client_id = self.get_client_id().await?;
let seq_num = self.new_seq_num();
Ok(generate_propose_id(client_id, seq_num))
}
}

/// Get the superquorum for curp protocol
Expand Down
2 changes: 1 addition & 1 deletion curp/tests/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct CurpNode {
pub as_rx: mpsc::UnboundedReceiver<(TestCommand, LogIndex)>,
pub role_change_arc: Arc<TestRoleChangeInner>,
pub handle: JoinHandle<Result<(), ServerError>>,
pub trigger: shutdown::Trigger,
pub trigger: Trigger,
}

pub struct CurpGroup {
Expand Down
8 changes: 2 additions & 6 deletions xline-client/src/clients/auth.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use curp::{client::Client as CurpClient, cmd::generate_propose_id};
use curp::client::Client as CurpClient;
use pbkdf2::{
password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
Pbkdf2,
Expand Down Expand Up @@ -30,8 +30,6 @@ use crate::{
/// Client for Auth operations.
#[derive(Clone, Debug)]
pub struct AuthClient {
/// Name of the AuthClient, which will be used in CURP propose id generation
name: String,
/// The client running the CURP protocol, communicate with all servers.
curp_client: Arc<CurpClient<Command>>,
/// The auth RPC client, only communicate with one server at a time
Expand All @@ -44,13 +42,11 @@ impl AuthClient {
/// Creates a new `AuthClient`
#[inline]
pub fn new(
name: String,
curp_client: Arc<CurpClient<Command>>,
channel: Channel,
token: Option<String>,
) -> Self {
Self {
name,
curp_client,
auth_client: xlineapi::AuthClient::new(AuthService::new(
channel,
Expand Down Expand Up @@ -693,7 +689,7 @@ impl AuthClient {
request: Req,
use_fast_path: bool,
) -> Result<Res> {
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(request.into(), self.token.clone());
let cmd = Command::new(vec![], request, propose_id);

Expand Down
26 changes: 8 additions & 18 deletions xline-client/src/clients/kv.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use curp::{client::Client as CurpClient, cmd::generate_propose_id};
use curp::client::Client as CurpClient;
use xline::server::{Command, KeyRange};
use xlineapi::{
CompactionResponse, DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken,
Expand All @@ -15,8 +15,6 @@ use crate::{
/// Client for KV operations.
#[derive(Clone, Debug)]
pub struct KvClient {
/// Name of the kv client, which will be used in CURP propose id generation
name: String,
/// The client running the CURP protocol, communicate with all servers.
curp_client: Arc<CurpClient<Command>>,
/// The auth token
Expand All @@ -26,16 +24,8 @@ pub struct KvClient {
impl KvClient {
/// New `KvClient`
#[inline]
pub(crate) fn new(
name: String,
curp_client: Arc<CurpClient<Command>>,
token: Option<String>,
) -> Self {
Self {
name,
curp_client,
token,
}
pub(crate) fn new(curp_client: Arc<CurpClient<Command>>, token: Option<String>) -> Self {
Self { curp_client, token }
}

/// Put a key-value into the store
Expand Down Expand Up @@ -65,7 +55,7 @@ impl KvClient {
#[inline]
pub async fn put(&self, request: PutRequest) -> Result<PutResponse> {
let key_ranges = vec![KeyRange::new_one_key(request.key())];
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::PutRequest::from(request).into(),
self.token.clone(),
Expand Down Expand Up @@ -110,7 +100,7 @@ impl KvClient {
#[inline]
pub async fn range(&self, request: RangeRequest) -> Result<RangeResponse> {
let key_ranges = vec![KeyRange::new(request.key(), request.range_end())];
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::RangeRequest::from(request).into(),
self.token.clone(),
Expand Down Expand Up @@ -148,7 +138,7 @@ impl KvClient {
#[inline]
pub async fn delete(&self, request: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let key_ranges = vec![KeyRange::new(request.key(), request.range_end())];
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::DeleteRangeRequest::from(request).into(),
self.token.clone(),
Expand Down Expand Up @@ -203,7 +193,7 @@ impl KvClient {
.iter()
.map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice()))
.collect();
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::TxnRequest::from(request).into(),
self.token.clone(),
Expand Down Expand Up @@ -256,7 +246,7 @@ impl KvClient {
#[inline]
pub async fn compact(&self, request: CompactionRequest) -> Result<CompactionResponse> {
let use_fast_path = request.physical();
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::CompactionRequest::from(request).into(),
self.token.clone(),
Expand Down
10 changes: 3 additions & 7 deletions xline-client/src/clients/lease.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use curp::{client::Client as CurpClient, cmd::generate_propose_id};
use curp::client::Client as CurpClient;
use futures::channel::mpsc::channel;
use tonic::{transport::Channel, Streaming};
use xline::server::Command;
Expand All @@ -22,8 +22,6 @@ use crate::{
/// Client for Lease operations.
#[derive(Clone, Debug)]
pub struct LeaseClient {
/// Name of the LeaseClient, which will be used in CURP propose id generation
name: String,
/// The client running the CURP protocol, communicate with all servers.
curp_client: Arc<CurpClient<Command>>,
/// The lease RPC client, only communicate with one server at a time
Expand All @@ -38,14 +36,12 @@ impl LeaseClient {
/// Creates a new `LeaseClient`
#[inline]
pub fn new(
name: String,
curp_client: Arc<CurpClient<Command>>,
channel: Channel,
token: Option<String>,
id_gen: Arc<LeaseIdGenerator>,
) -> Self {
Self {
name,
curp_client,
lease_client: xlineapi::LeaseClient::new(AuthService::new(
channel,
Expand Down Expand Up @@ -85,7 +81,7 @@ impl LeaseClient {
/// ```
#[inline]
pub async fn grant(&self, mut request: LeaseGrantRequest) -> Result<LeaseGrantResponse> {
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
if request.inner.id == 0 {
request.inner.id = self.id_gen.next();
}
Expand Down Expand Up @@ -260,7 +256,7 @@ impl LeaseClient {
/// ```
#[inline]
pub async fn leases(&self) -> Result<LeaseLeasesResponse> {
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::LeaseLeasesRequest {}.into(),
self.token.clone(),
Expand Down
19 changes: 3 additions & 16 deletions xline-client/src/clients/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use std::{
};

use clippy_utilities::OverflowArithmetic;
use curp::{
client::Client as CurpClient,
cmd::{generate_propose_id, ProposeId},
};
use curp::{client::Client as CurpClient, cmd::ProposeId};
use futures::{Future, FutureExt};
use tonic::transport::Channel;
use xline::server::{Command, CommandResponse, KeyRange, SyncResponse};
Expand All @@ -35,8 +32,6 @@ use crate::{
/// Client for Lock operations.
#[derive(Clone, Debug)]
pub struct LockClient {
/// Name of the LockClient
name: String,
/// The client running the CURP protocol, communicate with all servers.
curp_client: Arc<CurpClient<Command>>,
/// The lease client
Expand All @@ -53,22 +48,14 @@ impl LockClient {
/// Creates a new `LockClient`
#[inline]
pub fn new(
name: String,
curp_client: Arc<CurpClient<Command>>,
channel: Channel,
token: Option<String>,
id_gen: Arc<LeaseIdGenerator>,
) -> Self {
Self {
name: name.clone(),
curp_client: Arc::clone(&curp_client),
lease_client: LeaseClient::new(
name,
curp_client,
channel.clone(),
token.clone(),
id_gen,
),
lease_client: LeaseClient::new(curp_client, channel.clone(), token.clone(), id_gen),
watch_client: WatchClient::new(channel, token.clone()),
token,
}
Expand Down Expand Up @@ -277,7 +264,7 @@ impl LockClient {
{
let request_with_token =
RequestWithToken::new_with_token(request.into(), self.token.clone());
let propose_id = generate_propose_id(&self.name);
let propose_id = self.curp_client.gen_propose_id().await?;

let cmd = Self::command_from_request_wrapper(propose_id, request_with_token);
self.curp_client
Expand Down
Loading
Loading