Skip to content

Commit

Permalink
chore: rename SyncError to WaitSyncError
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>

chore: rename `PbSerialize` to `PbCodec`

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
  • Loading branch information
bsbds committed Sep 6, 2023
1 parent afd14ce commit b30fa33
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 70 deletions.
12 changes: 6 additions & 6 deletions curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use crate::LogIndex;
/// Command to execute on the server side
#[async_trait]
pub trait Command:
Sync + Send + DeserializeOwned + Serialize + std::fmt::Debug + Clone + ConflictCheck + PbSerialize
Sync + Send + DeserializeOwned + Serialize + std::fmt::Debug + Clone + ConflictCheck + PbCodec
{
/// Error type
type Error: Send + Sync + Clone + std::error::Error + Serialize + DeserializeOwned + PbSerialize;
type Error: Send + Sync + Clone + std::error::Error + Serialize + DeserializeOwned + PbCodec;

/// K (key) is used to tell confliction
/// The key can be a single key or a key range
Expand All @@ -31,10 +31,10 @@ pub trait Command:
type PR: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned;

/// Execution result
type ER: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbSerialize;
type ER: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbCodec;

/// After_sync result
type ASR: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbSerialize;
type ASR: std::fmt::Debug + Send + Sync + Clone + Serialize + DeserializeOwned + PbCodec;

/// Get keys of the command
fn keys(&self) -> &[Self::K];
Expand Down Expand Up @@ -158,8 +158,8 @@ where
async fn reset(&self, snapshot: Option<(Snapshot, LogIndex)>) -> Result<(), C::Error>;
}

