Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force value-only iteration in the public API #192

Merged
merged 7 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ pub fn run() -> Result<(), String> {
SubCommand::Check(check) => {
let db = parity_db::Db::open_read_only(&options)
.map_err(|e| format!("Invalid db: {e:?}"))?;
if !check.index_value {
// Note that we should use enum parameter instead.
return Err("Requires one of the following check flag: --index-value".to_string())
}
let check_param = parity_db::CheckOptions::new(
check.column,
check.range_start,
Expand Down Expand Up @@ -274,11 +270,6 @@ pub struct Check {
#[clap(long)]
pub column: Option<u8>,

/// Parse indexes and
/// lookup values.
#[clap(long)]
pub index_value: bool,

/// Start range for operation.
/// Index start chunk in db.
#[clap(long)]
Expand Down
146 changes: 92 additions & 54 deletions src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,35 @@ pub struct TablesRef<'a> {
pub ref_counted: bool,
}

/// Value iteration state
pub struct ValueIterState {
/// Reference counter.
pub rc: u32,
/// Value.
pub value: Vec<u8>,
}

// Only used for DB validation and migration.
pub struct CorruptedIndexEntryInfo {
pub chunk_index: u64,
pub sub_index: u32,
pub entry: crate::index::Entry,
pub value_entry: Option<Vec<u8>>,
pub error: Option<Error>,
}

// Only used for DB validation and migration.
pub struct IterState {
pub chunk_index: u64,
pub key: Key,
pub rc: u32,
pub value: Vec<u8>,
}

// Only used for DB validation and migration.
enum IterStateOrCorrupted {
Item(IterState),
Corrupted(crate::index::Entry, Option<Error>),
Corrupted(CorruptedIndexEntryInfo),
}

#[inline]
Expand Down Expand Up @@ -724,59 +743,49 @@ impl HashColumn {
tables.index.write_stats(&self.stats)
}

pub fn iter_while(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
pub fn iter_values(&self, log: &Log, mut f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
let tables = self.tables.read();
for table in &tables.value {
log::debug!( target: "parity-db", "{}: Iterating table {}", tables.index.id, table.id);
table.iter_while(log.overlays(), |_, rc, value, compressed| {
let value = if compressed {
if let Ok(value) = self.compression.decompress(&value) {
value
} else {
return false
}
} else {
value
};
let state = ValueIterState { rc, value };
f(state)
})?;
log::debug!( target: "parity-db", "{}: Done iterating table {}", tables.index.id, table.id);
}
Ok(())
}

pub fn iter_index(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
let action = |state| match state {
IterStateOrCorrupted::Item(item) => Ok(f(item)),
IterStateOrCorrupted::Corrupted(..) =>
Err(Error::Corruption("Missing indexed value".into())),
};
self.iter_while_inner(log, action, 0, true)
self.iter_index_internal(log, action, 0)
}

fn iter_while_inner(
fn iter_index_internal(
&self,
log: &Log,
mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
start_chunk: u64,
skip_preimage_indexes: bool,
) -> Result<()> {
use blake2::{digest::typenum::U32, Blake2b, Digest};

let tables = self.tables.read();
let source = &tables.index;

if skip_preimage_indexes && self.preimage {
// It is much faster to iterate over the value table than index.
// We have to assume hashing scheme however.
for table in &tables.value[..tables.value.len() - 1] {
log::debug!( target: "parity-db", "{}: Iterating table {}", source.id, table.id);
table.iter_while(log.overlays(), |index, rc, value, compressed| {
let value = if compressed {
if let Ok(value) = self.compression.decompress(&value) {
value
} else {
return false
}
} else {
value
};
let key = Blake2b::<U32>::digest(&value);
let key = self.hash_key(&key);
let state = IterStateOrCorrupted::Item(IterState {
chunk_index: index,
key,
rc,
value,
});
f(state).unwrap_or(false)
})?;
log::debug!( target: "parity-db", "{}: Done iterating table {}", source.id, table.id);
}
}

for c in start_chunk..source.id.total_chunks() {
let entries = source.entries(c, log.overlays())?;
for entry in entries.iter() {
for (sub_index, entry) in entries.iter().enumerate() {
if entry.is_empty() {
continue
}
Expand All @@ -785,20 +794,37 @@ impl HashColumn {
(address.size_tier(), address.offset())
};

if skip_preimage_indexes &&
self.preimage && size_tier as usize != tables.value.len() - 1
{
continue
}
let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
let (value, rc, pk, compressed) = match value {
Ok(Some(v)) => v,
Ok(None) => {
f(IterStateOrCorrupted::Corrupted(*entry, None))?;
let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
chunk_index: c,
sub_index: sub_index as u32,
value_entry,
entry: *entry,
error: None,
}))? {
return Ok(())
}
continue
},
Err(e) => {
f(IterStateOrCorrupted::Corrupted(*entry, Some(e)))?;
let value_entry = if let Error::Corruption(_) = &e {
tables.value[size_tier as usize].dump_entry(offset).ok()
} else {
None
};
if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
chunk_index: c,
sub_index: sub_index as u32,
value_entry,
entry: *entry,
error: Some(e),
}))? {
return Ok(())
}
continue
},
};
Expand Down Expand Up @@ -828,19 +854,22 @@ impl HashColumn {
let start_chunk = check_param.from.unwrap_or(0);
let end_chunk = check_param.bound;

let step = 1000;
let step = 10000;
let mut next_info_chunk = step;
let start_time = std::time::Instant::now();
log::info!(target: "parity-db", "Starting full index iteration at {:?}", start_time);
log::info!(target: "parity-db", "for {} chunks of column {}", self.tables.read().index.id.total_chunks(), col);
self.iter_while_inner(
let total_chunks = self.tables.read().index.id.total_chunks();
let index_id = self.tables.read().index.id;
log::info!(target: "parity-db", "Column {} (hash): Starting index validation", col);
self.iter_index_internal(
log,
|state| match state {
IterStateOrCorrupted::Item(IterState { chunk_index, key, rc, value }) => {
if Some(chunk_index) == end_chunk {
return Ok(false)
}
if chunk_index % step == 0 {
log::info!(target: "parity-db", "Chunk iteration at {}", chunk_index);
if chunk_index >= next_info_chunk {
next_info_chunk += step;
log::info!(target: "parity-db", "Validated {} / {} chunks", chunk_index, total_chunks);
}

match check_param.display {
Expand All @@ -865,16 +894,25 @@ impl HashColumn {
}
Ok(true)
},
IterStateOrCorrupted::Corrupted(entry, e) => {
log::info!("Corrupted value for index entry: {}:\n\t{:?}", entry.as_u64(), e);
IterStateOrCorrupted::Corrupted(c) => {
log::error!(
"Corrupted value for index entry: [{}][{}]: {} ({:?}). Error: {:?}",
c.chunk_index,
c.sub_index,
c.entry.address(index_id.index_bits()),
hex(&c.entry.as_u64().to_le_bytes()),
c.error,
);
if let Some(v) = c.value_entry {
log::error!("Value entry: {:?}", hex(v.as_slice()));
}
Ok(true)
},
},
start_chunk,
false,
)?;

log::info!(target: "parity-db", "Ended full index check, elapsed {:?}", start_time.elapsed());
log::info!(target: "parity-db", "Index validation complete successfully, elapsed {:?}", start_time.elapsed());
Ok(())
}

Expand Down
54 changes: 44 additions & 10 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::{
btree::{commit_overlay::BTreeChangeSet, BTreeIterator, BTreeTable},
column::{hash_key, ColId, Column, IterState, ReindexBatch},
column::{hash_key, ColId, Column, IterState, ReindexBatch, ValueIterState},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shoudn't we make ValueIterState pub in src/lib.rs to allow its usage from other crates?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, even though public types from private modules are allowed to be leaked. They can be used but can't be named. I've aded it to public exports anyway.

error::{try_io, Error, Result},
hash::IdentityBuildHasher,
index::PlanOutcome,
Expand Down Expand Up @@ -853,14 +853,22 @@ impl DbInner {
}
}

fn iter_column_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
match &self.columns[c as usize] {
Column::Hash(column) => column.iter_while(&self.log, f),
Column::Hash(column) => column.iter_values(&self.log, f),
Column::Tree(_) => unimplemented!(),
}
}

fn iter_column_index_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
match &self.columns[c as usize] {
Column::Hash(column) => column.iter_index(&self.log, f),
Column::Tree(_) => unimplemented!(),
}
}
}

/// Database instance.
pub struct Db {
inner: Arc<DbInner>,
commit_thread: Option<thread::JoinHandle<()>>,
Expand All @@ -870,22 +878,25 @@ pub struct Db {
}

impl Db {
pub fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
#[cfg(test)]
pub(crate) fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
let options = Options::with_columns(path, num_columns);
Self::open_inner(&options, OpeningMode::Create)
}

/// Open the database with given options.
/// Open the database with given options. An error will be returned if the database does not
/// exist.
pub fn open(options: &Options) -> Result<Db> {
Self::open_inner(options, OpeningMode::Write)
}

/// Create the database using given options.
/// Open the database using given options. If the database does not exist it will be created
/// empty.
pub fn open_or_create(options: &Options) -> Result<Db> {
Self::open_inner(options, OpeningMode::Create)
}

/// Read the database using given options
/// Open an existing database in read-only mode.
pub fn open_read_only(options: &Options) -> Result<Db> {
Self::open_inner(options, OpeningMode::ReadOnly)
}
Expand Down Expand Up @@ -987,12 +998,23 @@ impl Db {
self.inner.columns.len() as u8
}

/// Iterate a column and call a function for each value. This is only supported for columns with
/// `btree_index` set to `false`. Iteration order is unspecified.
/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
pub fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
self.inner.iter_column_while(c, f)
}

/// Iterate a column and call a function for each value. This is only supported for columns with
/// `btree_index` set to `false`. Iteration order is unspecified. Note that the
/// `key` field in the state is the hash of the original key.
/// Unlinke `get` the iteration may not include changes made in recent `commit` calls.
pub fn iter_column_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
self.inner.iter_column_while(c, f)
/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
pub(crate) fn iter_column_index_while(
&self,
c: ColId,
f: impl FnMut(IterState) -> bool,
) -> Result<()> {
self.inner.iter_column_index_while(c, f)
}

fn commit_worker(db: Arc<DbInner>) -> Result<()> {
Expand Down Expand Up @@ -1052,6 +1074,7 @@ impl Db {
Ok(())
}

/// Dump full database stats to the text output.
pub fn write_stats_text(
&self,
writer: &mut impl std::io::Write,
Expand All @@ -1060,6 +1083,7 @@ impl Db {
self.inner.write_stats_text(writer, column)
}

/// Reset internal database statistics for the database or specified column.
pub fn clear_stats(&self, column: Option<u8>) -> Result<()> {
self.inner.clear_stats(column)
}
Expand Down Expand Up @@ -1418,20 +1442,30 @@ impl IndexedChangeSet {

/// Verification operation utilities.
pub mod check {
/// Database dump verbosity.
pub enum CheckDisplay {
/// Don't output any data.
None,
/// Output full data.
Full,
/// Limit value output to the specified size.
Short(u64),
}

/// Options for producing a database dump.
pub struct CheckOptions {
/// Only process this column. If this is `None` all columns will be processed.
pub column: Option<u8>,
/// Start with this index.
pub from: Option<u64>,
/// End with this index.
pub bound: Option<u64>,
/// Verbosity.
pub display: CheckDisplay,
}

impl CheckOptions {
/// Create a new instance.
pub fn new(
column: Option<u8>,
from: Option<u64>,
Expand Down
Loading