Skip to content

Commit

Permalink
refactor: execute in after sync
Browse files Browse the repository at this point in the history
  • Loading branch information
bsbds committed May 9, 2024
1 parent b600a7d commit 3cb9ed5
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 75 deletions.
125 changes: 91 additions & 34 deletions crates/xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ use xlineapi::{
};

use crate::{
revision_number::RevisionNumberGeneratorState,
rpc::{RequestBackend, RequestWrapper},
storage::{
db::{WriteOp, DB},
index::IndexOperate,
storage_api::XlineStorageOps,
AlarmStore, AuthStore, KvStore, LeaseStore,
},
Expand Down Expand Up @@ -265,6 +267,73 @@ impl CommandExecutor {
_ => Ok(()),
}
}

/// After sync KV commands
async fn after_sync_kv<T>(
&self,
wrapper: &RequestWrapper,
txn_db: &T,
index: &(dyn IndexOperate + Send + Sync),
revision_gen: &RevisionNumberGeneratorState<'_>,
to_execute: bool,
) -> Result<
(
<Command as CurpCommand>::ASR,
Option<<Command as CurpCommand>::ER>,
),
ExecuteError,
>
where
T: XlineStorageOps + TransactionApi,
{
let (asr, er) = self
.kv_storage
.after_sync(wrapper, txn_db, index, revision_gen, to_execute)
.await?;
Ok((asr, er))
}

async fn after_sync_others<T>(
&self,
wrapper: &RequestWrapper,
txn_db: &T,
general_revision: &RevisionNumberGeneratorState<'_>,
auth_revision: &RevisionNumberGeneratorState<'_>,
to_execute: bool,
) -> Result<
(
<Command as CurpCommand>::ASR,
Option<<Command as CurpCommand>::ER>,
),
ExecuteError,
>
where
T: XlineStorageOps + TransactionApi,
{
let er = to_execute
.then(|| match wrapper.backend() {
RequestBackend::Auth => self.auth_storage.execute(wrapper),
RequestBackend::Lease => self.lease_storage.execute(wrapper),
RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)),
RequestBackend::Kv => unreachable!("Should not execute kv commands"),
})
.transpose()?;

let (asr, wr_ops) = match wrapper.backend() {
RequestBackend::Auth => self.auth_storage.after_sync(wrapper, &auth_revision)?,
RequestBackend::Lease => {
self.lease_storage
.after_sync(wrapper, &general_revision)
.await?
}
RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper, &general_revision),
RequestBackend::Kv => unreachable!("Should not sync kv commands"),
};

txn_db.write_ops(wr_ops)?;

Ok((asr, er))
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -316,7 +385,7 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
.collect::<Result<_, _>>()?;

let index = self.kv_storage.index();
let mut index_state = index.state();
let index_state = index.state();
let general_revision_gen = self.kv_storage.revision_gen();
let auth_revision_gen = self.auth_storage.revision_gen();
let general_revision_state = general_revision_gen.state();
Expand All @@ -328,40 +397,28 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
let mut resps = Vec::with_capacity(cmds.len());
for (cmd, to_execute) in cmds.into_iter().map(AfterSyncCmd::into_parts) {
let wrapper = cmd.request();
let er = to_execute
.then(|| match wrapper.backend() {
RequestBackend::Kv => self
.kv_storage
.execute(wrapper, Some((&txn_db, &mut index_state))),
RequestBackend::Auth => self.auth_storage.execute(wrapper),
RequestBackend::Lease => self.lease_storage.execute(wrapper),
RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)),
})
.transpose()?;
tracing::info!("sync cmd: {cmd:?}");
if to_execute {
tracing::info!("execute in after sync for: {cmd:?}");
}
let (asr, wr_ops) = match wrapper.backend() {
RequestBackend::Kv => (
self.kv_storage
.after_sync(wrapper, &txn_db, &index_state, &general_revision_state)
.await?,
vec![],
),
RequestBackend::Auth => self
.auth_storage
.after_sync(wrapper, &auth_revision_state)?,
RequestBackend::Lease => {
self.lease_storage
.after_sync(wrapper, &general_revision_state)
.await?
let (asr, er) = match wrapper.backend() {
RequestBackend::Kv => {
self.after_sync_kv(
wrapper,
&txn_db,
&index_state,
&general_revision_state,
to_execute,
)
.await
}
RequestBackend::Alarm => self
.alarm_storage
.after_sync(wrapper, &general_revision_state),
};
txn_db.write_ops(wr_ops)?;
RequestBackend::Auth | RequestBackend::Lease | RequestBackend::Alarm => {
self.after_sync_others(
wrapper,
&txn_db,
&general_revision_state,
&auth_revision_state,
to_execute,
)
.await
}
}?;
resps.push((asr, er));

if let RequestWrapper::CompactionRequest(ref compact_req) = *wrapper {
Expand Down
1 change: 1 addition & 0 deletions crates/xline/src/server/watch_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ mod test {
&txn,
&store.index().state(),
&store.revision_gen().state(),
false,
)
.await
.unwrap();
Expand Down
Loading

0 comments on commit 3cb9ed5

Please sign in to comment.