Skip to content

Commit

Permalink
chore: fix check
Browse files Browse the repository at this point in the history
  • Loading branch information
bsbds committed May 9, 2024
1 parent 3cb9ed5 commit 84a437a
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 12 deletions.
2 changes: 1 addition & 1 deletion crates/curp/src/server/cmd_worker/conflict_checked_mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ impl<C: Command, CE: CommandExecutor<C>> Filter<C, CE> {
None
}
Err(err) => {
self.cmd_executor.trigger(entry.inflight_id(), entry.index);
self.cmd_executor.trigger(entry.inflight_id());
Some(err)
}
}
Expand Down
15 changes: 11 additions & 4 deletions crates/curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{fmt::Debug, iter, sync::Arc};

use async_trait::async_trait;
use clippy_utilities::NumericCast;
use curp_external_api::cmd::AfterSyncCmd;
#[cfg(test)]
use mockall::automock;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -141,7 +142,7 @@ async fn worker_exe<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
| EntryData::SetNodeState(_, _, _) => true,
};
if !success {
ce.trigger(entry.inflight_id(), entry.index);
ce.trigger(entry.inflight_id());
}
success
}
Expand All @@ -161,10 +162,16 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
let id = curp.id();
let success = match entry.entry_data {
EntryData::Command(ref cmd) => {
let Some(prepare) = prepare else {
let Some(_prepare) = prepare else {
unreachable!("prepare should always be Some(_) when entry is a command");
};
let asr = ce.after_sync(cmd.as_ref(), entry.index, prepare).await;
let asr = ce
.after_sync(vec![AfterSyncCmd::new(cmd.as_ref(), false)], entry.index)
.await
.map(|res| {
let (asr, _) = res.into_iter().next().unwrap();
asr
});
let asr_ok = asr.is_ok();
cb.write().insert_asr(entry.propose_id, asr);
sp.lock()
Expand Down Expand Up @@ -248,7 +255,7 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
}
EntryData::Empty => true,
};
ce.trigger(entry.inflight_id(), entry.index);
ce.trigger(entry.inflight_id());
success
}

Expand Down
4 changes: 2 additions & 2 deletions crates/xline/src/conflict/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ impl EntryGenerator {
)
}

fn gen_entry(&mut self, keys: Vec<KeyRange>, req: RequestWrapper) -> CommandEntry<Command> {
fn gen_entry(&mut self, _keys: Vec<KeyRange>, req: RequestWrapper) -> CommandEntry<Command> {
self.id += 1;
let cmd = Command::new(keys, req);
let cmd = Command::new(req);
CommandEntry::new(ProposeId(0, self.id), Arc::new(cmd))
}
}
1 change: 1 addition & 0 deletions crates/xline/src/server/barriers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl IndexBarrier {
}

/// Trigger all barriers whose index is less than or equal to the given index.
#[allow(dead_code)]
pub(crate) fn trigger(&self, index: LogIndex) {
let mut inner_l = self.inner.lock();
if inner_l.last_trigger_index < index {
Expand Down
7 changes: 7 additions & 0 deletions crates/xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,13 @@ impl CommandExecutor {

#[async_trait::async_trait]
impl CurpCommandExecutor<Command> for CommandExecutor {
fn prepare(
&self,
_cmd: &Command,
) -> Result<<Command as CurpCommand>::PR, <Command as CurpCommand>::Error> {
Ok(-1)
}

async fn execute(
&self,
cmd: &Command,
Expand Down
4 changes: 2 additions & 2 deletions crates/xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use xlineapi::{
AuthInfo, ResponseWrapper,
};

use super::{auth_server::get_token, barriers::IndexBarrier};
use super::barriers::IndexBarrier;
use crate::{
metrics,
revision_check::RevisionCheck,
Expand Down Expand Up @@ -93,7 +93,7 @@ impl KvServer {
fn do_serializable(&self, command: &Command) -> Result<Response, tonic::Status> {
self.auth_storage
.check_permission(command.request(), command.auth_info())?;
let cmd_res = self.kv_storage.execute(command.request())?;
let cmd_res = self.kv_storage.execute(command.request(), None)?;
Ok(Self::parse_response_op(cmd_res.into_inner().into()))
}

Expand Down
3 changes: 0 additions & 3 deletions crates/xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,7 @@ impl XlineServer {
Arc::clone(&lease_storage),
Arc::clone(&alarm_storage),
Arc::clone(&persistent),
Arc::clone(&index_barrier),
Arc::clone(&id_barrier),
header_gen.general_revision_arc(),
header_gen.auth_revision_arc(),
Arc::clone(&compact_events),
self.storage_config.quota,
));
Expand Down

0 comments on commit 84a437a

Please sign in to comment.