diff --git a/admin/src/lib.rs b/admin/src/lib.rs
index 478fa4ad..16c449a4 100644
--- a/admin/src/lib.rs
+++ b/admin/src/lib.rs
@@ -89,10 +89,6 @@ pub fn run() -> Result<(), String> {
 		SubCommand::Check(check) => {
 			let db = parity_db::Db::open_read_only(&options)
 				.map_err(|e| format!("Invalid db: {e:?}"))?;
-			if !check.index_value {
-				// Note that we should use enum parameter instead.
-				return Err("Requires one of the following check flag: --index-value".to_string())
-			}
 			let check_param = parity_db::CheckOptions::new(
 				check.column,
 				check.range_start,
@@ -274,11 +270,6 @@ pub struct Check {
 	#[clap(long)]
 	pub column: Option<u8>,
 
-	/// Parse indexes and
-	/// lookup values.
-	#[clap(long)]
-	pub index_value: bool,
-
 	/// Start range for operation.
 	/// Index start chunk in db.
 	#[clap(long)]
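Note: with `--index-value` removed, `check` always validates index entries against the value tables. A minimal library-side sketch of what the admin tool now does; the two trailing display arguments of `CheckOptions::new` are not visible in the hunk above, so their shape (mirroring the CLI's `--display` / `--display-value-max` flags) and the `Db::dump` entry point are assumptions here:

```rust
use parity_db::{CheckOptions, Db, Options};
use std::path::Path;

// Sketch: validate column 0 of an existing database in read-only mode.
fn validate(path: &Path) -> Result<(), String> {
    let options = Options::with_columns(path, 1);
    let db = Db::open_read_only(&options).map_err(|e| format!("Invalid db: {e:?}"))?;
    // column = Some(0), full chunk range, no value output (assumed arguments).
    let check = CheckOptions::new(Some(0), None, None, false, None);
    db.dump(check).map_err(|e| format!("Check error: {e:?}"))
}
```
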
diff --git a/src/column.rs b/src/column.rs
index 64f1ca22..1f8c9c67 100644
--- a/src/column.rs
+++ b/src/column.rs
@@ -114,6 +114,24 @@ pub struct TablesRef<'a> {
 	pub ref_counted: bool,
 }
 
+/// Value iteration state.
+pub struct ValueIterState {
+	/// Reference counter.
+	pub rc: u32,
+	/// Value.
+	pub value: Vec<u8>,
+}
+
+// Only used for DB validation and migration.
+pub struct CorruptedIndexEntryInfo {
+	pub chunk_index: u64,
+	pub sub_index: u32,
+	pub entry: crate::index::Entry,
+	pub value_entry: Option<Vec<u8>>,
+	pub error: Option<Error>,
+}
+
+// Only used for DB validation and migration.
 pub struct IterState {
 	pub chunk_index: u64,
 	pub key: Key,
@@ -121,9 +139,10 @@ pub struct IterState {
 	pub value: Vec<u8>,
 }
 
+// Only used for DB validation and migration.
 enum IterStateOrCorrupted {
 	Item(IterState),
-	Corrupted(crate::index::Entry, Option<Error>),
+	Corrupted(CorruptedIndexEntryInfo),
 }
 
 #[inline]
@@ -724,59 +743,49 @@ impl HashColumn {
 		tables.index.write_stats(&self.stats)
 	}
 
-	pub fn iter_while(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
+	pub fn iter_values(&self, log: &Log, mut f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
+		let tables = self.tables.read();
+		for table in &tables.value {
+			log::debug!( target: "parity-db", "{}: Iterating table {}", tables.index.id, table.id);
+			table.iter_while(log.overlays(), |_, rc, value, compressed| {
+				let value = if compressed {
+					if let Ok(value) = self.compression.decompress(&value) {
+						value
+					} else {
+						return false
+					}
+				} else {
+					value
+				};
+				let state = ValueIterState { rc, value };
+				f(state)
+			})?;
+			log::debug!( target: "parity-db", "{}: Done iterating table {}", tables.index.id, table.id);
+		}
+		Ok(())
+	}
+
+	pub fn iter_index(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
 		let action = |state| match state {
 			IterStateOrCorrupted::Item(item) => Ok(f(item)),
 			IterStateOrCorrupted::Corrupted(..) =>
 				Err(Error::Corruption("Missing indexed value".into())),
 		};
-		self.iter_while_inner(log, action, 0, true)
+		self.iter_index_internal(log, action, 0)
 	}
 
-	fn iter_while_inner(
+	fn iter_index_internal(
 		&self,
 		log: &Log,
 		mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
 		start_chunk: u64,
-		skip_preimage_indexes: bool,
 	) -> Result<()> {
-		use blake2::{digest::typenum::U32, Blake2b, Digest};
-
 		let tables = self.tables.read();
 		let source = &tables.index;
 
-		if skip_preimage_indexes && self.preimage {
-			// It is much faster to iterate over the value table than index.
-			// We have to assume hashing scheme however.
-			for table in &tables.value[..tables.value.len() - 1] {
-				log::debug!( target: "parity-db", "{}: Iterating table {}", source.id, table.id);
-				table.iter_while(log.overlays(), |index, rc, value, compressed| {
-					let value = if compressed {
-						if let Ok(value) = self.compression.decompress(&value) {
-							value
-						} else {
-							return false
-						}
-					} else {
-						value
-					};
-					let key = Blake2b::<U32>::digest(&value);
-					let key = self.hash_key(&key);
-					let state = IterStateOrCorrupted::Item(IterState {
-						chunk_index: index,
-						key,
-						rc,
-						value,
-					});
-					f(state).unwrap_or(false)
-				})?;
-				log::debug!( target: "parity-db", "{}: Done iterating table {}", source.id, table.id);
-			}
-		}
-
 		for c in start_chunk..source.id.total_chunks() {
 			let entries = source.entries(c, log.overlays())?;
-			for entry in entries.iter() {
+			for (sub_index, entry) in entries.iter().enumerate() {
 				if entry.is_empty() {
 					continue
 				}
@@ -785,20 +794,37 @@ impl HashColumn {
 					(address.size_tier(), address.offset())
 				};
 
-				if skip_preimage_indexes &&
-					self.preimage && size_tier as usize != tables.value.len() - 1
-				{
-					continue
-				}
 				let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
 				let (value, rc, pk, compressed) = match value {
 					Ok(Some(v)) => v,
 					Ok(None) => {
-						f(IterStateOrCorrupted::Corrupted(*entry, None))?;
+						let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
+						if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
+							chunk_index: c,
+							sub_index: sub_index as u32,
+							value_entry,
+							entry: *entry,
+							error: None,
+						}))? {
+							return Ok(())
+						}
 						continue
 					},
 					Err(e) => {
-						f(IterStateOrCorrupted::Corrupted(*entry, Some(e)))?;
+						let value_entry = if let Error::Corruption(_) = &e {
+							tables.value[size_tier as usize].dump_entry(offset).ok()
+						} else {
+							None
+						};
+						if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
+							chunk_index: c,
+							sub_index: sub_index as u32,
+							value_entry,
+							entry: *entry,
+							error: Some(e),
+						}))? {
+							return Ok(())
+						}
 						continue
 					},
 				};
@@ -828,19 +854,22 @@ impl HashColumn {
 		let start_chunk = check_param.from.unwrap_or(0);
 		let end_chunk = check_param.bound;
 
-		let step = 1000;
+		let step = 10000;
+		let mut next_info_chunk = step;
 		let start_time = std::time::Instant::now();
-		log::info!(target: "parity-db", "Starting full index iteration at {:?}", start_time);
-		log::info!(target: "parity-db", "for {} chunks of column {}", self.tables.read().index.id.total_chunks(), col);
-		self.iter_while_inner(
+		let total_chunks = self.tables.read().index.id.total_chunks();
+		let index_id = self.tables.read().index.id;
+		log::info!(target: "parity-db", "Column {} (hash): Starting index validation", col);
+		self.iter_index_internal(
 			log,
 			|state| match state {
 				IterStateOrCorrupted::Item(IterState { chunk_index, key, rc, value }) => {
 					if Some(chunk_index) == end_chunk {
 						return Ok(false)
 					}
-					if chunk_index % step == 0 {
-						log::info!(target: "parity-db", "Chunk iteration at {}", chunk_index);
+					if chunk_index >= next_info_chunk {
+						next_info_chunk += step;
+						log::info!(target: "parity-db", "Validated {} / {} chunks", chunk_index, total_chunks);
 					}
 
 					match check_param.display {
@@ -865,16 +894,25 @@ impl HashColumn {
 					}
 					Ok(true)
 				},
-				IterStateOrCorrupted::Corrupted(entry, e) => {
-					log::info!("Corrupted value for index entry: {}:\n\t{:?}", entry.as_u64(), e);
+				IterStateOrCorrupted::Corrupted(c) => {
+					log::error!(
+						"Corrupted value for index entry: [{}][{}]: {} ({:?}). Error: {:?}",
+						c.chunk_index,
+						c.sub_index,
+						c.entry.address(index_id.index_bits()),
+						hex(&c.entry.as_u64().to_le_bytes()),
+						c.error,
+					);
+					if let Some(v) = c.value_entry {
+						log::error!("Value entry: {:?}", hex(v.as_slice()));
+					}
 					Ok(true)
 				},
 			},
 			start_chunk,
-			false,
 		)?;
-		log::info!(target: "parity-db", "Ended full index check, elapsed {:?}", start_time.elapsed());
+		log::info!(target: "parity-db", "Index validation completed successfully, elapsed {:?}", start_time.elapsed());
 
 		Ok(())
 	}
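Note: the callback contract of `iter_index_internal` changed. Previously only the `Item` arm could stop iteration; now a `Corrupted` report that returns `Ok(false)` also aborts the scan. A crate-internal sketch of a caller that stops after recording a few bad slots (illustrative only; `IterStateOrCorrupted` is private to this module, and a `HashColumn` handle `column` plus a `Log` reference `log` are assumed):

```rust
// Sketch (crate-internal): collect up to 10 corrupted index slots.
let mut corrupted = Vec::new();
column.iter_index_internal(
    log,
    |state| match state {
        IterStateOrCorrupted::Item(_) => Ok(true), // keep scanning
        IterStateOrCorrupted::Corrupted(info) => {
            corrupted.push((info.chunk_index, info.sub_index));
            // Returning Ok(false) now ends the iteration early.
            Ok(corrupted.len() < 10)
        },
    },
    0, // start_chunk
)?;
```
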
diff --git a/src/db.rs b/src/db.rs
index 54db38fb..2d6de555 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -20,7 +20,7 @@ use crate::{
 	btree::{commit_overlay::BTreeChangeSet, BTreeIterator, BTreeTable},
-	column::{hash_key, ColId, Column, IterState, ReindexBatch},
+	column::{hash_key, ColId, Column, IterState, ReindexBatch, ValueIterState},
 	error::{try_io, Error, Result},
 	hash::IdentityBuildHasher,
 	index::PlanOutcome,
@@ -853,14 +853,22 @@ impl DbInner {
 		}
 	}
 
-	fn iter_column_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
+	fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
 		match &self.columns[c as usize] {
-			Column::Hash(column) => column.iter_while(&self.log, f),
+			Column::Hash(column) => column.iter_values(&self.log, f),
+			Column::Tree(_) => unimplemented!(),
+		}
+	}
+
+	fn iter_column_index_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
+		match &self.columns[c as usize] {
+			Column::Hash(column) => column.iter_index(&self.log, f),
 			Column::Tree(_) => unimplemented!(),
 		}
 	}
 }
 
+/// Database instance.
 pub struct Db {
 	inner: Arc<DbInner>,
 	commit_thread: Option<std::thread::JoinHandle<()>>,
@@ -870,22 +878,25 @@ pub struct Db {
 }
 
 impl Db {
-	pub fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
+	#[cfg(test)]
+	pub(crate) fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
 		let options = Options::with_columns(path, num_columns);
 		Self::open_inner(&options, OpeningMode::Create)
 	}
 
-	/// Open the database with given options.
+	/// Open the database with given options. An error will be returned if the database does not
+	/// exist.
 	pub fn open(options: &Options) -> Result<Db> {
 		Self::open_inner(options, OpeningMode::Write)
 	}
 
-	/// Create the database using given options.
+	/// Open the database using given options. If the database does not exist it will be created
+	/// empty.
 	pub fn open_or_create(options: &Options) -> Result<Db> {
 		Self::open_inner(options, OpeningMode::Create)
 	}
 
-	/// Read the database using given options
+	/// Open an existing database in read-only mode.
 	pub fn open_read_only(options: &Options) -> Result<Db> {
 		Self::open_inner(options, OpeningMode::ReadOnly)
 	}
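Note: the three opening modes now document their behaviour explicitly. A short usage sketch (the path and column count are placeholders):

```rust
use parity_db::{Db, Options, Result};
use std::path::Path;

fn open_modes(path: &Path) -> Result<()> {
    let options = Options::with_columns(path, 1);

    // Creates the database files if missing, then opens for writing.
    let db = Db::open_or_create(&options)?;
    drop(db);

    // Errors out if the database does not already exist.
    let db = Db::open(&options)?;
    drop(db);

    // Opens an existing database without write access.
    let _ro = Db::open_read_only(&options)?;
    Ok(())
}
```
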
@@ -987,12 +998,23 @@ impl Db {
 		self.inner.columns.len() as u8
 	}
 
+	/// Iterate a column and call a function for each value. This is only supported for columns with
+	/// `btree_index` set to `false`. Iteration order is unspecified.
+	/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
+	pub fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
+		self.inner.iter_column_while(c, f)
+	}
+
 	/// Iterate a column and call a function for each value. This is only supported for columns with
 	/// `btree_index` set to `false`. Iteration order is unspecified. Note that the
 	/// `key` field in the state is the hash of the original key.
-	/// Unlinke `get` the iteration may not include changes made in recent `commit` calls.
-	pub fn iter_column_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
-		self.inner.iter_column_while(c, f)
+	/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
+	pub(crate) fn iter_column_index_while(
+		&self,
+		c: ColId,
+		f: impl FnMut(IterState) -> bool,
+	) -> Result<()> {
+		self.inner.iter_column_index_while(c, f)
 	}
 
 	fn commit_worker(db: Arc<DbInner>) -> Result<()> {
@@ -1052,6 +1074,7 @@ impl Db {
 		Ok(())
 	}
 
+	/// Dump full database stats to the text output.
 	pub fn write_stats_text(
 		&self,
 		writer: &mut impl std::io::Write,
@@ -1060,6 +1083,7 @@ impl Db {
 		self.inner.write_stats_text(writer, column)
 	}
 
+	/// Reset internal database statistics for the database or specified column.
 	pub fn clear_stats(&self, column: Option<u8>) -> Result<()> {
 		self.inner.clear_stats(column)
 	}
@@ -1418,20 +1442,30 @@ impl IndexedChangeSet {
 
 /// Verification operation utilities.
 pub mod check {
+	/// Database dump verbosity.
 	pub enum CheckDisplay {
+		/// Don't output any data.
 		None,
+		/// Output full data.
 		Full,
+		/// Limit value output to the specified size.
 		Short(u64),
 	}
 
+	/// Options for producing a database dump.
 	pub struct CheckOptions {
+		/// Only process this column. If this is `None` all columns will be processed.
		pub column: Option<u8>,
+		/// Start with this index.
 		pub from: Option<u64>,
+		/// End with this index.
 		pub bound: Option<u64>,
+		/// Verbosity.
 		pub display: CheckDisplay,
 	}
 
 	impl CheckOptions {
+		/// Create a new instance.
 		pub fn new(
 			column: Option<u8>,
 			from: Option<u64>,
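Note: the public `iter_column_while` now yields `ValueIterState` (reference count plus decompressed value) instead of index-level state, so key hashes are no longer part of the public callback. A sketch counting the logical entries of a column, weighting each value by its reference count:

```rust
use parity_db::{Db, Result, ValueIterState};

// Sketch: count logical entries in column 0 of an open database.
fn count_entries(db: &Db) -> Result<u64> {
    let mut total = 0u64;
    db.iter_column_while(0, |ValueIterState { rc, value }| {
        total += rc as u64;
        let _ = value; // decompressed value bytes, unused here
        true // keep iterating
    })?;
    Ok(total)
}
```
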
diff --git a/src/file.rs b/src/file.rs
index 185e5c0d..18ca4e77 100644
--- a/src/file.rs
+++ b/src/file.rs
@@ -4,7 +4,7 @@
 //! Utilities for db file.
 
 use crate::{
-	error::{try_io, Result},
+	error::{try_io, Error, Result},
 	parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
 	table::TableId,
 };
@@ -113,7 +113,12 @@ impl TableFile {
 	#[cfg(unix)]
 	pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<()> {
 		use std::os::unix::fs::FileExt;
-		try_io!(self.file.read().as_ref().unwrap().read_exact_at(buf, offset));
+		try_io!(self
+			.file
+			.read()
+			.as_ref()
+			.ok_or_else(|| Error::Corruption("File does not exist.".into()))?
+			.read_exact_at(buf, offset));
 		Ok(())
 	}
 
@@ -131,7 +136,7 @@ impl TableFile {
 		use std::{io, os::windows::fs::FileExt};
 
 		let file = self.file.read();
-		let file = file.as_ref().unwrap();
+		let file = file.as_ref().ok_or_else(|| Error::Corruption("File does not exist.".into()))?;
 
 		while !buf.is_empty() {
 			match file.seek_read(buf, offset) {
diff --git a/src/lib.rs b/src/lib.rs
index b675ea2d..ca2c155d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -20,7 +20,7 @@ mod stats;
 mod table;
 
 pub use btree::BTreeIterator;
-pub use column::ColId;
+pub use column::{ColId, ValueIterState};
 pub use compress::CompressionType;
 pub use db::{check::CheckOptions, Db, Operation, Value};
 #[cfg(feature = "instrumentation")]
diff --git a/src/migration.rs b/src/migration.rs
index 3192c5e1..b54406e9 100644
--- a/src/migration.rs
+++ b/src/migration.rs
@@ -76,33 +76,36 @@ pub fn migrate(from: &Path, mut to: Options, overwrite: bool, force_migrate: &[u8])
 			continue
 		}
 		log::info!("Migrating col {}", c);
-		source.iter_column_while(c, |IterState { chunk_index: index, key, rc, mut value }| {
-			//TODO: more efficient ref migration
-			for _ in 0..rc {
-				let value = std::mem::take(&mut value);
-				commit
-					.indexed
-					.entry(c)
-					.or_insert_with(|| IndexedChangeSet::new(c))
-					.changes
-					.push(Operation::Set(key, value.into()));
-				nb_commit += 1;
-				if nb_commit == COMMIT_SIZE {
-					ncommits += 1;
-					if let Err(e) = dest.commit_raw(std::mem::take(&mut commit)) {
-						log::warn!("Migration error: {:?}", e);
-						return false
-					}
-					nb_commit = 0;
+		source.iter_column_index_while(
+			c,
+			|IterState { chunk_index: index, key, rc, mut value }| {
+				//TODO: more efficient ref migration
+				for _ in 0..rc {
+					let value = std::mem::take(&mut value);
+					commit
+						.indexed
+						.entry(c)
+						.or_insert_with(|| IndexedChangeSet::new(c))
+						.changes
+						.push(Operation::Set(key, value.into()));
+					nb_commit += 1;
+					if nb_commit == COMMIT_SIZE {
+						ncommits += 1;
+						if let Err(e) = dest.commit_raw(std::mem::take(&mut commit)) {
+							log::warn!("Migration error: {:?}", e);
+							return false
+						}
+						nb_commit = 0;
 
-					if last_time.elapsed() > std::time::Duration::from_secs(3) {
-						last_time = std::time::Instant::now();
-						log::info!("Migrating {} #{}, commit {}", c, index, ncommits);
+						if last_time.elapsed() > std::time::Duration::from_secs(3) {
+							last_time = std::time::Instant::now();
+							log::info!("Migrating {} #{}, commit {}", c, index, ncommits);
+						}
 					}
 				}
-			}
-			true
-		})?;
+				true
+			},
+		)?;
 		if overwrite {
 			dest.commit_raw(commit)?;
 			commit = Default::default();
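Note: migration now goes through the crate-internal `iter_column_index_while`, so it still sees per-key reference counts while the public iterator no longer exposes key hashes. External usage is unchanged; a hedged sketch, assuming `migrate` is re-exported at the crate root with the signature shown in the hunk header:

```rust
use parity_db::{migrate, Options};
use std::path::Path;

// Sketch: copy an existing database into a new layout.
fn upgrade(from: &Path, to: &Path) -> parity_db::Result<()> {
    let dest_options = Options::with_columns(to, 1);
    // `false`: keep the source database in place;
    // `&[]`: no columns are forced to migrate.
    migrate(from, dest_options, false, &[])
}
```
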
diff --git a/src/options.rs b/src/options.rs
index 25da3f90..abdb621b 100644
--- a/src/options.rs
+++ b/src/options.rs
@@ -46,6 +46,7 @@ pub struct Options {
 	pub always_flush: bool,
 }
 
+/// Database column configuration.
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct ColumnOptions {
 	/// Indicates that the column value is the preimage of the key.
@@ -56,14 +57,13 @@ pub struct ColumnOptions {
 	/// the first 32 bytes have uniform distribution.
 	/// Allows for skipping additional key hashing.
 	pub uniform: bool,
-	/// Use reference counting for values.
-	///
-	/// Reference counting do not enforce immediate removal
-	/// and user should not check for missing value.
+	/// Use reference counting for values. Reference operations are allowed for such a column.
+	/// The value is deleted when the reference counter reaches zero.
 	pub ref_counted: bool,
 	/// Compression to use for this column.
 	pub compression: CompressionType,
-	/// Column is using a btree indexing.
+	/// Column is configured to use Btree storage. Btree columns allow for ordered key iteration
+	/// and key retrieval, but are significantly less performant and require more disk space.
 	pub btree_index: bool,
 }
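Note: the new `ColumnOptions` docs spell out the trade-off — Btree columns buy ordered iteration and direct key retrieval at a performance and disk-space cost. A configuration sketch (path is a placeholder):

```rust
use parity_db::Options;
use std::path::Path;

fn configure(path: &Path) -> Options {
    let mut options = Options::with_columns(path, 2);
    // Column 0: hash-indexed, reference-counted values.
    options.columns[0].ref_counted = true;
    // Column 1: ordered key iteration via the Btree index,
    // trading speed and disk space for ordered access.
    options.columns[1].btree_index = true;
    options
}
```
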
diff --git a/src/table.rs b/src/table.rs
index 389567c2..d660080b 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -82,6 +82,8 @@ const MULTIHEAD_COMPRESSED: &[u8] = &[0xfd, 0x7f];
 // When a rc reach locked ref, it is locked in db.
 const LOCKED_REF: u32 = u32::MAX;
 
+const MULTIPART_ENTRY_SIZE: u16 = 4096;
+
 pub type Value = Vec<u8>;
 
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
@@ -374,7 +376,7 @@ impl ValueTable {
 	) -> Result<ValueTable> {
 		let (multipart, entry_size) = match entry_size {
 			Some(s) => (false, s),
-			None => (true, 4096),
+			None => (true, MULTIPART_ENTRY_SIZE),
 		};
 		assert!(entry_size >= MIN_ENTRY_SIZE as u16);
 		assert!(entry_size <= MAX_ENTRY_SIZE as u16);
@@ -452,6 +454,11 @@ impl ValueTable {
 			return Ok((0, false))
 		}
 
+		if self.multipart && part == 0 && !buf.is_multihead() {
+			// This may only happen during value iteration.
+			return Ok((0, false))
+		}
+
 		let (entry_end, next) = if self.multipart && buf.is_multi(self.db_version) {
 			if part == 0 && self.db_version > 6 && buf.is_multihead_compressed() {
 				compressed = true;
@@ -529,6 +536,13 @@ impl ValueTable {
 		}
 	}
 
+	pub fn dump_entry(&self, index: u64) -> Result<Vec<u8>> {
+		let entry_size = self.entry_size as usize;
+		let mut buf = FullEntry::new_uninit_full_entry();
+		self.file.read_at(&mut buf[0..entry_size], index * self.entry_size as u64)?;
+		Ok(buf[0..entry_size].to_vec())
+	}
+
 	pub fn query(
 		&self,
 		key: &mut TableKeyQuery,
@@ -722,7 +736,7 @@ impl ValueTable {
 		let init_offset = buf.offset();
 		if offset == 0 {
 			if self.ref_counted {
-				// first rc.
+				// First reference.
 				buf.write_rc(1u32);
 			}
 			key.write(&mut buf);
@@ -1115,7 +1129,7 @@ pub mod key {
 mod test {
 	const ENTRY_SIZE: u16 = 64;
 
-	use super::{TableId, Value, ValueTable};
+	use super::{TableId, Value, ValueTable, MULTIPART_ENTRY_SIZE};
 	use crate::{
 		log::{Log, LogAction, LogWriter},
 		options::{ColumnOptions, Options, CURRENT_VERSION},
@@ -1395,10 +1409,67 @@ mod test {
 		}
 	}
 
+	#[test]
+	fn iteration() {
+		for multipart in [false, true] {
+			for compressed in [false, true] {
+				let (entry_size, size_mul) =
+					if multipart { (None, 100) } else { (Some(MULTIPART_ENTRY_SIZE / 2), 1) };
+
+				let dir = tempdir().unwrap();
+				let table = new_table(&dir, entry_size, &rc_options());
+				let log = new_log(&dir);
+
+				let (v1, v2, v3) = (
+					value(MULTIPART_ENTRY_SIZE as usize / 8 * size_mul),
+					value(MULTIPART_ENTRY_SIZE as usize / 4 * size_mul),
+					value(MULTIPART_ENTRY_SIZE as usize * 3 / 8 * size_mul),
+				);
+				let entries = [
+					(TableKey::Partial(key(1)), &v1),
+					(TableKey::Partial(key(2)), &v2),
+					(TableKey::Partial(key(3)), &v3),
+				];
+
+				write_ops(&table, &log, |writer| {
+					for (k, v) in &entries {
+						table.write_insert_plan(k, &v, writer, compressed).unwrap();
+					}
+				});
+
+				let mut res = Vec::new();
+				table
+					.iter_while(log.overlays(), |_index, _rc, v, cmpr| {
+						res.push((v.len(), cmpr));
+						true
+					})
+					.unwrap();
+				assert_eq!(
+					res,
+					vec![(v1.len(), compressed), (v2.len(), compressed), (v3.len(), compressed)]
+				);
+
+				let v2_index = 2 + v1.len() as u64 / super::MULTIPART_ENTRY_SIZE as u64;
+				write_ops(&table, &log, |writer| {
+					table.write_remove_plan(v2_index, writer).unwrap();
+				});
+
+				let mut res = Vec::new();
+				table
+					.iter_while(log.overlays(), |_index, _rc, v, cmpr| {
+						res.push((v.len(), cmpr));
+						true
+					})
+					.unwrap();
+				assert_eq!(res, vec![(v1.len(), compressed), (v3.len(), compressed)]);
+			}
+		}
+	}
+
 	#[test]
 	fn ref_underflow() {
 		let dir = tempdir().unwrap();
-		let table = new_table(&dir, None, &rc_options());
+		let table = new_table(&dir, Some(ENTRY_SIZE), &rc_options());
 		let log = new_log(&dir);
 
 		let key = key(1);
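Note: `dump_entry` exists so the validation path can show the raw bytes behind a corrupted index slot. A crate-internal sketch mirroring the `check` code above (illustrative only; `dump_entry` is not part of the public API, and the crate's `hex` display helper is assumed to be in scope):

```rust
// Sketch (crate-internal): hex-dump the raw entry at a bad address.
fn report_corrupted_entry(table: &ValueTable, index: u64) {
    match table.dump_entry(index) {
        Ok(bytes) => log::error!("Value entry: {:?}", hex(bytes.as_slice())),
        Err(e) => log::error!("Value entry unreadable: {e:?}"),
    }
}
```
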