diff --git a/Cargo.toml b/Cargo.toml
index e88d0850..4e18031a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,7 @@ snap = "1"
 loom = { version = "0.5.1", optional = true }
 
 [dev-dependencies]
-env_logger = "0.9.0"
+env_logger = "0.10.0"
 fdlimit = "0.2.1"
 rand = { version = "0.8.2", features = ["small_rng"] }
 tempfile = "3.2"
diff --git a/fuzz/src/lib.rs b/fuzz/src/lib.rs
index 9be09679..1211311d 100644
--- a/fuzz/src/lib.rs
+++ b/fuzz/src/lib.rs
@@ -226,7 +226,7 @@ pub trait DbSimulator {
 					};
 					Self::reset_model_from_database(&db.db, &mut layers, &old_layers);
 				},
-				Action::IterPrev =>
+				Action::IterPrev => {
 					if let Some(iter) = &mut db.iter {
 						let mut old_key = if let Some(old_key) = db.iter_current_key.take() {
 							old_key
@@ -250,15 +250,14 @@ pub trait DbSimulator {
 							old_key,
 							expected
 						);
-						assert!(expected.contains(&new_key_value), "Prev lookup on iterator with old position {:?}, expecting one of {:?}, found {:?}",
-							old_key,
-							expected, new_key_value);
+						assert!(expected.contains(&new_key_value), "{}", "Prev lookup on iterator with old position {old_key:?}, expecting one of {expected:?}, found {new_key_value:?}");
 						db.iter_current_key = Some(
 							new_key_value
 								.map_or(IterPosition::Start, |(k, _)| IterPosition::Value(k[0])),
 						);
-					},
-				Action::IterNext =>
+					}
+				},
+				Action::IterNext => {
 					if let Some(iter) = &mut db.iter {
 						let mut old_key = if let Some(old_key) = db.iter_current_key.take() {
 							old_key
@@ -282,12 +281,13 @@ pub trait DbSimulator {
 							old_key,
 							expected
 						);
-						assert!(expected.contains(&new_key_value), "Next lookup on iterator with old position {:?}, expecting one of {:?}, found {:?}", old_key, expected, new_key_value);
+						assert!(expected.contains(&new_key_value), "{}", "Next lookup on iterator with old position {old_key:?}, expecting one of {expected:?}, found {new_key_value:?}");
 						db.iter_current_key = Some(
 							new_key_value
 								.map_or(IterPosition::End, |(k, _)| IterPosition::Value(k[0])),
 						);
-					},
+					}
+				},
 			}
 			retry_operation(|| Self::check_db_and_model_are_equals(&db.db, &layers)).unwrap();
 		}
diff --git a/src/btree/btree.rs b/src/btree/btree.rs
index 8ec3f61c..01336d2f 100644
--- a/src/btree/btree.rs
+++ b/src/btree/btree.rs
@@ -7,6 +7,7 @@ use super::*;
 use crate::{
 	btree::BTreeTable,
 	column::Column,
+	db::{RcKey, RcValue},
 	error::Result,
 	log::{LogQuery, LogWriter},
 	table::key::TableKeyQuery,
@@ -35,7 +36,7 @@ impl BTree {
 
 	pub fn write_sorted_changes(
 		&mut self,
-		mut changes: &[Operation<Vec<u8>, Vec<u8>>],
+		mut changes: &[Operation<RcKey, RcValue>],
 		btree: TablesRef,
 		log: &mut LogWriter,
 	) -> Result<()> {
diff --git a/src/btree/iter.rs b/src/btree/iter.rs
index 606f4028..a14a4373 100644
--- a/src/btree/iter.rs
+++ b/src/btree/iter.rs
@@ -146,8 +146,8 @@ impl<'a> BTreeIterator<'a> {
 				self.next_backend(record_id, self.table, &*log, direction)?
 			};
 			let result = match (next_commit_overlay, next_backend) {
-				(Some((commit_key, commit_value)), Some((backend_key, backend_value))) =>
-					match (direction, commit_key.cmp(&backend_key)) {
+				(Some((commit_key, commit_value)), Some((backend_key, backend_value))) => {
+					match (direction, commit_key.value().cmp(&backend_key)) {
 						(IterDirection::Backward, std::cmp::Ordering::Greater) |
 						(IterDirection::Forward, std::cmp::Ordering::Less) => {
 							self.pending_backend = Some(PendingBackend {
@@ -155,9 +155,9 @@ impl<'a> BTreeIterator<'a> {
 								direction,
 							});
 							if let Some(value) = commit_value {
-								Some((commit_key, value))
+								Some((commit_key.value().clone(), value.value().clone()))
 							} else {
-								self.last_key = LastKey::At(commit_key);
+								self.last_key = LastKey::At(commit_key.value().clone());
 								continue
 							}
 						},
@@ -165,19 +165,20 @@ impl<'a> BTreeIterator<'a> {
 						(IterDirection::Forward, std::cmp::Ordering::Greater) =>
 							Some((backend_key, backend_value)),
 						(_, std::cmp::Ordering::Equal) =>
 							if let Some(value) = commit_value {
-								Some((backend_key, value))
+								Some((backend_key, value.value().clone()))
 							} else {
-								self.last_key = LastKey::At(commit_key);
+								self.last_key = LastKey::At(commit_key.value().clone());
 								continue
 							},
-					},
+					}
+				},
 				(Some((commit_key, Some(commit_value))), None) => {
 					self.pending_backend = Some(PendingBackend { next_item: None, direction });
-					Some((commit_key, commit_value))
+					Some((commit_key.value().clone(), commit_value.value().clone()))
 				},
 				(Some((k, None)), None) => {
 					self.pending_backend = Some(PendingBackend { next_item: None, direction });
-					self.last_key = LastKey::At(k);
+					self.last_key = LastKey::At(k.value().clone());
 					continue
 				},
 				(None, Some((backend_key, backend_value))) => Some((backend_key, backend_value)),
@@ -328,29 +329,32 @@ impl BTreeIterState {
 				(IterDirection::Forward, LastIndex::At(sep)) if is_leaf =>
 					LastIndex::At(*sep + 1),
 				(IterDirection::Forward, LastIndex::At(sep)) => LastIndex::Descend(*sep + 1),
-				(IterDirection::Forward, LastIndex::Before(sep)) if *sep == ORDER =>
+				(IterDirection::Forward, LastIndex::Before(sep)) if *sep == ORDER => {
 					if self.exit(direction) {
 						break
 					} else {
 						continue
-					},
+					}
+				},
 				(IterDirection::Forward, LastIndex::Before(sep)) => LastIndex::At(*sep),
-				(IterDirection::Backward, LastIndex::At(sep)) if is_leaf && *sep == 0 =>
+				(IterDirection::Backward, LastIndex::At(sep)) if is_leaf && *sep == 0 => {
 					if self.exit(direction) {
 						break
 					} else {
 						continue
-					},
+					}
+				},
 				(IterDirection::Backward, LastIndex::At(sep)) if is_leaf =>
 					LastIndex::At(*sep - 1),
 				(IterDirection::Backward, LastIndex::At(sep)) => LastIndex::Descend(*sep),
-				(IterDirection::Backward, LastIndex::Before(sep)) if *sep == 0 =>
+				(IterDirection::Backward, LastIndex::Before(sep)) if *sep == 0 => {
 					if self.exit(direction) {
 						break
 					} else {
 						continue
-					},
+					}
+				},
 				(IterDirection::Backward, LastIndex::Before(sep)) => LastIndex::At(*sep - 1),
 			};
 			match next {
diff --git a/src/btree/mod.rs b/src/btree/mod.rs
index 4aa14c57..bbe2f5a4 100644
--- a/src/btree/mod.rs
+++ b/src/btree/mod.rs
@@ -357,14 +357,14 @@ pub mod commit_overlay {
 	use super::*;
 	use crate::{
 		column::{ColId, Column},
-		db::{BTreeCommitOverlay, Operation},
+		db::{BTreeCommitOverlay, Operation, RcKey, RcValue},
 		error::Result,
 	};
 
 	#[derive(Debug)]
 	pub struct BTreeChangeSet {
 		pub col: ColId,
-		pub changes: Vec<Operation<Vec<u8>, Vec<u8>>>,
+		pub changes: Vec<Operation<RcKey, RcValue>>,
 	}
 
 	impl BTreeChangeSet {
@@ -372,9 +372,13 @@ pub mod commit_overlay {
 			BTreeChangeSet { col, changes: Default::default() }
 		}
 
-		pub fn push(&mut self, change: Operation<Vec<u8>, Vec<u8>>) {
+		pub fn push(&mut self, change: Operation<Value, Value>) {
 			// No key hashing
-			self.changes.push(change);
+			self.changes.push(match change {
+				Operation::Set(k, v) => Operation::Set(k.into(), v.into()),
+				Operation::Dereference(k) => Operation::Dereference(k.into()),
+				Operation::Reference(k) => Operation::Reference(k.into()),
+			});
 		}
 
 		pub fn copy_to_overlay(
@@ -388,8 +392,8 @@ pub mod commit_overlay {
 			for change in self.changes.iter() {
 				match change {
 					Operation::Set(key, value) => {
-						*bytes += key.len();
-						*bytes += value.len();
+						*bytes += key.value().len();
+						*bytes += value.value().len();
 						overlay.insert(key.clone(), (record_id, Some(value.clone())));
 					},
 					Operation::Dereference(key) => {
@@ -397,7 +401,7 @@ pub mod commit_overlay {
 						// (current ref_counted implementation does not
 						// make much sense for btree indexed content).
 						if !ref_counted {
-							*bytes += key.len();
+							*bytes += key.value().len();
 							overlay.insert(key.clone(), (record_id, None));
 						}
 					},
diff --git a/src/btree/node.rs b/src/btree/node.rs
index 828b8313..3e5d773a 100644
--- a/src/btree/node.rs
+++ b/src/btree/node.rs
@@ -9,6 +9,7 @@ use super::{
 };
 use crate::{
 	column::Column,
+	db::{RcKey, RcValue},
 	error::Result,
 	index::Address,
 	log::{LogQuery, LogWriter},
@@ -55,7 +56,7 @@ impl Node {
 		&mut self,
 		parent: Option<(&mut Self, usize)>,
 		depth: u32,
-		changes: &mut &[Operation<Vec<u8>, Vec<u8>>],
+		changes: &mut &[Operation<RcKey, RcValue>],
 		btree: TablesRef,
 		log: &mut LogWriter,
 	) -> Result<(Option<(Separator, Child)>, bool)> {
@@ -70,7 +71,7 @@ impl Node {
 			}
 			let r = match &changes[0] {
 				Operation::Set(key, value) =>
-					self.insert(depth, key, value, changes, btree, log)?,
+					self.insert(depth, key.value(), value.value(), changes, btree, log)?,
 				_ => self.on_existing(depth, changes, btree, log)?,
 			};
 			if r.0.is_some() || r.1 {
@@ -81,12 +82,12 @@ impl Node {
 			}
 			if let Some((parent, p)) = &parent {
 				let key = &changes[1].key();
-				let (at, i) = self.position(key)?; // TODO could start position from current
+				let (at, i) = self.position(key.value())?; // TODO could start position from current
 				if at || i < self.number_separator() {
 					*changes = &changes[1..];
 					continue
 				}
-				let (at, i) = parent.position(key)?;
+				let (at, i) = parent.position(key.value())?;
 				if !at && &i == p && i < parent.number_separator() {
 					*changes = &changes[1..];
 					continue
@@ -105,7 +106,7 @@ impl Node {
 		depth: u32,
 		key: &[u8],
 		value: &[u8],
-		changes: &mut &[Operation<Vec<u8>, Vec<u8>>],
+		changes: &mut &[Operation<RcKey, RcValue>],
 		btree: TablesRef,
 		log: &mut LogWriter,
 	) -> Result<(Option<(Separator, Child)>, bool)> {
@@ -235,18 +236,18 @@ impl Node {
 	fn on_existing(
 		&mut self,
 		depth: u32,
-		changes: &mut &[Operation<Vec<u8>, Vec<u8>>],
+		changes: &mut &[Operation<RcKey, RcValue>],
 		values: TablesRef,
 		log: &mut LogWriter,
 	) -> Result<(Option<(Separator, Child)>, bool)> {
 		let change = &changes[0];
 		let key = change.key();
 		let has_child = depth != 0;
-		let (at, i) = self.position(key)?;
+		let (at, i) = self.position(key.value())?;
 		if at {
 			let existing = self.separator_address(i);
 			if let Some(existing) = existing {
-				if Column::write_existing_value_plan::<_, Vec<u8>>(
+				if Column::write_existing_value_plan::<_, RcValue>(
 					&TableKey::NoHash,
 					values,
 					existing,
diff --git a/src/column.rs b/src/column.rs
index 036f861b..410b9037 100644
--- a/src/column.rs
+++ b/src/column.rs
@@ -4,7 +4,7 @@ use crate::{
 	btree::BTreeTable,
 	compress::Compress,
-	db::{check::CheckDisplay, Operation},
+	db::{check::CheckDisplay, Operation, RcValue},
 	display::hex,
 	error::{Error, Result},
 	index::{Address, IndexTable, PlanOutcome, TableId as IndexTableId},
@@ -498,7 +498,7 @@ impl HashColumn {
 
 	pub fn write_plan(
 		&self,
-		change: &Operation<Key, Vec<u8>>,
+		change: &Operation<Key, RcValue>,
 		log: &mut LogWriter,
 	) -> Result<PlanOutcome> {
 		let tables = self.tables.upgradable_read();
@@ -509,7 +509,8 @@ impl HashColumn {
 		} else {
 			match change {
 				Operation::Set(key, value) => {
-					let (r, _, _) = self.write_plan_new(tables, reindex, key, value, log)?;
+					let (r, _, _) =
+						self.write_plan_new(tables, reindex, key, value.as_ref(), log)?;
 					Ok(r)
 				},
 				Operation::Dereference(key) => {
@@ -534,7 +535,7 @@ impl HashColumn {
 	fn write_plan_existing(
 		&self,
 		tables: &Tables,
-		change: &Operation<Key, Vec<u8>>,
+		change: &Operation<Key, RcValue>,
 		log: &mut LogWriter,
 		index: &IndexTable,
 		sub_index: usize,
diff --git a/src/db.rs b/src/db.rs
index 6216da7e..ae42fb15 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -32,6 +32,7 @@ use crate::{
 };
 use fs2::FileExt;
 use std::{
+	borrow::Borrow,
 	collections::{BTreeMap, HashMap, VecDeque},
 	ops::Bound,
 	sync::{
@@ -56,6 +57,50 @@ const KEEP_LOGS: usize = 16;
 /// Value is just a vector of bytes. Value sizes up to 4Gb are allowed.
 pub type Value = Vec<u8>;
 
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
+pub struct RcValue(Arc<Value>);
+
+pub type RcKey = RcValue;
+
+impl RcValue {
+	pub fn value(&self) -> &Value {
+		&self.0
+	}
+}
+
+impl AsRef<[u8]> for RcValue {
+	fn as_ref(&self) -> &[u8] {
+		self.0.as_ref()
+	}
+}
+
+impl Borrow<[u8]> for RcValue {
+	fn borrow(&self) -> &[u8] {
+		self.value().borrow()
+	}
+}
+
+impl Borrow<Vec<u8>> for RcValue {
+	fn borrow(&self) -> &Vec<u8> {
+		self.value().borrow()
+	}
+}
+
+impl From<Value> for RcValue {
+	fn from(value: Value) -> Self {
+		Self(value.into())
+	}
+}
+
+#[cfg(test)]
+impl<const N: usize> TryFrom<RcValue> for [u8; N] {
+	type Error = <[u8; N] as TryFrom<Vec<u8>>>::Error;
+
+	fn try_from(value: RcValue) -> std::result::Result<Self, Self::Error> {
+		value.value().clone().try_into()
+	}
+}
+
 // Commit data passed to `commit`
 #[derive(Debug, Default)]
 struct Commit {
@@ -192,7 +237,7 @@ impl DbInner {
 				let overlay = self.commit_overlay.read();
 				// Check commit overlay first
 				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get(&key)) {
-					return Ok(v)
+					return Ok(v.map(|i| i.value().clone()))
 				}
 				// Go into tables and log overlay.
 				let log = self.log.overlays();
@@ -201,7 +246,7 @@ impl DbInner {
 			Column::Tree(column) => {
 				let overlay = self.commit_overlay.read();
 				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
-					return Ok(l.cloned())
+					return Ok(l.map(|i| i.value().clone()))
 				}
 				// We lock log, if btree structure changed while reading that would be an issue.
 				let log = self.log.overlays().read();
@@ -226,7 +271,7 @@ impl DbInner {
 			Column::Tree(column) => {
 				let overlay = self.commit_overlay.read();
 				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
-					return Ok(l.map(|v| v.len() as u32))
+					return Ok(l.map(|v| v.value().len() as u32))
 				}
 				let log = self.log.overlays().read();
 				let l = column.with_locked(|btree| BTreeTable::get(key, &*log, btree))?;
@@ -1124,8 +1169,8 @@ impl Drop for Db {
 	}
 }
 
-pub type IndexedCommitOverlay = HashMap<Key, (u64, Option<Value>), IdentityBuildHasher>;
-pub type BTreeCommitOverlay = BTreeMap<Vec<u8>, (u64, Option<Value>)>;
+pub type IndexedCommitOverlay = HashMap<Key, (u64, Option<RcValue>), IdentityBuildHasher>;
+pub type BTreeCommitOverlay = BTreeMap<RcKey, (u64, Option<RcValue>)>;
 
 #[derive(Debug)]
 pub struct CommitOverlay {
@@ -1145,23 +1190,26 @@ impl CommitOverlay {
 	}
 }
 
 impl CommitOverlay {
-	fn get_ref(&self, key: &[u8]) -> Option<Option<&Value>> {
+	fn get_ref(&self, key: &[u8]) -> Option<Option<&RcValue>> {
 		self.indexed.get(key).map(|(_, v)| v.as_ref())
 	}
 
-	fn get(&self, key: &[u8]) -> Option<Option<Value>> {
+	fn get(&self, key: &[u8]) -> Option<Option<RcValue>> {
 		self.get_ref(key).map(|v| v.cloned())
 	}
 
 	fn get_size(&self, key: &[u8]) -> Option<Option<u32>> {
-		self.get_ref(key).map(|res| res.as_ref().map(|b| b.len() as u32))
+		self.get_ref(key).map(|res| res.as_ref().map(|b| b.value().len() as u32))
 	}
 
-	fn btree_get(&self, key: &[u8]) -> Option<Option<&Value>> {
+	fn btree_get(&self, key: &[u8]) -> Option<Option<&RcValue>> {
 		self.btree_indexed.get(key).map(|(_, v)| v.as_ref())
 	}
 
-	pub fn btree_next(&self, last_key: &crate::btree::LastKey) -> Option<(Value, Option<Value>)> {
+	pub fn btree_next(
+		&self,
+		last_key: &crate::btree::LastKey,
+	) -> Option<(RcValue, Option<RcValue>)> {
 		use crate::btree::LastKey;
 		match &last_key {
 			LastKey::Start => self
@@ -1177,13 +1225,16 @@ impl CommitOverlay {
 				.map(|(k, (_, v))| (k.clone(), v.clone())),
 			LastKey::Seeked(key) => self
 				.btree_indexed
-				.range::<Vec<u8>, _>(key..)
+				.range::<[u8], _>(key..)
 				.next()
 				.map(|(k, (_, v))| (k.clone(), v.clone())),
 		}
 	}
 
-	pub fn btree_prev(&self, last_key: &crate::btree::LastKey) -> Option<(Value, Option<Value>)> {
+	pub fn btree_prev(
+		&self,
+		last_key: &crate::btree::LastKey,
+	) -> Option<(RcValue, Option<RcValue>)> {
 		use crate::btree::LastKey;
 		match &last_key {
 			LastKey::End => self
@@ -1271,7 +1322,7 @@ pub struct CommitChangeSet {
 #[derive(Debug)]
 pub struct IndexedChangeSet {
 	pub col: ColId,
-	pub changes: Vec<Operation<Key, Vec<u8>>>,
+	pub changes: Vec<Operation<Key, RcValue>>,
 }
 
 impl IndexedChangeSet {
@@ -1291,13 +1342,13 @@ impl IndexedChangeSet {
 		};
 
 		self.push_change_hashed(match change {
-			Operation::Set(k, v) => Operation::Set(hash_key(k.as_ref()), v),
+			Operation::Set(k, v) => Operation::Set(hash_key(k.as_ref()), v.into()),
 			Operation::Dereference(k) => Operation::Dereference(hash_key(k.as_ref())),
 			Operation::Reference(k) => Operation::Reference(hash_key(k.as_ref())),
 		})
 	}
 
-	fn push_change_hashed(&mut self, change: Operation<Key, Vec<u8>>) {
+	fn push_change_hashed(&mut self, change: Operation<Key, RcValue>) {
 		self.changes.push(change);
 	}
 
@@ -1313,7 +1364,7 @@ impl IndexedChangeSet {
 			match &change {
 				Operation::Set(k, v) => {
 					*bytes += k.len();
-					*bytes += v.len();
+					*bytes += v.value().len();
 					overlay.indexed.insert(*k, (record_id, Some(v.clone())));
 				},
 				Operation::Dereference(k) => {
@@ -1362,12 +1413,13 @@ impl IndexedChangeSet {
 		use std::collections::hash_map::Entry;
 		for change in self.changes.iter() {
 			match change {
-				Operation::Set(k, _) | Operation::Dereference(k) =>
+				Operation::Set(k, _) | Operation::Dereference(k) => {
 					if let Entry::Occupied(e) = overlay.indexed.entry(*k) {
 						if e.get().0 == record_id {
 							e.remove_entry();
 						}
-					},
+					}
+				},
 				Operation::Reference(..) => (),
 			}
 		}
@@ -1419,12 +1471,11 @@ enum OpeningMode {
 
 #[cfg(test)]
 mod tests {
-	use crate::{ColumnOptions, Value};
-
 	use super::{Db, Options};
 	use crate::{
 		column::ColId,
 		db::{DbInner, OpeningMode},
+		ColumnOptions, Value,
 	};
 	use rand::Rng;
 	use std::{
@@ -2305,7 +2356,7 @@ mod tests {
 		let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
 			data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
 
-		let mut end_state: BTreeMap<Vec<u8>, Vec<u8>> = start_state.clone();
+		let mut end_state = start_state.clone();
 		for (_c, k, v) in data_change.iter() {
 			if let Some(v) = v {
 				end_state.insert(k.clone(), v.clone());
@@ -2337,7 +2388,7 @@ mod tests {
 		let data_change = vec![(0, b"key2".to_vec(), Some(b"val2".to_vec()))];
 		let start_state: BTreeMap<Vec<u8>, Vec<u8>> =
 			data_start.iter().cloned().map(|(_c, k, v)| (k, v.unwrap())).collect();
-		let mut end_state: BTreeMap<Vec<u8>, Vec<u8>> = start_state.clone();
+		let mut end_state = start_state.clone();
 		for (_c, k, v) in data_change.iter() {
 			if let Some(v) = v {
 				end_state.insert(k.clone(), v.clone());
@@ -2367,7 +2418,7 @@ mod tests {
 		let mut iter = db.iter(col_nb).unwrap();
 		let mut iter_state = start_state.iter();
 
-		let mut last_key = Vec::new();
+		let mut last_key = Value::new();
 		for _ in 0..commit_at {
 			let next = iter.next().unwrap();
 			if let Some((k, _)) = next.as_ref() {
diff --git a/src/error.rs b/src/error.rs
index c8989a20..b0abbaac 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -29,8 +29,9 @@ impl fmt::Display for Error {
 			Error::Io(e) => write!(f, "IO Error: {e}"),
 			Error::Corruption(e) => write!(f, "Corruption: {e}"),
 			Error::InvalidConfiguration(e) => write!(f, "Invalid configuration: {e}"),
-			Error::IncompatibleColumnConfig { id, reason } =>
-				write!(f, "Invalid column {id} configuration : {reason}"),
+			Error::IncompatibleColumnConfig { id, reason } => {
+				write!(f, "Invalid column {id} configuration : {reason}")
+			},
 			Error::InvalidInput(e) => write!(f, "Invalid input: {e}"),
 			Error::InvalidValueData => write!(f, "Invalid data in value table"),
 			Error::Background(e) => write!(f, "Background worker error: {e}"),
diff --git a/src/log.rs b/src/log.rs
index 99f48fb2..36f90b76 100644
--- a/src/log.rs
+++ b/src/log.rs
@@ -547,7 +547,7 @@ impl Log {
 			},
 			Err(e) => Err(e),
 			Ok(id) => {
-				try_io!(file.seek(std::io::SeekFrom::Start(0)));
+				try_io!(file.rewind());
 				log::debug!(target: "parity-db", "Opened existing log {}, first record_id = {}", path.display(), id);
 				Ok((file, Some(id)))
 			},
@@ -707,7 +707,7 @@ impl Log {
 		};
 		for (id, ref mut file) in cleaned.iter_mut() {
 			log::debug!(target: "parity-db", "Cleaned: {}", id);
-			try_io!(file.seek(std::io::SeekFrom::Start(0)));
+			try_io!(file.rewind());
 			try_io!(file.set_len(0));
 		}
 		// Move cleaned logs back to the pool
@@ -733,7 +733,7 @@ impl Log {
 		let mut reading = self.reading.write();
 		if reading.is_none() {
 			if let Some((id, mut file)) = self.read_queue.write().pop_front() {
-				try_io!(file.seek(std::io::SeekFrom::Start(0)));
+				try_io!(file.rewind());
 				*reading = Some(Reading { id, file: std::io::BufReader::new(file) });
 			} else {
 				log::trace!(target: "parity-db", "No active reader");
diff --git a/src/migration.rs b/src/migration.rs
index 6a7e050e..3192c5e1 100644
--- a/src/migration.rs
+++ b/src/migration.rs
@@ -85,7 +85,7 @@ pub fn migrate(from: &Path, mut to: Options, overwrite: bool, force_migrate: &[u
 					.entry(c)
 					.or_insert_with(|| IndexedChangeSet::new(c))
 					.changes
-					.push(Operation::Set(key, value));
+					.push(Operation::Set(key, value.into()));
 				nb_commit += 1;
 				if nb_commit == COMMIT_SIZE {
 					ncommits += 1;
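
For context, a simplified standalone sketch of the idea behind the `RcValue` type added in `src/db.rs` (this sketch is not part of the patch; it mirrors the names from the diff but uses `Arc::new` where the patch writes `value.into()`). Cheap clones matter because `copy_to_overlay` clones every key and value into the overlay map while the original change set is kept for the log writer, and readers clone values back out of the overlay; with an `Arc`-backed buffer each of those clones only bumps a reference count, and the `Borrow<[u8]>` impl lets the overlay still be queried with a plain byte slice.

// Standalone sketch, Rust, assuming only std; field names mirror the patch.
use std::{borrow::Borrow, collections::BTreeMap, sync::Arc};

pub type Value = Vec<u8>;

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RcValue(Arc<Value>);

impl RcValue {
	pub fn value(&self) -> &Value {
		&self.0
	}
}

impl From<Value> for RcValue {
	// The patch expresses this as Self(value.into()); Arc::new is equivalent here.
	fn from(value: Value) -> Self {
		Self(Arc::new(value))
	}
}

// Borrow<[u8]> is what lets a BTreeMap keyed by RcValue be queried with &[u8],
// as the overlay's btree_get and the range::<[u8], _>(key..) call rely on.
impl Borrow<[u8]> for RcValue {
	fn borrow(&self) -> &[u8] {
		self.0.as_slice()
	}
}

fn main() {
	let value: RcValue = vec![0u8; 1024].into();
	let mut overlay: BTreeMap<RcValue, u64> = BTreeMap::new();

	// Cloning only bumps the Arc refcount; the 1024-byte buffer is shared, not copied.
	overlay.insert(value.clone(), 1);
	assert!(Arc::ptr_eq(&value.0, &overlay.keys().next().unwrap().0));

	// Lookup by &[u8] goes through the Borrow impl, no temporary Vec is allocated.
	assert_eq!(overlay.get(&[0u8; 1024][..]), Some(&1));
}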