Skip to content

Commit

Permalink
chore: move curp client & remove compactable loop
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Jan 5, 2024
1 parent fe0c0b2 commit fec91f8
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 92 deletions.
9 changes: 2 additions & 7 deletions xline-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,12 @@ use std::{
task::{Context, Poll},
};

use curp::client::{ClientApi, ClientBuilder as CurpClientBuilder};

use curp::client::ClientBuilder as CurpClientBuilder;
use http::{header::AUTHORIZATION, HeaderValue, Request, Uri};
use tonic::transport::{Channel, Endpoint};
use tower::Service;
use utils::config::ClientConfig;
use xlineapi::command::Command;
use xlineapi::command::{Command, CurpClient};

use crate::{
clients::{
Expand All @@ -178,10 +177,6 @@ pub mod types;
/// Error definitions for `xline-client`.
pub mod error;

/// The curp client trait object
/// TODO: use `type CurpClient = impl ClientApi<...>` when `type_alias_impl_trait` stabilized
type CurpClient = dyn ClientApi<Error = tonic::Status, Cmd = Command> + Sync + Send + 'static;

/// Xline client
#[derive(Clone, Debug)]
pub struct Client {
Expand Down
3 changes: 1 addition & 2 deletions xline/src/server/auth_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ use pbkdf2::{
use tonic::metadata::MetadataMap;
use tracing::debug;
use xlineapi::{
command::{command_from_request_wrapper, CommandResponse, SyncResponse},
command::{command_from_request_wrapper, CommandResponse, CurpClient, SyncResponse},
request_validation::RequestValidator,
RequestWithToken,
};

use super::CurpClient;
use crate::rpc::{
Auth, AuthDisableRequest, AuthDisableResponse, AuthEnableRequest, AuthEnableResponse,
AuthRoleAddRequest, AuthRoleAddResponse, AuthRoleDeleteRequest, AuthRoleDeleteResponse,
Expand Down
7 changes: 3 additions & 4 deletions xline/src/server/cluster_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ use itertools::Itertools;
use tonic::{Request, Response, Status};
use utils::timestamp;
use xlineapi::{
Cluster, Member, MemberAddRequest, MemberAddResponse, MemberListRequest, MemberListResponse,
MemberPromoteRequest, MemberPromoteResponse, MemberRemoveRequest, MemberRemoveResponse,
MemberUpdateRequest, MemberUpdateResponse,
command::CurpClient, Cluster, Member, MemberAddRequest, MemberAddResponse, MemberListRequest,
MemberListResponse, MemberPromoteRequest, MemberPromoteResponse, MemberRemoveRequest,
MemberRemoveResponse, MemberUpdateRequest, MemberUpdateResponse,
};

use super::CurpClient;
use crate::header_gen::HeaderGenerator;

/// Cluster Server
Expand Down
3 changes: 1 addition & 2 deletions xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use futures::future::{join_all, Either};
use tokio::time::timeout;
use tracing::{debug, instrument};
use xlineapi::{
command::{command_from_request_wrapper, Command, CommandResponse, SyncResponse},
command::{command_from_request_wrapper, Command, CommandResponse, CurpClient, SyncResponse},
execute_error::ExecuteError,
request_validation::RequestValidator,
RequestWithToken, ResponseWrapper,
Expand All @@ -23,7 +23,6 @@ use xlineapi::{
use super::{
auth_server::get_token,
barriers::{IdBarrier, IndexBarrier},
CurpClient,
};
use crate::{
revision_check::RevisionCheck,
Expand Down
4 changes: 2 additions & 2 deletions xline/src/server/lease_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use tonic::transport::Endpoint;
use tracing::{debug, warn};
use utils::shutdown;
use xlineapi::{
command::{Command, CommandResponse, KeyRange, SyncResponse},
command::{Command, CommandResponse, CurpClient, KeyRange, SyncResponse},
execute_error::ExecuteError,
RequestWithToken,
};

use super::{auth_server::get_token, CurpClient};
use super::auth_server::get_token;
use crate::{
id_gen::IdGenerator,
rpc::{
Expand Down
4 changes: 2 additions & 2 deletions xline/src/server/lock_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use clippy_utilities::OverflowArithmetic;
use tonic::transport::{Channel, Endpoint};
use tracing::debug;
use xlineapi::{
command::{command_from_request_wrapper, CommandResponse, KeyRange, SyncResponse},
command::{command_from_request_wrapper, CommandResponse, CurpClient, KeyRange, SyncResponse},
execute_error::ExecuteError,
EventType, RequestWithToken,
};

use super::{auth_server::get_token, CurpClient};
use super::auth_server::get_token;
use crate::{
id_gen::IdGenerator,
rpc::{
Expand Down
2 changes: 1 addition & 1 deletion xline/src/server/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use engine::SnapshotApi;
use futures::stream::Stream;
use sha2::{Digest, Sha256};
use tracing::error;
use xlineapi::command::CurpClient;

use super::CurpClient;
use crate::{
header_gen::HeaderGenerator,
rpc::{
Expand Down
7 changes: 0 additions & 7 deletions xline/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,3 @@ mod xline_server;

pub(crate) use self::maintenance::MAINTENANCE_SNAPSHOT_CHUNK_SIZE;
pub use self::xline_server::XlineServer;

use curp::client::ClientApi;
use xlineapi::command::Command;

/// The curp client trait object
/// TODO: use `type CurpClient = impl ClientApi<...>` when `type_alias_impl_trait` stabilized
type CurpClient = dyn ClientApi<Error = tonic::Status, Cmd = Command> + Sync + Send + 'static;
7 changes: 3 additions & 4 deletions xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use utils::{
config::{ClientConfig, CompactConfig, CurpConfig, ServerTimeout, StorageConfig},
shutdown,
};
use xlineapi::command::Command;
use xlineapi::command::{Command, CurpClient};

use super::{
auth_server::AuthServer,
Expand All @@ -30,7 +30,6 @@ use super::{
lock_server::LockServer,
maintenance::MaintenanceServer,
watch_server::{WatchServer, CHANNEL_SIZE},
CurpClient,
};
use crate::{
header_gen::HeaderGenerator,
Expand All @@ -53,7 +52,7 @@ use crate::{
};

/// Rpc Server of curp protocol
type CurpServer<S, C> = Rpc<Command, State<S, C>>;
type CurpServer<S> = Rpc<Command, State<S, Arc<CurpClient>>>;

/// Xline server
#[derive(Debug)]
Expand Down Expand Up @@ -338,7 +337,7 @@ impl XlineServer {
WatchServer<S>,
MaintenanceServer<S>,
ClusterServer,
CurpServer<S, Arc<CurpClient>>,
CurpServer<S>,
Arc<CurpClient>,
)> {
let (header_gen, id_gen) = Self::construct_generator(&self.cluster_info);
Expand Down
19 changes: 10 additions & 9 deletions xline/src/storage/compact/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use event_listener::Event;
use periodic_compactor::PeriodicCompactor;
use revision_compactor::RevisionCompactor;
use tokio::{sync::mpsc::Receiver, time::sleep};
use tracing::warn;
use utils::{config::AutoCompactConfig, shutdown};
use xlineapi::{command::Command, execute_error::ExecuteError};

Expand Down Expand Up @@ -46,27 +45,29 @@ pub(crate) trait Compactor<C: Compactable>: Send + Sync {
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub(crate) trait Compactable: Send + Sync + 'static {
/// do compact
async fn compact(&self, revision: i64) -> Result<(), ExecuteError>;
/// do compact, return the compacted revision or rpc error
async fn compact(&self, revision: i64) -> Result<i64, tonic::Status>;
}

#[async_trait]
impl Compactable
for Arc<dyn ClientApi<Error = tonic::Status, Cmd = Command> + Sync + Send + 'static>
{
async fn compact(&self, revision: i64) -> Result<(), ExecuteError> {
async fn compact(&self, revision: i64) -> Result<i64, tonic::Status> {

Check warning on line 56 in xline/src/storage/compact/mod.rs

View check run for this annotation

Codecov / codecov/patch

xline/src/storage/compact/mod.rs#L56

Added line #L56 was not covered by tests
let request = CompactionRequest {
revision,
physical: false,
};
let request_wrapper = RequestWithToken::new_with_token(request.into(), None);
let cmd = Command::new(vec![], request_wrapper);
loop {
match self.propose(&cmd, true).await {
Ok(res) => return res.map(|_ig| ()),
Err(err) => warn!("send compaction request failed, error: {err}"),
};
let err = match self.propose(&cmd, true).await? {
Ok(_) => return Ok(revision),
Err(err) => err,

Check warning on line 65 in xline/src/storage/compact/mod.rs

View check run for this annotation

Codecov / codecov/patch

xline/src/storage/compact/mod.rs#L62-L65

Added lines #L62 - L65 were not covered by tests
};
if let ExecuteError::RevisionCompacted(_, compacted_rev) = err {
return Ok(compacted_rev);
}
Err(tonic::Status::from(err))

Check warning on line 70 in xline/src/storage/compact/mod.rs

View check run for this annotation

Codecov / codecov/patch

xline/src/storage/compact/mod.rs#L67-L70

Added lines #L67 - L70 were not covered by tests
}
}

Expand Down
45 changes: 19 additions & 26 deletions xline/src/storage/compact/periodic_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use clippy_utilities::OverflowArithmetic;
use tokio::sync::RwLock;
use tracing::{info, warn};
use utils::shutdown;
use xlineapi::execute_error::ExecuteError;

use super::{Compactable, Compactor};
use crate::revision_number::RevisionNumberGenerator;
Expand Down Expand Up @@ -113,31 +112,25 @@ impl<C: Compactable> PeriodicCompactor<C> {
return None;

Check warning on line 112 in xline/src/storage/compact/periodic_compactor.rs

View check run for this annotation

Codecov / codecov/patch

xline/src/storage/compact/periodic_compactor.rs#L112

Added line #L112 was not covered by tests
};

let res = compactable.compact(revision).await;
if res.is_ok() {
info!(
"completed auto revision compaction, revision = {}, period = {:?}, took {:?}",
revision,
self.period,
now.elapsed().as_secs()
);
return target_revision;
}
if let Err(ExecuteError::RevisionCompacted(_, compacted_rev)) = res {
info!(
"required revision {} has been compacted, the current compacted revision is {}, period = {:?}, took {:?}",
revision,
compacted_rev,
self.period,
now.elapsed().as_secs()
);
return Some(compacted_rev);
match compactable.compact(revision).await {
Ok(rev) => {
info!(
"completed auto revision compaction, request revision = {}, target revision = {}, period = {:?}, took {:?}",
revision,
rev,
self.period,
now.elapsed().as_secs()
);

Check warning on line 123 in xline/src/storage/compact/periodic_compactor.rs

View check run for this annotation

Codecov / codecov/patch

xline/src/storage/compact/periodic_compactor.rs#L117-L123

Added lines #L117 - L123 were not covered by tests
Some(rev)
}
Err(err) => {
warn!(
"failed auto revision compaction, revision = {}, period = {:?}, err: {}",
revision, self.period, err
);
None

Check warning on line 131 in xline/src/storage/compact/periodic_compactor.rs

View check run for this annotation

Codecov / codecov/patch

xline/src/storage/compact/periodic_compactor.rs#L126-L131

Added lines #L126 - L131 were not covered by tests
}
}
warn!(
"failed auto revision compaction, revision = {}, period = {:?}, result: {:?}",
revision, self.period, res
);
None
}
}

Expand Down Expand Up @@ -249,7 +242,7 @@ mod test {
revision_window.sample(revision);
}
let mut compactable = MockCompactable::new();
compactable.expect_compact().times(3).returning(|_| Ok(()));
compactable.expect_compact().times(3).returning(Ok);
let (_shutdown_trigger, shutdown_listener) = shutdown::channel();
let revision_gen = Arc::new(RevisionNumberGenerator::new(1));
let periodic_compactor = PeriodicCompactor::new_arc(
Expand Down
46 changes: 20 additions & 26 deletions xline/src/storage/compact/revision_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use clippy_utilities::OverflowArithmetic;
use tokio::sync::RwLock;
use tracing::{info, warn};
use utils::shutdown;
use xlineapi::execute_error::ExecuteError;

use super::{Compactable, Compactor};
use crate::revision_number::RevisionNumberGenerator;
Expand Down Expand Up @@ -68,34 +67,29 @@ impl<C: Compactable> RevisionCompactor<C> {
target_revision, self.retention
);

let Some(compactor) = &*self.compactable.read().await else {
let Some(compactable) = &*self.compactable.read().await else {
return None;

Check warning on line 71 in xline/src/storage/compact/revision_compactor.rs

View check run for this annotation

Codecov / codecov/patch

xline/src/storage/compact/revision_compactor.rs#L71

Added line #L71 was not covered by tests
};

let res = compactor.compact(target_revision).await;
if res.is_ok() {
info!(
"completed auto revision compaction, revision = {}, retention = {}, took {:?}",
target_revision,
self.retention,
now.elapsed().as_secs()
);
return Some(target_revision);
}
if let Err(ExecuteError::RevisionCompacted(_, compacted_rev)) = res {
info!(
"required revision {} has been compacted, the current compacted revision is {}, retention = {:?}",
target_revision,
compacted_rev,
self.retention,
);
return Some(compacted_rev);
match compactable.compact(target_revision).await {
Ok(rev) => {
info!(
"completed auto revision compaction, request revision = {}, target revision = {}, retention = {}, took {:?}",
target_revision,
rev,
self.retention,
now.elapsed().as_secs()
);

Check warning on line 82 in xline/src/storage/compact/revision_compactor.rs

View check run for this annotation

Codecov / codecov/patch

xline/src/storage/compact/revision_compactor.rs#L76-L82

Added lines #L76 - L82 were not covered by tests
Some(rev)
}
Err(err) => {
warn!(
"failed auto revision compaction, revision = {}, retention = {}, result: {}",
target_revision, self.retention, err
);
None

Check warning on line 90 in xline/src/storage/compact/revision_compactor.rs

View check run for this annotation

Codecov / codecov/patch

xline/src/storage/compact/revision_compactor.rs#L85-L90

Added lines #L85 - L90 were not covered by tests
}
}
warn!(
"failed auto revision compaction, revision = {}, retention = {}, result: {:?}",
target_revision, self.retention, res
);
None
}
}

Expand Down Expand Up @@ -141,7 +135,7 @@ mod test {
#[tokio::test]
async fn revision_compactor_should_work_in_normal_path() {
let mut compactable = MockCompactable::new();
compactable.expect_compact().times(3).returning(|_| Ok(()));
compactable.expect_compact().times(3).returning(Ok);
let (_shutdown_trigger, shutdown_listener) = shutdown::channel();
let revision_gen = Arc::new(RevisionNumberGenerator::new(110));
let revision_compactor =
Expand Down
5 changes: 5 additions & 0 deletions xlineapi/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
ops::{Bound, RangeBounds},
};

use curp::client::ClientApi;
use curp::cmd::Command as CurpCommand;
use curp_external_api::cmd::{ConflictCheck, PbCodec, PbSerializeError};
use itertools::Itertools;
Expand All @@ -15,6 +16,10 @@ use crate::{
ResponseWrapper, TxnRequest,
};

/// The curp client trait object on the command of xline
/// TODO: use `type CurpClient = impl ClientApi<...>` when `type_alias_impl_trait` stabilized
pub type CurpClient = dyn ClientApi<Error = tonic::Status, Cmd = Command> + Sync + Send + 'static;

/// Range start and end to get all keys
const UNBOUNDED: &[u8] = &[0_u8];
/// Range end to get one key
Expand Down

0 comments on commit fec91f8

Please sign in to comment.