diff --git a/crates/xline/src/server/command.rs b/crates/xline/src/server/command.rs index 6452f9930..a8b416be1 100644 --- a/crates/xline/src/server/command.rs +++ b/crates/xline/src/server/command.rs @@ -348,10 +348,7 @@ impl CurpCommandExecutor for CommandExecutor { } }; ops.append(&mut wr_ops); - let key_revisions = self.db.flush_ops(ops)?; - if !key_revisions.is_empty() { - self.kv_storage.insert_index(key_revisions); - } + let _key_revisions = self.db.flush_ops(ops)?; self.lease_storage.mark_lease_synced(wrapper); if !quota_enough { if let Some(alarmer) = self.alarmer.read().clone() { diff --git a/crates/xline/src/server/watch_server.rs b/crates/xline/src/server/watch_server.rs index 6b047ce97..3911409e0 100644 --- a/crates/xline/src/server/watch_server.rs +++ b/crates/xline/src/server/watch_server.rs @@ -456,8 +456,7 @@ mod test { ..Default::default() }); let (_sync_res, ops) = store.after_sync(&req, revision).await.unwrap(); - let key_revisions = db.flush_ops(ops).unwrap(); - store.insert_index(key_revisions); + let _key_revisions = db.flush_ops(ops).unwrap(); } #[tokio::test] diff --git a/crates/xline/src/storage/compact/mod.rs b/crates/xline/src/storage/compact/mod.rs index ec0b063a8..fcf183e4b 100644 --- a/crates/xline/src/storage/compact/mod.rs +++ b/crates/xline/src/storage/compact/mod.rs @@ -12,10 +12,7 @@ use utils::{ }; use xlineapi::{command::Command, execute_error::ExecuteError, RequestWrapper}; -use super::{ - index::{Index, IndexOperate}, - KvStore, -}; +use super::{index::Index, KvStore}; use crate::{revision_number::RevisionNumberGenerator, rpc::CompactionRequest}; /// mod revision compactor; diff --git a/crates/xline/src/storage/index.rs b/crates/xline/src/storage/index.rs index e6d903abd..27bc8138e 100644 --- a/crates/xline/src/storage/index.rs +++ b/crates/xline/src/storage/index.rs @@ -1,15 +1,50 @@ -use std::collections::HashSet; +#![allow(clippy::multiple_inherent_impl)] +#![allow(unused)] // Remove this when `IndexState` is used in xline + +use std::collections::{btree_map, BTreeMap, HashSet}; use clippy_utilities::OverflowArithmetic; -use crossbeam_skiplist::SkipMap; +use crossbeam_skiplist::{map::Entry, SkipMap}; use itertools::Itertools; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use utils::parking_lot_lock::RwLockMap; use xlineapi::command::KeyRange; use super::revision::{KeyRevision, Revision}; use crate::server::command::RangeType; +/// Operations for `Index` +pub(crate) trait IndexOperate { + /// Get `Revision` of keys, get the latest `Revision` when revision <= 0 + fn get(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec; + + /// Register a new `KeyRevision` of the given key + /// + /// Returns a new `KeyRevision` and previous `KeyRevision` of the key + fn register_revision( + &self, + key: Vec, + revision: i64, + sub_revision: i64, + ) -> (KeyRevision, Option); + + /// Gets the latest revision of the key + fn current_rev(&self, key: &[u8]) -> Option; + + /// Insert or update `KeyRevision` + fn insert(&self, key_revisions: Vec<(Vec, KeyRevision)>); + + /// Mark keys as deleted and return latest revision before deletion and deletion revision + /// return all revision pairs and all keys in range + fn delete( + &self, + key: &[u8], + range_end: &[u8], + revision: i64, + sub_revision: i64, + ) -> (Vec<(Revision, Revision)>, Vec>); +} + /// Keys to revisions mapping #[derive(Debug)] pub(crate) struct Index { @@ -25,7 +60,15 @@ impl Index { } } - /// Filter out `KeyRevision` that is less than one revision and convert to `Revision` + /// Creates a `IndexState` + pub(crate) fn state(&self) -> IndexState<'_> { + IndexState { + index_ref: self, + state: Mutex::default(), + } + } + + /// Filter out `KeyRevision` that is greater than or equal to the given revision and convert to `Revision` fn filter_revision(revs: &[KeyRevision], revision: i64) -> Vec { revs.iter() .filter(|rev| rev.mod_revision >= revision) @@ -53,18 +96,6 @@ impl Index { .map(KeyRevision::as_revision) } - /// Insert `KeyRevision` of deleted and generate `Revision` pair of deleted - fn gen_del_revision( - revs: &mut Vec, - revision: i64, - sub_revision: i64, - ) -> Option<(Revision, Revision)> { - let last_available_rev = Self::get_revision(revs, 0)?; - let del_rev = KeyRevision::new_deletion(revision, sub_revision); - revs.push(del_rev); - Some((last_available_rev, del_rev.as_revision())) - } - /// Get all revisions that need to be kept after compact at the given revision pub(crate) fn keep(&self, at_rev: i64) -> HashSet { let mut revs = HashSet::new(); @@ -88,116 +119,224 @@ impl Index { }); revs } -} -/// Operations of Index -pub(super) trait IndexOperate { - /// Get `Revision` of keys, get the latest `Revision` when revision <= 0 - fn get(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec; + /// Insert `KeyRevision` of deleted and generate `Revision` pair of deleted + fn gen_del_revision( + revs: &mut Vec, + revision: i64, + sub_revision: i64, + ) -> Option<(Revision, Revision)> { + let last_available_rev = Self::get_revision(revs, 0)?; + let del_rev = KeyRevision::new_deletion(revision, sub_revision); + revs.push(del_rev); + Some((last_available_rev, del_rev.as_revision())) + } +} +impl Index { /// Get `Revision` of keys from one revision - fn get_from_rev(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec; - - /// Mark keys as deleted and return latest revision before deletion and deletion revision - /// return all revision pairs and all keys in range - fn delete( + pub(super) fn get_from_rev( &self, key: &[u8], range_end: &[u8], revision: i64, - sub_revision: i64, - ) -> (Vec<(Revision, Revision)>, Vec>); - - /// Insert or update `KeyRevision` - fn insert(&self, key_revisions: Vec<(Vec, KeyRevision)>); - - /// Register a new `KeyRevision` of the given key - fn register_revision(&self, key: &[u8], revision: i64, sub_revision: i64) -> KeyRevision; - - /// Restore `KeyRevision` of a key - fn restore( - &self, - key: Vec, - revision: i64, - sub_revision: i64, - create_revision: i64, - version: i64, - ); - - /// Compact a `KeyRevision` by removing the versions with smaller or equal - /// revision than the given atRev except the largest one (If the largest one is - /// a tombstone, it will not be kept). - fn compact(&self, at_rev: i64) -> Vec; -} - -impl IndexOperate for Index { - fn get(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec { + ) -> Vec { match RangeType::get_range_type(key, range_end) { RangeType::OneKey => self .inner .get(key) - .and_then(|entry| { + .map(|entry| { entry .value() - .map_read(|revs| Self::get_revision(revs.as_ref(), revision)) + .map_read(|revs| Self::filter_revision(revs.as_ref(), revision)) }) - .map(|rev| vec![rev]) .unwrap_or_default(), RangeType::AllKeys => self .inner .iter() - .filter_map(|entry| { + .flat_map(|entry| { entry .value() - .map_read(|revs| Self::get_revision(revs.as_ref(), revision)) + .map_read(|revs| Self::filter_revision(revs.as_ref(), revision)) }) + .sorted() .collect(), RangeType::Range => self .inner .range(KeyRange::new(key, range_end)) - .filter_map(|entry| { + .flat_map(|entry| { entry .value() - .map_read(|revs| Self::get_revision(revs.as_ref(), revision)) + .map_read(|revs| Self::filter_revision(revs.as_ref(), revision)) }) + .sorted() .collect(), } } - fn get_from_rev(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec { + /// Restore `KeyRevision` of a key + pub(super) fn restore( + &self, + key: Vec, + revision: i64, + sub_revision: i64, + create_revision: i64, + version: i64, + ) { + self.inner + .get_or_insert(key, RwLock::new(Vec::new())) + .value() + .map_write(|mut revisions| { + revisions.push(KeyRevision::new( + create_revision, + version, + revision, + sub_revision, + )); + }); + } + /// Compact a `KeyRevision` by removing all versions smaller than or equal to the + /// given atRev, except for the largest one. Note that if the largest version + /// is a tombstone, it will also be removed. + pub(super) fn compact(&self, at_rev: i64) -> Vec { + let mut revs = Vec::new(); + let mut del_keys = Vec::new(); + + self.inner.iter().for_each(|entry| { + entry.value().map_write(|mut revisions| { + if let Some(revision) = revisions.first() { + if revision.mod_revision < at_rev { + let pivot = revisions.partition_point(|rev| rev.mod_revision <= at_rev); + let compacted_last_idx = pivot.overflow_sub(1); + // There is at least 1 element in the first partition, so the key revision at `compacted_last_idx` + // must exist. + let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| { + unreachable!( + "Oops, the key revision at {compacted_last_idx} should not be None", + ) + }); + let compact_revs = if key_rev.is_deleted() { + revisions.drain(..=compacted_last_idx) + } else { + revisions.drain(..compacted_last_idx) + }; + revs.extend(compact_revs); + + if revisions.is_empty() { + del_keys.push(entry.key().clone()); + } + } + } + }); + }); + for key in del_keys { + let _ignore = self.inner.remove(&key); + } + revs + } +} + +/// Maps a closure to an entry +fn fmap_entry(mut op: F) -> impl FnMut(Entry, RwLock>>) -> R +where + F: FnMut((&[u8], &[KeyRevision])) -> R, +{ + move |entry: Entry, RwLock>>| { + entry.value().map_read(|revs| op((entry.key(), &revs))) + } +} + +/// Maps a closure to an entry value +fn fmap_value(mut op: F) -> impl FnMut(Entry, RwLock>>) -> R +where + F: FnMut(&[KeyRevision]) -> R, +{ + move |entry: Entry, RwLock>>| entry.value().map_read(|revs| op(&revs)) +} + +/// Mutably maps a closure to an entry value +fn fmap_value_mut(mut op: F) -> impl FnMut(Entry, RwLock>>) -> R +where + F: FnMut(&mut Vec) -> R, +{ + move |entry: Entry, RwLock>>| { + entry.value().map_write(|mut revs| op(&mut revs)) + } +} + +impl IndexOperate for Index { + fn get(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec { match RangeType::get_range_type(key, range_end) { RangeType::OneKey => self .inner .get(key) - .map(|entry| { - entry - .value() - .map_read(|revs| Self::filter_revision(revs.as_ref(), revision)) - }) + .and_then(fmap_value(|revs| Index::get_revision(revs, revision))) + .map(|rev| vec![rev]) .unwrap_or_default(), RangeType::AllKeys => self .inner .iter() - .flat_map(|entry| { - entry - .value() - .map_read(|revs| Self::filter_revision(revs.as_ref(), revision)) - }) - .sorted() + .filter_map(fmap_value(|revs| Index::get_revision(revs, revision))) .collect(), RangeType::Range => self .inner .range(KeyRange::new(key, range_end)) - .flat_map(|entry| { - entry - .value() - .map_read(|revs| Self::filter_revision(revs.as_ref(), revision)) - }) - .sorted() + .filter_map(fmap_value(|revs| Index::get_revision(revs, revision))) .collect(), } } + fn register_revision( + &self, + key: Vec, + revision: i64, + sub_revision: i64, + ) -> (KeyRevision, Option) { + self.inner.get(&key).map_or_else( + || { + let new_rev = KeyRevision::new(revision, 1, revision, sub_revision); + let _ignore = self.inner.insert(key, RwLock::new(vec![new_rev])); + (new_rev, None) + }, + fmap_value_mut(|revisions| { + let last = *revisions + .last() + .unwrap_or_else(|| unreachable!("empty revision list")); + let new_rev = if last.is_deleted() { + KeyRevision::new(revision, 1, revision, sub_revision) + } else { + KeyRevision::new( + last.create_revision, + last.version.overflow_add(1), + revision, + sub_revision, + ) + }; + revisions.push(new_rev); + (new_rev, Some(last)) + }), + ) + } + + fn current_rev(&self, key: &[u8]) -> Option { + self.inner + .get(key) + .and_then(fmap_value(|revs| revs.last().copied())) + } + + fn insert(&self, key_revisions: Vec<(Vec, KeyRevision)>) { + for (key, revision) in key_revisions { + self.inner.get(&key).map_or_else( + || { + let _ignore = self.inner.insert(key, RwLock::new(vec![revision])); + }, + fmap_value_mut(|revs| { + revs.push(revision); + }), + ); + } + } + fn delete( &self, key: &[u8], @@ -211,11 +350,9 @@ impl IndexOperate for Index { .inner .get(key) .into_iter() - .filter_map(|entry| { - entry.value().map_write(|mut revs| { - Self::gen_del_revision(revs.as_mut(), revision, sub_revision) - }) - }) + .filter_map(fmap_value_mut(|revs| { + Self::gen_del_revision(revs, revision, sub_revision) + })) .collect(); let keys = if pairs.is_empty() { vec![] @@ -230,12 +367,8 @@ impl IndexOperate for Index { .zip(0..) .filter_map(|(entry, i)| { entry.value().map_write(|mut revs| { - Self::gen_del_revision( - revs.as_mut(), - revision, - sub_revision.overflow_add(i), - ) - .map(|pair| (pair, entry.key().clone())) + Self::gen_del_revision(&mut revs, revision, sub_revision.overflow_add(i)) + .map(|pair| (pair, entry.key().clone())) }) }) .unzip(), @@ -245,108 +378,250 @@ impl IndexOperate for Index { .zip(0..) .filter_map(|(entry, i)| { entry.value().map_write(|mut revs| { - Self::gen_del_revision( - revs.as_mut(), - revision, - sub_revision.overflow_add(i), - ) - .map(|pair| (pair, entry.key().clone())) + Self::gen_del_revision(&mut revs, revision, sub_revision.overflow_add(i)) + .map(|pair| (pair, entry.key().clone())) }) }) .unzip(), }; (pairs, keys) } +} - fn insert(&self, key_revisions: Vec<(Vec, KeyRevision)>) { - for (key, revision) in key_revisions { - if let Some(entry) = self.inner.get::<[u8]>(key.as_ref()) { - entry.value().map_write(|mut revs| revs.push(revision)); - } else { - _ = self.inner.insert(key, RwLock::new(vec![revision])); - } +/// A index with extra state, it won't mutate the index directly before commit +#[derive(Debug)] +pub(crate) struct IndexState<'a> { + /// Inner struct of `Index` + index_ref: &'a Index, + /// State current modification + state: Mutex, Vec>>, +} + +impl IndexState<'_> { + /// Commits all changes + pub(crate) fn commit(self) { + let index = &self.index_ref.inner; + while let Some((key, state_revs)) = self.state.lock().pop_first() { + let entry = index.get_or_insert(key, RwLock::default()); + fmap_value_mut(|revs| { + revs.extend_from_slice(&state_revs); + })(entry); } } - fn register_revision(&self, key: &[u8], revision: i64, sub_revision: i64) -> KeyRevision { - if let Some(entry) = self.inner.get(key) { - entry.value().map_read(|revisions| { - if let Some(rev) = revisions.last() { - if rev.is_deleted() { - KeyRevision::new(revision, 1, revision, sub_revision) - } else { - KeyRevision::new( - rev.create_revision, - rev.version.overflow_add(1), - revision, - sub_revision, - ) - } - } else { - panic!("Get empty revision list for key {key:?}"); - } - }) - } else { - KeyRevision::new(revision, 1, revision, sub_revision) + /// Discards all changes + pub(crate) fn discard(&self) { + self.state.lock().clear(); + } + + /// Gets the revisions for a single key + fn one_key_revisions( + &self, + key: &[u8], + state: &BTreeMap, Vec>, + ) -> Vec { + let index = &self.index_ref.inner; + let mut result = index + .get(key) + .map(fmap_value(<[KeyRevision]>::to_vec)) + .unwrap_or_default(); + if let Some(revs) = state.get(key) { + result.extend_from_slice(revs); } + result } - fn restore( + /// Gets the revisions for a range of keys + fn range_key_revisions(&self, range: KeyRange) -> BTreeMap, Vec> { + let mut map: BTreeMap, Vec> = BTreeMap::new(); + let index = &self.index_ref.inner; + let state = self.state.lock(); + for (key, value) in index + .range(range.clone()) + .map(fmap_entry(|(k, v)| (k.to_vec(), v.to_vec()))) + .chain(state.range(range).map(|(k, v)| (k.clone(), v.clone()))) + { + let entry = map.entry(key.clone()).or_default(); + entry.extend(value); + } + map + } + + /// Gets the revisions for all keys + fn all_key_revisions(&self) -> BTreeMap, Vec> { + let mut map: BTreeMap, Vec> = BTreeMap::new(); + let index = &self.index_ref.inner; + let state = self.state.lock(); + for (key, value) in index + .iter() + .map(fmap_entry(|(k, v)| (k.to_vec(), v.to_vec()))) + .chain(state.clone().into_iter()) + { + let entry = map.entry(key.clone()).or_default(); + entry.extend(value.clone()); + } + map + } + + /// Deletes one key + fn delete_one( + &self, + key: &[u8], + revision: i64, + sub_revision: i64, + ) -> Option<((Revision, Revision), Vec)> { + let mut state = self.state.lock(); + let revs = self.one_key_revisions(key, &state); + let last_available_rev = Index::get_revision(&revs, 0)?; + let entry = state.entry(key.to_vec()).or_default(); + let del_rev = KeyRevision::new_deletion(revision, sub_revision); + entry.push(del_rev); + let pair = (last_available_rev, del_rev.as_revision()); + + Some((pair, key.to_vec())) + } + + /// Deletes a range of keys + fn delete_range( + &self, + range: KeyRange, + revision: i64, + sub_revision: i64, + ) -> (Vec<(Revision, Revision)>, Vec>) { + self.range_key_revisions(range) + .into_keys() + .zip(0..) + .filter_map(|(key, i)| self.delete_one(&key, revision, sub_revision.overflow_add(i))) + .unzip() + } + + /// Deletes all keys + fn delete_all( + &self, + revision: i64, + sub_revision: i64, + ) -> (Vec<(Revision, Revision)>, Vec>) { + self.all_key_revisions() + .into_keys() + .zip(0..) + .filter_map(|(key, i)| self.delete_one(&key, revision, sub_revision.overflow_add(i))) + .unzip() + } + + /// Reads an entry + #[allow(clippy::needless_pass_by_value)] // it's intended to consume the entry + fn entry_read(entry: Entry, RwLock>>) -> (Vec, Vec) { + (entry.key().clone(), entry.value().read().clone()) + } +} + +impl IndexOperate for IndexState<'_> { + fn get(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec { + match RangeType::get_range_type(key, range_end) { + RangeType::OneKey => { + Index::get_revision(&self.one_key_revisions(key, &self.state.lock()), revision) + .map(|rev| vec![rev]) + .unwrap_or_default() + } + RangeType::AllKeys => self + .all_key_revisions() + .into_iter() + .filter_map(|(_, revs)| Index::get_revision(revs.as_ref(), revision)) + .collect(), + RangeType::Range => self + .range_key_revisions(KeyRange::new(key, range_end)) + .into_iter() + .filter_map(|(_, revs)| Index::get_revision(revs.as_ref(), revision)) + .collect(), + } + } + + fn register_revision( &self, key: Vec, revision: i64, sub_revision: i64, - create_revision: i64, - version: i64, - ) { - self.inner - .get_or_insert(key, RwLock::new(Vec::new())) - .value() - .map_write(|mut revisions| { - revisions.push(KeyRevision::new( - create_revision, - version, + ) -> (KeyRevision, Option) { + let index = &self.index_ref.inner; + let mut state = self.state.lock(); + + let next_rev = |revisions: &[KeyRevision]| { + let last = *revisions + .last() + .unwrap_or_else(|| unreachable!("empty revision list")); + let new_rev = if last.is_deleted() { + KeyRevision::new(revision, 1, revision, sub_revision) + } else { + KeyRevision::new( + last.create_revision, + last.version.overflow_add(1), revision, sub_revision, - )); - }); + ) + }; + (new_rev, Some(last)) + }; + + match (index.get(&key), state.entry(key)) { + (None, btree_map::Entry::Vacant(e)) => { + let new_rev = KeyRevision::new(revision, 1, revision, sub_revision); + let _ignore = e.insert(vec![new_rev]); + (new_rev, None) + } + (None | Some(_), btree_map::Entry::Occupied(mut e)) => { + let (new_rev, last) = next_rev(e.get_mut()); + e.get_mut().push(new_rev); + (new_rev, last) + } + (Some(e), btree_map::Entry::Vacant(se)) => { + let (new_rev, last) = fmap_value(next_rev)(e); + let _ignore = se.insert(vec![new_rev]); + (new_rev, last) + } + } } - fn compact(&self, at_rev: i64) -> Vec { - let mut revs = Vec::new(); - let mut del_keys = Vec::new(); + fn current_rev(&self, key: &[u8]) -> Option { + let index = &self.index_ref.inner; + let state = self.state.lock(); - self.inner.iter().for_each(|entry| { - entry.value().map_write(|mut revisions| { - if let Some(revision) = revisions.first() { - if revision.mod_revision < at_rev { - let pivot = revisions.partition_point(|rev| rev.mod_revision <= at_rev); - let compacted_last_idx = pivot.overflow_sub(1); - // There is at least 1 element in the first partition, so the key revision at `compacted_last_idx` - // must exist. - let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| { - unreachable!( - "Oops, the key revision at {compacted_last_idx} should not be None", - ) - }); - let compact_revs = if key_rev.is_deleted() { - revisions.drain(..=compacted_last_idx) - } else { - revisions.drain(..compacted_last_idx) - }; - revs.extend(compact_revs); + match (index.get(key), state.get(key)) { + (None, None) => None, + (None | Some(_), Some(revs)) => revs.last().copied(), + (Some(e), None) => fmap_value(|revs| revs.last().copied())(e), + } + } - if revisions.is_empty() { - del_keys.push(entry.key().clone()); - } - } - } - }); - }); - for key in del_keys { - let _ignore = self.inner.remove(&key); + fn insert(&self, key_revisions: Vec<(Vec, KeyRevision)>) { + let mut state = self.state.lock(); + for (key, revision) in key_revisions { + if let Some(revs) = state.get_mut::<[u8]>(key.as_ref()) { + revs.push(revision); + } else { + _ = state.insert(key, vec![revision]); + } } - revs + } + + fn delete( + &self, + key: &[u8], + range_end: &[u8], + revision: i64, + sub_revision: i64, + ) -> (Vec<(Revision, Revision)>, Vec>) { + let (pairs, keys) = match RangeType::get_range_type(key, range_end) { + RangeType::OneKey => self + .delete_one(key, revision, sub_revision) + .into_iter() + .unzip(), + RangeType::AllKeys => self.delete_all(revision, sub_revision), + RangeType::Range => { + self.delete_range(KeyRange::new(key, range_end), revision, sub_revision) + } + }; + + (pairs, keys) } } @@ -358,30 +633,23 @@ mod test { index .inner .get(key.as_ref()) - .expect("index entry should not be None") - .value() - .map_read(|revs| assert_eq!(*revs, expected_values)); + .map(fmap_value(|revs| assert_eq!(revs, expected_values))) + .expect("index entry should not be None"); } fn init_and_test_insert() -> Index { let index = Index::new(); - - index.insert(vec![ - (b"key".to_vec(), index.register_revision(b"key", 1, 3)), - (b"foo".to_vec(), index.register_revision(b"foo", 4, 5)), - (b"bar".to_vec(), index.register_revision(b"bar", 5, 4)), - ]); - - index.insert(vec![ - (b"key".to_vec(), index.register_revision(b"key", 2, 2)), - (b"foo".to_vec(), index.register_revision(b"foo", 6, 6)), - (b"bar".to_vec(), index.register_revision(b"bar", 7, 7)), - ]); - index.insert(vec![ - (b"key".to_vec(), index.register_revision(b"key", 3, 1)), - (b"foo".to_vec(), index.register_revision(b"foo", 8, 8)), - (b"bar".to_vec(), index.register_revision(b"bar", 9, 9)), - ]); + let mut txn = index.state(); + txn.register_revision(b"key".to_vec(), 1, 3); + txn.register_revision(b"foo".to_vec(), 4, 5); + txn.register_revision(b"bar".to_vec(), 5, 4); + txn.register_revision(b"key".to_vec(), 2, 2); + txn.register_revision(b"foo".to_vec(), 6, 6); + txn.register_revision(b"bar".to_vec(), 7, 7); + txn.register_revision(b"key".to_vec(), 3, 1); + txn.register_revision(b"foo".to_vec(), 8, 8); + txn.register_revision(b"bar".to_vec(), 9, 9); + txn.commit(); match_values( &index, @@ -419,8 +687,10 @@ mod test { #[test] fn test_get() { let index = init_and_test_insert(); - assert_eq!(index.get(b"key", b"", 0), vec![Revision::new(3, 1)]); - assert_eq!(index.get(b"key", b"", 1), vec![Revision::new(1, 3)]); + let txn = index.state(); + assert_eq!(txn.get(b"key", b"", 0), vec![Revision::new(3, 1)]); + assert_eq!(txn.get(b"key", b"", 1), vec![Revision::new(1, 3)]); + txn.commit(); assert_eq!( index.get_from_rev(b"key", b"", 2), vec![Revision::new(2, 2), Revision::new(3, 1)] @@ -453,9 +723,10 @@ mod test { #[test] fn test_delete() { let index = init_and_test_insert(); + let mut txn = index.state(); assert_eq!( - index.delete(b"key", b"", 10, 0), + txn.delete(b"key", b"", 10, 0), ( vec![(Revision::new(3, 1), Revision::new(10, 0))], vec![b"key".to_vec()] @@ -463,7 +734,7 @@ mod test { ); assert_eq!( - index.delete(b"a", b"g", 11, 0), + txn.delete(b"a", b"g", 11, 0), ( vec![ (Revision::new(9, 9), Revision::new(11, 0)), @@ -473,7 +744,10 @@ mod test { ) ); - assert_eq!(index.delete(b"\0", b"\0", 12, 0), (vec![], vec![])); + assert_eq!(txn.delete(b"\0", b"\0", 12, 0), (vec![], vec![])); + + txn.commit(); + match_values( &index, b"key", @@ -552,11 +826,11 @@ mod test { #[test] fn test_compact_with_deletion() { let index = init_and_test_insert(); - index.delete(b"a", b"g", 10, 0); - index.insert(vec![( - b"bar".to_vec(), - index.register_revision(b"bar", 11, 0), - )]); + let mut txn = index.state(); + + txn.delete(b"a", b"g", 10, 0); + txn.register_revision(b"bar".to_vec(), 11, 0); + txn.commit(); let res = index.compact(10); diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index b07a2276d..a7a044f10 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -23,7 +23,7 @@ use super::{ db::{DB, SCHEDULED_COMPACT_REVISION}, index::{Index, IndexOperate}, lease_store::LeaseCollection, - revision::{KeyRevision, Revision}, + revision::Revision, }; use crate::{ header_gen::HeaderGenerator, @@ -777,10 +777,10 @@ impl KvStore { sub_revision: i64, ) -> Result<(Vec, Vec), ExecuteError> { let mut ops = Vec::new(); - let new_rev = self - .inner - .index - .register_revision(&req.key, revision, sub_revision); + let (new_rev, prev_rev) = + self.inner + .index + .register_revision(req.key.clone(), revision, sub_revision); let mut kv = KeyValue { key: req.key.clone(), value: req.value.clone(), @@ -790,7 +790,8 @@ impl KvStore { lease: req.lease, }; if req.ignore_lease || req.ignore_value { - let prev_kv = self.inner.get_range(&req.key, &[], 0)?.pop(); + let pre_mod_rev = prev_rev.ok_or(ExecuteError::KeyNotFound)?.mod_revision; + let prev_kv = self.inner.get_range(&req.key, &[], pre_mod_rev)?.pop(); let prev = prev_kv.as_ref().ok_or(ExecuteError::KeyNotFound)?; if req.ignore_lease { kv.lease = prev.lease; @@ -896,12 +897,6 @@ impl KvStore { let events = Self::new_deletion_events(revision, keys); (ops, events) } - - /// Insert the given pairs (key, `KeyRevision`) into the index - #[inline] - pub(crate) fn insert_index(&self, key_revisions: Vec<(Vec, KeyRevision)>) { - self.inner.index.insert(key_revisions); - } } #[cfg(test)] @@ -1023,8 +1018,7 @@ mod test { revision: i64, ) -> Result<(), ExecuteError> { let (_sync_res, ops) = store.after_sync(request, revision).await?; - let key_revs = store.inner.db.flush_ops(ops)?; - store.insert_index(key_revs); + let _key_revs = store.inner.db.flush_ops(ops)?; Ok(()) } diff --git a/crates/xline/src/storage/kvwatcher.rs b/crates/xline/src/storage/kvwatcher.rs index 496abfd1a..b26d60cb0 100644 --- a/crates/xline/src/storage/kvwatcher.rs +++ b/crates/xline/src/storage/kvwatcher.rs @@ -774,7 +774,6 @@ mod test { ..Default::default() }); let (_sync_res, ops) = store.after_sync(&req, revision).await.unwrap(); - let key_revisions = db.flush_ops(ops).unwrap(); - store.insert_index(key_revisions); + let _key_revisions = db.flush_ops(ops).unwrap(); } }