From 3cb9ed5fde708280209e0555f919825ae8683811 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Wed, 10 Apr 2024 16:35:34 +0800 Subject: [PATCH] refactor: execute in after sync --- crates/xline/src/server/command.rs | 125 +++++++++++++------ crates/xline/src/server/watch_server.rs | 1 + crates/xline/src/storage/kv_store.rs | 156 ++++++++++++++++++------ crates/xline/src/storage/kvwatcher.rs | 2 +- 4 files changed, 209 insertions(+), 75 deletions(-) diff --git a/crates/xline/src/server/command.rs b/crates/xline/src/server/command.rs index 8f9d76970..3dd9c25f3 100644 --- a/crates/xline/src/server/command.rs +++ b/crates/xline/src/server/command.rs @@ -19,9 +19,11 @@ use xlineapi::{ }; use crate::{ + revision_number::RevisionNumberGeneratorState, rpc::{RequestBackend, RequestWrapper}, storage::{ db::{WriteOp, DB}, + index::IndexOperate, storage_api::XlineStorageOps, AlarmStore, AuthStore, KvStore, LeaseStore, }, @@ -265,6 +267,73 @@ impl CommandExecutor { _ => Ok(()), } } + + /// After sync KV commands + async fn after_sync_kv( + &self, + wrapper: &RequestWrapper, + txn_db: &T, + index: &(dyn IndexOperate + Send + Sync), + revision_gen: &RevisionNumberGeneratorState<'_>, + to_execute: bool, + ) -> Result< + ( + ::ASR, + Option<::ER>, + ), + ExecuteError, + > + where + T: XlineStorageOps + TransactionApi, + { + let (asr, er) = self + .kv_storage + .after_sync(wrapper, txn_db, index, revision_gen, to_execute) + .await?; + Ok((asr, er)) + } + + async fn after_sync_others( + &self, + wrapper: &RequestWrapper, + txn_db: &T, + general_revision: &RevisionNumberGeneratorState<'_>, + auth_revision: &RevisionNumberGeneratorState<'_>, + to_execute: bool, + ) -> Result< + ( + ::ASR, + Option<::ER>, + ), + ExecuteError, + > + where + T: XlineStorageOps + TransactionApi, + { + let er = to_execute + .then(|| match wrapper.backend() { + RequestBackend::Auth => self.auth_storage.execute(wrapper), + RequestBackend::Lease => self.lease_storage.execute(wrapper), + RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)), + RequestBackend::Kv => unreachable!("Should not execute kv commands"), + }) + .transpose()?; + + let (asr, wr_ops) = match wrapper.backend() { + RequestBackend::Auth => self.auth_storage.after_sync(wrapper, &auth_revision)?, + RequestBackend::Lease => { + self.lease_storage + .after_sync(wrapper, &general_revision) + .await? + } + RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper, &general_revision), + RequestBackend::Kv => unreachable!("Should not sync kv commands"), + }; + + txn_db.write_ops(wr_ops)?; + + Ok((asr, er)) + } } #[async_trait::async_trait] @@ -316,7 +385,7 @@ impl CurpCommandExecutor for CommandExecutor { .collect::>()?; let index = self.kv_storage.index(); - let mut index_state = index.state(); + let index_state = index.state(); let general_revision_gen = self.kv_storage.revision_gen(); let auth_revision_gen = self.auth_storage.revision_gen(); let general_revision_state = general_revision_gen.state(); @@ -328,40 +397,28 @@ impl CurpCommandExecutor for CommandExecutor { let mut resps = Vec::with_capacity(cmds.len()); for (cmd, to_execute) in cmds.into_iter().map(AfterSyncCmd::into_parts) { let wrapper = cmd.request(); - let er = to_execute - .then(|| match wrapper.backend() { - RequestBackend::Kv => self - .kv_storage - .execute(wrapper, Some((&txn_db, &mut index_state))), - RequestBackend::Auth => self.auth_storage.execute(wrapper), - RequestBackend::Lease => self.lease_storage.execute(wrapper), - RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)), - }) - .transpose()?; - tracing::info!("sync cmd: {cmd:?}"); - if to_execute { - tracing::info!("execute in after sync for: {cmd:?}"); - } - let (asr, wr_ops) = match wrapper.backend() { - RequestBackend::Kv => ( - self.kv_storage - .after_sync(wrapper, &txn_db, &index_state, &general_revision_state) - .await?, - vec![], - ), - RequestBackend::Auth => self - .auth_storage - .after_sync(wrapper, &auth_revision_state)?, - RequestBackend::Lease => { - self.lease_storage - .after_sync(wrapper, &general_revision_state) - .await? + let (asr, er) = match wrapper.backend() { + RequestBackend::Kv => { + self.after_sync_kv( + wrapper, + &txn_db, + &index_state, + &general_revision_state, + to_execute, + ) + .await } - RequestBackend::Alarm => self - .alarm_storage - .after_sync(wrapper, &general_revision_state), - }; - txn_db.write_ops(wr_ops)?; + RequestBackend::Auth | RequestBackend::Lease | RequestBackend::Alarm => { + self.after_sync_others( + wrapper, + &txn_db, + &general_revision_state, + &auth_revision_state, + to_execute, + ) + .await + } + }?; resps.push((asr, er)); if let RequestWrapper::CompactionRequest(ref compact_req) = *wrapper { diff --git a/crates/xline/src/server/watch_server.rs b/crates/xline/src/server/watch_server.rs index 9cf26c0dc..40a8d8629 100644 --- a/crates/xline/src/server/watch_server.rs +++ b/crates/xline/src/server/watch_server.rs @@ -457,6 +457,7 @@ mod test { &txn, &store.index().state(), &store.revision_gen().state(), + false, ) .await .unwrap(); diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index 2df372347..ea9e5f599 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -118,15 +118,18 @@ impl KvStoreInner { } /// Get `KeyValue` of a range with limit and count only, return kvs and total count - fn get_range_with_opts( - txn_db: &Transaction, + fn get_range_with_opts( + txn_db: &T, index: &dyn IndexOperate, key: &[u8], range_end: &[u8], revision: i64, limit: usize, count_only: bool, - ) -> Result<(Vec, usize), ExecuteError> { + ) -> Result<(Vec, usize), ExecuteError> + where + T: XlineStorageOps, + { let mut revisions = index.get(key, range_end, revision); let total = revisions.len(); if count_only || total == 0 { @@ -218,11 +221,12 @@ impl KvStore { txn_db: &T, index: &(dyn IndexOperate + Send + Sync), revision_gen: &RevisionNumberGeneratorState<'_>, - ) -> Result + to_execute: bool, + ) -> Result<(SyncResponse, Option), ExecuteError> where T: XlineStorageOps + TransactionApi, { - self.sync_request(request, txn_db, index, revision_gen) + self.sync_request(request, txn_db, index, revision_gen, to_execute) .await } @@ -603,12 +607,15 @@ impl KvStore { } /// Handle `RangeRequest` - fn execute_range( + fn execute_range( &self, - tnx_db: &Transaction, + tnx_db: &T, index: &dyn IndexOperate, req: &RangeRequest, - ) -> Result { + ) -> Result + where + T: XlineStorageOps, + { req.check_revision(self.compacted_revision(), self.revision())?; let storage_fetch_limit = if (req.sort_order() != SortOrder::None) @@ -662,12 +669,15 @@ impl KvStore { } /// Generates `PutResponse` - fn generate_put_resp( + fn generate_put_resp( &self, req: &PutRequest, - txn_db: &Transaction, + txn_db: &T, prev_rev: Option, - ) -> Result<(PutResponse, Option), ExecuteError> { + ) -> Result<(PutResponse, Option), ExecuteError> + where + T: XlineStorageOps, + { let response = PutResponse { header: Some(self.header_gen.gen_header()), ..Default::default() @@ -885,7 +895,8 @@ impl KvStore { txn_db: &T, index: &(dyn IndexOperate + Send + Sync), revision_gen: &RevisionNumberGeneratorState<'_>, - ) -> Result + to_execute: bool, + ) -> Result<(SyncResponse, Option), ExecuteError> where T: XlineStorageOps + TransactionApi, { @@ -895,33 +906,54 @@ impl KvStore { let next_revision = revision_gen.get().overflow_add(1); #[allow(clippy::wildcard_enum_match_arm)] - let events = match *wrapper { - RequestWrapper::RangeRequest(_) => { - vec![] + let (events, execute_response): (_, Option) = match *wrapper { + RequestWrapper::RangeRequest(ref req) => { + self.sync_range(txn_db, index, req, to_execute) } RequestWrapper::PutRequest(ref req) => { - self.sync_put(txn_db, index, req, next_revision, &mut 0)? + self.sync_put(txn_db, index, req, next_revision, &mut 0, to_execute) } RequestWrapper::DeleteRangeRequest(ref req) => { - self.sync_delete_range(txn_db, index, req, next_revision, &mut 0)? + self.sync_delete_range(txn_db, index, req, next_revision, &mut 0, to_execute) } RequestWrapper::TxnRequest(ref req) => { - self.sync_txn(txn_db, index, req, next_revision, &mut 0)? + self.sync_txn(txn_db, index, req, next_revision, &mut 0, to_execute) + } + RequestWrapper::CompactionRequest(ref req) => { + self.sync_compaction(req, to_execute).await } - RequestWrapper::CompactionRequest(ref req) => self.sync_compaction(req).await?, _ => unreachable!("Other request should not be sent to this store"), - }; + }?; - let response = if events.is_empty() { + let sync_response = if events.is_empty() { SyncResponse::new(revision_gen.get()) } else { self.notify_updates(next_revision, events).await; SyncResponse::new(revision_gen.next()) }; - tracing::warn!("sync response: {response:?}"); + tracing::warn!("sync response: {sync_response:?}"); - Ok(response) + Ok((sync_response, execute_response.map(CommandResponse::new))) + } + + /// Sync `RangeRequest` + fn sync_range( + &self, + txn_db: &T, + index: &dyn IndexOperate, + req: &RangeRequest, + to_execute: bool, + ) -> Result<(Vec, Option), ExecuteError> + where + T: XlineStorageOps, + { + Ok(( + vec![], + to_execute + .then(|| self.execute_range(txn_db, index, req).map(Into::into)) + .transpose()?, + )) } /// Handle `PutRequest` @@ -932,7 +964,8 @@ impl KvStore { req: &PutRequest, revision: i64, sub_revision: &mut i64, - ) -> Result, ExecuteError> + to_execute: bool, + ) -> Result<(Vec, Option), ExecuteError> where T: XlineStorageOps, { @@ -974,12 +1007,25 @@ impl KvStore { txn_db.write_op(WriteOp::PutKeyValue(new_rev.as_revision(), kv.clone()))?; *sub_revision = sub_revision.overflow_add(1); - Ok(vec![Event { + let events = vec![Event { #[allow(clippy::as_conversions)] // This cast is always valid r#type: EventType::Put as i32, kv: Some(kv), prev_kv: None, - }]) + }]; + + let execute_resp = to_execute + .then(|| { + self.generate_put_resp( + req, + txn_db, + prev_rev_opt.map(|key_rev| key_rev.as_revision()), + ) + .map(|(resp, _)| resp.into()) + }) + .transpose()?; + + Ok((events, execute_resp)) } /// Handle `DeleteRangeRequest` @@ -990,7 +1036,8 @@ impl KvStore { req: &DeleteRangeRequest, revision: i64, sub_revision: &mut i64, - ) -> Result, ExecuteError> + to_execute: bool, + ) -> Result<(Vec, Option), ExecuteError> where T: XlineStorageOps, { @@ -1005,7 +1052,12 @@ impl KvStore { Self::detach_leases(&keys, &self.lease_collection); - Ok(Self::new_deletion_events(revision, keys)) + let execute_resp = to_execute + .then(|| self.generate_delete_range_resp(req, txn_db, index)) + .transpose()? + .map(Into::into); + + Ok((Self::new_deletion_events(revision, keys), execute_resp)) } /// Handle `TxnRequest` @@ -1016,7 +1068,8 @@ impl KvStore { request: &TxnRequest, revision: i64, sub_revision: &mut i64, - ) -> Result, ExecuteError> + to_execute: bool, + ) -> Result<(Vec, Option), ExecuteError> where T: XlineStorageOps, { @@ -1032,30 +1085,47 @@ impl KvStore { request.failure.iter() }; - let events = requests + let (events, resps): (Vec<_>, Vec<_>) = requests .filter_map(|op| op.request.as_ref()) .map(|req| match *req { - Request::RequestRange(_) => Ok(vec![]), + Request::RequestRange(ref r) => self.sync_range(txn_db, index, r, to_execute), Request::RequestTxn(ref r) => { - self.sync_txn(txn_db, index, r, revision, sub_revision) + self.sync_txn(txn_db, index, r, revision, sub_revision, to_execute) } Request::RequestPut(ref r) => { - self.sync_put(txn_db, index, r, revision, sub_revision) + self.sync_put(txn_db, index, r, revision, sub_revision, to_execute) } Request::RequestDeleteRange(ref r) => { - self.sync_delete_range(txn_db, index, r, revision, sub_revision) + self.sync_delete_range(txn_db, index, r, revision, sub_revision, to_execute) } }) .collect::, _>>()? .into_iter() - .flatten() - .collect(); + .unzip(); + + let resp = to_execute.then(|| { + TxnResponse { + header: Some(self.header_gen.gen_header()), + succeeded: success, + responses: resps + .into_iter() + .map(Option::into_iter) + .flatten() + .map(Into::into) + .collect(), + } + .into() + }); - Ok(events) + Ok((events.into_iter().flatten().collect(), resp)) } /// Sync `CompactionRequest` and return if kvstore is changed - async fn sync_compaction(&self, req: &CompactionRequest) -> Result, ExecuteError> { + async fn sync_compaction( + &self, + req: &CompactionRequest, + to_execute: bool, + ) -> Result<(Vec, Option), ExecuteError> { let revision = req.revision; let ops = vec![WriteOp::PutScheduledCompactRevision(revision)]; // TODO: Remove the physical process logic here. It's better to move into the KvServer @@ -1074,7 +1144,13 @@ impl KvStore { } self.inner.db.write_ops(ops)?; - Ok(vec![]) + let resp = to_execute + .then(|| CompactionResponse { + header: Some(self.header_gen.gen_header()), + }) + .map(Into::into); + + Ok((vec![], resp)) } } @@ -1297,7 +1373,7 @@ mod test { let index_state = index.state(); let rev_gen_state = store.revision.state(); let _res = store - .after_sync(request, &txn_db, &index_state, &rev_gen_state) + .after_sync(request, &txn_db, &index_state, &rev_gen_state, false) .await?; index_state.commit(); rev_gen_state.commit(); diff --git a/crates/xline/src/storage/kvwatcher.rs b/crates/xline/src/storage/kvwatcher.rs index 5607512d1..ddcd2b9cd 100644 --- a/crates/xline/src/storage/kvwatcher.rs +++ b/crates/xline/src/storage/kvwatcher.rs @@ -759,7 +759,7 @@ mod test { let rev_gen = store.revision_gen(); let rev_gen_state = rev_gen.state(); store - .after_sync(&req, &txn, &index_state, &rev_gen_state) + .after_sync(&req, &txn, &index_state, &rev_gen_state, false) .await .unwrap(); index_state.commit();