/// Serializaion for protobuf
pub trait PbSerialize: Sized {
/// Codec for encoding and decoding data into/from the Protobuf format
pub trait PbCodec: Sized {
/// Encode
fn encode(&self) -> Vec<u8>;
/// Decode
Expand Down
10 changes: 5 additions & 5 deletions curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use async_trait::async_trait;
use clippy_utilities::NumericCast;
use curp_external_api::{
cmd::{Command, CommandExecutor, ConflictCheck, PbSerialize, ProposeId},
cmd::{Command, CommandExecutor, ConflictCheck, PbCodec, ProposeId},
LogIndex,
};
use engine::{Engine, EngineType, Snapshot, SnapshotApi, StorageEngine, WriteOperation};
Expand Down Expand Up @@ -39,7 +39,7 @@ impl Display for ExecuteError {
}

// The `ExecuteError` is only for internal use, so we do not have to serialize it to protobuf format
impl PbSerialize for ExecuteError {
impl PbCodec for ExecuteError {
fn encode(&self) -> Vec<u8> {
self.0.clone().into_bytes()
}
Expand Down Expand Up @@ -94,7 +94,7 @@ impl TestCommandResult {
}

// The `TestCommandResult` is only for internal use, so we do not have to serialize it to protobuf format
impl PbSerialize for TestCommandResult {
impl PbCodec for TestCommandResult {
fn encode(&self) -> Vec<u8> {
bincode::serialize(self).unwrap_or_else(|_| {
unreachable!("test cmd result should always be successfully serialized")
Expand Down Expand Up @@ -171,7 +171,7 @@ impl From<LogIndexResult> for LogIndex {
}

// The `TestCommandResult` is only for internal use, so we donnot have to serialize it to protobuf format
impl PbSerialize for LogIndexResult {
impl PbCodec for LogIndexResult {
fn encode(&self) -> Vec<u8> {
bincode::serialize(self).unwrap_or_else(|_| {
unreachable!("test cmd result should always be successfully serialized")
Expand Down Expand Up @@ -219,7 +219,7 @@ impl ConflictCheck for TestCommand {
}

// The `TestCommand` is only for internal use, so we donnot have to serialize it to protobuf format
impl PbSerialize for TestCommand {
impl PbCodec for TestCommand {
fn encode(&self) -> Vec<u8> {
bincode::serialize(self)
.unwrap_or_else(|_| unreachable!("test cmd should always be successfully serialized"))
Expand Down
8 changes: 4 additions & 4 deletions curp/proto/error.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ message ProposeError {
oneof propose_error {
google.protobuf.Empty key_conflict = 1;
google.protobuf.Empty duplicated = 2;
SyncError sync_error = 3;
WaitSyncError wait_sync_error = 3;
string encode_error = 4;
}
}

message CommandSyncError {
oneof command_sync_error {
SyncError sync = 1;
WaitSyncError wait_sync = 1;
// The serialized command error
// The original type is Command::Error
bytes execute = 2;
Expand All @@ -26,8 +26,8 @@ message CommandSyncError {
}
}

message SyncError {
oneof sync_error {
message WaitSyncError {
oneof wait_sync_error {
RedirectData redirect = 1;
string other = 2;
}
Expand Down
14 changes: 10 additions & 4 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use utils::{config::ClientTimeout, parking_lot_lock::RwLockMap};
use crate::{
cmd::{Command, ProposeId},
error::{
ClientBuildError, CommandProposeError, CommandSyncError, ProposeError, RpcError, SyncError,
ClientBuildError, CommandProposeError, CommandSyncError, ProposeError, RpcError,
WaitSyncError,
},
members::ServerId,
rpc::{
Expand Down Expand Up @@ -326,7 +327,10 @@ where
debug!("slow round for cmd({}) succeeded", cmd.id());
return Ok((asr, er));
}
SyncResult::Error(CommandSyncError::Sync(SyncError::Redirect(server_id, term))) => {
SyncResult::Error(CommandSyncError::WaitSync(WaitSyncError::Redirect(
server_id,
term,
))) => {
let new_leader = server_id.and_then(|id| {
self.state.map_write(|mut state| {
(state.term <= term).then(|| {
Expand All @@ -338,8 +342,10 @@ where
});
self.resend_propose(Arc::clone(&cmd), new_leader).await?; // resend the propose to the new leader
}
SyncResult::Error(CommandSyncError::Sync(e)) => {
return Err(ProposeError::SyncedError(SyncError::Other(e.to_string())).into());
SyncResult::Error(CommandSyncError::WaitSync(e)) => {
return Err(
ProposeError::SyncedError(WaitSyncError::Other(e.to_string())).into(),
);
}
SyncResult::Error(CommandSyncError::Execute(e)) => {
return Err(CommandProposeError::Execute(e));
Expand Down
70 changes: 37 additions & 33 deletions curp/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::io;

use curp_external_api::cmd::{Command, PbSerialize, PbSerializeError};
use curp_external_api::cmd::{Command, PbCodec, PbSerializeError};
use prost::Message;
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand All @@ -9,7 +9,7 @@ use crate::{
members::ServerId,
rpc::{
PbCommandSyncError, PbCommandSyncErrorOuter, PbProposeError, PbProposeErrorOuter,
PbSyncError, PbSyncErrorOuter, RedirectData,
PbWaitSyncError, PbWaitSyncErrorOuter, RedirectData,
},
};

Expand Down Expand Up @@ -80,7 +80,7 @@ pub enum ProposeError {
Duplicated,
/// Command syncing error
#[error("syncing error {0}")]
SyncedError(SyncError),
SyncedError(WaitSyncError),
/// Encode error
#[error("encode error: {0}")]
EncodeError(String),
Expand All @@ -94,9 +94,11 @@ impl TryFrom<PbProposeError> for ProposeError {
Ok(match err {
PbProposeError::KeyConflict(_) => ProposeError::KeyConflict,
PbProposeError::Duplicated(_) => ProposeError::Duplicated,
PbProposeError::SyncError(e) => {
ProposeError::SyncedError(e.sync_error.ok_or(PbSerializeError::EmptyField)?.into())
}
PbProposeError::WaitSyncError(e) => ProposeError::SyncedError(
e.wait_sync_error
.ok_or(PbSerializeError::EmptyField)?
.into(),
),
PbProposeError::EncodeError(s) => ProposeError::EncodeError(s),
})
}
Expand All @@ -108,8 +110,8 @@ impl From<ProposeError> for PbProposeError {
match err {
ProposeError::KeyConflict => PbProposeError::KeyConflict(()),
ProposeError::Duplicated => PbProposeError::Duplicated(()),
ProposeError::SyncedError(e) => PbProposeError::SyncError(PbSyncErrorOuter {
sync_error: Some(e.into()),
ProposeError::SyncedError(e) => PbProposeError::WaitSyncError(PbWaitSyncErrorOuter {
wait_sync_error: Some(e.into()),
}),
ProposeError::EncodeError(s) => PbProposeError::EncodeError(s),
}
Expand All @@ -123,7 +125,7 @@ impl From<PbSerializeError> for ProposeError {
}
}

impl PbSerialize for ProposeError {
impl PbCodec for ProposeError {
#[inline]
fn encode(&self) -> Vec<u8> {
PbProposeErrorOuter {
Expand Down Expand Up @@ -188,7 +190,7 @@ pub enum CommandProposeError<C: Command> {
#[derive(Clone, Error, Serialize, Deserialize, Debug)]
#[allow(clippy::module_name_repetitions)] // this-error generate code false-positive
#[non_exhaustive]
pub enum SyncError {
pub enum WaitSyncError {
/// If client sent a wait synced request to a non-leader
#[error("redirect to {0:?}, term {1}")]
Redirect(Option<ServerId>, u64),
Expand All @@ -197,23 +199,23 @@ pub enum SyncError {
Other(String),
}

impl From<PbSyncError> for SyncError {
impl From<PbWaitSyncError> for WaitSyncError {
#[inline]
fn from(err: PbSyncError) -> Self {
fn from(err: PbWaitSyncError) -> Self {
match err {
PbSyncError::Redirect(data) => SyncError::Redirect(data.server_id, data.term),
PbSyncError::Other(s) => SyncError::Other(s),
PbWaitSyncError::Redirect(data) => WaitSyncError::Redirect(data.server_id, data.term),
PbWaitSyncError::Other(s) => WaitSyncError::Other(s),
}
}
}

impl From<SyncError> for PbSyncError {
fn from(err: SyncError) -> Self {
impl From<WaitSyncError> for PbWaitSyncError {
fn from(err: WaitSyncError) -> Self {
match err {
SyncError::Redirect(server_id, term) => {
PbSyncError::Redirect(RedirectData { server_id, term })
WaitSyncError::Redirect(server_id, term) => {
PbWaitSyncError::Redirect(RedirectData { server_id, term })
}
SyncError::Other(s) => PbSyncError::Other(s),
WaitSyncError::Other(s) => PbWaitSyncError::Other(s),
}
}
}
Expand All @@ -222,7 +224,7 @@ impl From<SyncError> for PbSyncError {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum CommandSyncError<C: Command> {
/// If wait sync went wrong
Sync(SyncError),
WaitSync(WaitSyncError),
/// If the execution of the cmd went wrong
Execute(C::Error),
/// If after sync of the cmd went wrong
Expand All @@ -232,8 +234,8 @@ pub(crate) enum CommandSyncError<C: Command> {
impl<C: Command> From<CommandSyncError<C>> for PbCommandSyncError {
fn from(err: CommandSyncError<C>) -> Self {
match err {
CommandSyncError::Sync(e) => PbCommandSyncError::Sync(PbSyncErrorOuter {
sync_error: Some(e.into()),
CommandSyncError::WaitSync(e) => PbCommandSyncError::WaitSync(PbWaitSyncErrorOuter {
wait_sync_error: Some(e.into()),
}),
CommandSyncError::Execute(e) => PbCommandSyncError::Execute(e.encode()),
CommandSyncError::AfterSync(e) => PbCommandSyncError::AfterSync(e.encode()),
Expand All @@ -246,9 +248,11 @@ impl<C: Command> TryFrom<PbCommandSyncError> for CommandSyncError<C> {

fn try_from(err: PbCommandSyncError) -> Result<Self, Self::Error> {
Ok(match err {
PbCommandSyncError::Sync(e) => {
CommandSyncError::Sync(e.sync_error.ok_or(PbSerializeError::EmptyField)?.into())
}
PbCommandSyncError::WaitSync(e) => CommandSyncError::WaitSync(
e.wait_sync_error
.ok_or(PbSerializeError::EmptyField)?
.into(),
),
PbCommandSyncError::Execute(e) => {
CommandSyncError::Execute(<C as Command>::Error::decode(&e)?)
}
Expand All @@ -259,7 +263,7 @@ impl<C: Command> TryFrom<PbCommandSyncError> for CommandSyncError<C> {
}
}

impl<C: Command> PbSerialize for CommandSyncError<C> {
impl<C: Command> PbCodec for CommandSyncError<C> {
fn encode(&self) -> Vec<u8> {
PbCommandSyncErrorOuter {
command_sync_error: Some(self.clone().into()),
Expand All @@ -275,9 +279,9 @@ impl<C: Command> PbSerialize for CommandSyncError<C> {
}
}

impl<C: Command> From<SyncError> for CommandSyncError<C> {
fn from(err: SyncError) -> Self {
Self::Sync(err)
impl<C: Command> From<WaitSyncError> for CommandSyncError<C> {
fn from(err: WaitSyncError) -> Self {
Self::WaitSync(err)
}
}

Expand All @@ -291,21 +295,21 @@ mod test {
fn propose_error_serialization_is_ok() {
let err = ProposeError::Duplicated;
let _decoded_err =
<ProposeError as PbSerialize>::decode(&err.encode()).expect("decode should success");
<ProposeError as PbCodec>::decode(&err.encode()).expect("decode should success");
assert!(matches!(err, _decoded_err));
}

#[test]
fn cmd_sync_error_serialization_is_ok() {
let err: CommandSyncError<TestCommand> =
CommandSyncError::Sync(SyncError::Other("msg".to_owned()));
let _decoded_err = <CommandSyncError<TestCommand> as PbSerialize>::decode(&err.encode())
CommandSyncError::WaitSync(WaitSyncError::Other("msg".to_owned()));
let _decoded_err = <CommandSyncError<TestCommand> as PbCodec>::decode(&err.encode())
.expect("decode should success");
assert!(matches!(err, _decoded_err));

let err1: CommandSyncError<TestCommand> =
CommandSyncError::Execute(ExecuteError("msg".to_owned()));
let _decoded_err1 = <CommandSyncError<TestCommand> as PbSerialize>::decode(&err1.encode())
let _decoded_err1 = <CommandSyncError<TestCommand> as PbCodec>::decode(&err1.encode())
.expect("decode should success");
assert!(matches!(err1, _decoded_err1));
}
Expand Down
13 changes: 7 additions & 6 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{collections::HashMap, sync::Arc};

use curp_external_api::cmd::{PbSerialize, PbSerializeError};
use curp_external_api::cmd::{PbCodec, PbSerializeError};
use serde::{de::DeserializeOwned, Serialize};

use crate::{
cmd::{Command, ProposeId},
error::{CommandSyncError, ProposeError, SyncError},
error::{CommandSyncError, ProposeError, WaitSyncError},
log_entry::LogEntry,
members::ServerId,
LogIndex,
Expand All @@ -20,9 +20,10 @@ pub(crate) use self::proto::{
},
errorpb::{
command_sync_error::CommandSyncError as PbCommandSyncError,
propose_error::ProposeError as PbProposeError, sync_error::SyncError as PbSyncError,
propose_error::ProposeError as PbProposeError,
wait_sync_error::WaitSyncError as PbWaitSyncError,
CommandSyncError as PbCommandSyncErrorOuter, ProposeError as PbProposeErrorOuter,
RedirectData, SyncError as PbSyncErrorOuter,
RedirectData, WaitSyncError as PbWaitSyncErrorOuter,
},
messagepb::{
fetch_read_state_response::ReadState, protocol_server::Protocol, AppendEntriesRequest,
Expand Down Expand Up @@ -243,7 +244,7 @@ impl WaitSyncedResponse {
unreachable!("should not call after sync if execution fails")
}
(None, None) => WaitSyncedResponse::new_error::<C>(
SyncError::Other("can't get er result".to_owned()).into(),
WaitSyncError::Other("can't get er result".to_owned()).into(),
), // this is highly unlikely to happen,
(Some(Err(err)), None) => {
WaitSyncedResponse::new_error(CommandSyncError::<C>::Execute(err))
Expand All @@ -256,7 +257,7 @@ impl WaitSyncedResponse {
// The er is ignored as the propose has failed
(Some(Ok(_er)), None) => {
WaitSyncedResponse::new_error::<C>(
SyncError::Other("can't get after sync result".to_owned()).into(),
WaitSyncError::Other("can't get after sync result".to_owned()).into(),
) // this is highly unlikely to happen,
}
}
Expand Down
Loading

0 comments on commit b30fa33

Please sign in to comment.