Skip to content

Commit

Permalink
fix: reduce allocations in copy_to_overlay
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 committed Feb 3, 2023
1 parent f8136c5 commit 43331db
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 89 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ snap = "1"
loom = { version = "0.5.1", optional = true }

[dev-dependencies]
env_logger = "0.9.0"
env_logger = "0.10.0"
fdlimit = "0.2.1"
rand = { version = "0.8.2", features = ["small_rng"] }
tempfile = "3.2"
Expand Down
67 changes: 35 additions & 32 deletions src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{
btree::BTreeTable,
compress::Compress,
db::{check::CheckDisplay, Operation},
db::{check::CheckDisplay, Operation, ValuePtr},
display::hex,
error::{Error, Result},
index::{Address, IndexTable, PlanOutcome, TableId as IndexTableId},
Expand Down Expand Up @@ -167,14 +167,14 @@ impl HashColumn {
if self.collect_stats {
self.stats.query_hit(tier);
}
return Ok(Some(value))
return Ok(Some(value));
}
for r in &self.reindex.read().queue {
if let Some((tier, value)) = self.get_in_index(key, r, values, log)? {
if self.collect_stats {
self.stats.query_hit(tier);
}
return Ok(Some(value))
return Ok(Some(value));
}
}
if self.collect_stats {
Expand Down Expand Up @@ -238,7 +238,7 @@ impl Column {
tables.tables[size_tier].query(&mut key, address.offset(), log)?
{
let value = if compressed { tables.compression.decompress(&value)? } else { value };
return Ok(Some((size_tier as u8, value)))
return Ok(Some((size_tier as u8, value)));
}
Ok(None)
}
Expand Down Expand Up @@ -419,7 +419,7 @@ impl HashColumn {
) -> Result<PlanOutcome> {
if Self::contains_partial_key_with_address(key, address, &tables.index, log)? {
log::trace!(target: "parity-db", "{}: Skipped reindex entry {} when reindexing", tables.index.id, hex(key));
return Ok(PlanOutcome::Skipped)
return Ok(PlanOutcome::Skipped);
}
let mut outcome = PlanOutcome::Written;
while let PlanOutcome::NeedReindex =
Expand Down Expand Up @@ -448,7 +448,7 @@ impl HashColumn {
&table_key,
log,
)? {
return Ok(Some((index, sub_index, existing_address)))
return Ok(Some((index, sub_index, existing_address)));
}

let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
Expand All @@ -468,7 +468,7 @@ impl HashColumn {
while !existing_entry.is_empty() {
let existing_address = existing_entry.address(index.id.index_bits());
if existing_address == address {
return Ok(true)
return Ok(true);
}
let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
existing_entry = next_entry;
Expand All @@ -484,21 +484,21 @@ impl HashColumn {
log: &LogWriter,
) -> Result<Option<(&'a IndexTable, usize, Address)>> {
if let Some(r) = Self::search_index(key, &tables.index, tables, log)? {
return Ok(Some(r))
return Ok(Some(r));
}
// Check old indexes
// TODO: don't search if index precedes reindex progress
for index in &reindex.queue {
if let Some(r) = Self::search_index(key, index, tables, log)? {
return Ok(Some(r))
return Ok(Some(r));
}
}
Ok(None)
}

pub fn write_plan(
&self,
change: &Operation<Key, Vec<u8>>,
change: &Operation<Key, ValuePtr>,
log: &mut LogWriter,
) -> Result<PlanOutcome> {
let tables = self.tables.upgradable_read();
Expand All @@ -509,7 +509,8 @@ impl HashColumn {
} else {
match change {
Operation::Set(key, value) => {
let (r, _, _) = self.write_plan_new(tables, reindex, key, value, log)?;
let (r, _, _) =
self.write_plan_new(tables, reindex, key, value.as_ref(), log)?;
Ok(r)
},
Operation::Dereference(key) => {
Expand All @@ -534,7 +535,7 @@ impl HashColumn {
fn write_plan_existing(
&self,
tables: &Tables,
change: &Operation<Key, Vec<u8>>,
change: &Operation<Key, ValuePtr>,
log: &mut LogWriter,
index: &IndexTable,
sub_index: usize,
Expand Down Expand Up @@ -645,7 +646,7 @@ impl HashColumn {
if record.table.index_bits() < tables.index.id.index_bits() {
// Insertion into a previously dropped index.
log::warn!( target: "parity-db", "Index {} is too old. Current is {}", record.table, tables.index.id);
return Err(Error::Corruption("Unexpected log index id".to_string()))
return Err(Error::Corruption("Unexpected log index id".to_string()));
}
// Re-launch previously started reindex
// TODO: add explicit log records for reindexing events.
Expand All @@ -656,15 +657,15 @@ impl HashColumn {
);
let lock = Self::trigger_reindex(tables, reindex, self.path.as_path());
std::mem::drop(lock);
return self.validate_plan(LogAction::InsertIndex(record), log)
return self.validate_plan(LogAction::InsertIndex(record), log);
}
},
LogAction::InsertValue(record) => {
tables.value[record.table.size_tier() as usize].validate_plan(record.index, log)?;
},
_ => {
log::error!(target: "parity-db", "Unexpected log action");
return Err(Error::Corruption("Unexpected log action".to_string()))
return Err(Error::Corruption("Unexpected log action".to_string()));
},
}
Ok(())
Expand Down Expand Up @@ -708,8 +709,9 @@ impl HashColumn {
pub fn iter_while(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
let action = |state| match state {
IterStateOrCorrupted::Item(item) => Ok(f(item)),
IterStateOrCorrupted::Corrupted(..) =>
Err(Error::Corruption("Missing indexed value".into())),
IterStateOrCorrupted::Corrupted(..) => {
Err(Error::Corruption("Missing indexed value".into()))
},
};
self.iter_while_inner(log, action, 0, true)
}
Expand All @@ -736,7 +738,7 @@ impl HashColumn {
if let Ok(value) = self.compression.decompress(&value) {
value
} else {
return false
return false;
}
} else {
value
Expand All @@ -759,7 +761,7 @@ impl HashColumn {
let entries = source.entries(c, log.overlays())?;
for entry in entries.iter() {
if entry.is_empty() {
continue
continue;
}
let (size_tier, offset) = if self.db_version >= 4 {
let address = entry.address(source.id.index_bits());
Expand All @@ -772,21 +774,21 @@ impl HashColumn {
(size_tier, offset)
};

if skip_preimage_indexes &&
self.preimage && size_tier as usize != tables.value.len() - 1
if skip_preimage_indexes
&& self.preimage && size_tier as usize != tables.value.len() - 1
{
continue
continue;
}
let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
let (value, rc, pk, compressed) = match value {
Ok(Some(v)) => v,
Ok(None) => {
f(IterStateOrCorrupted::Corrupted(*entry, None))?;
continue
continue;
},
Err(e) => {
f(IterStateOrCorrupted::Corrupted(*entry, Some(e)))?;
continue
continue;
},
};
let mut key = source.recover_key_prefix(c, *entry);
Expand All @@ -804,7 +806,7 @@ impl HashColumn {
let state =
IterStateOrCorrupted::Item(IterState { chunk_index: c, key, rc, value });
if !f(state)? {
return Ok(())
return Ok(());
}
}
}
Expand All @@ -824,7 +826,7 @@ impl HashColumn {
|state| match state {
IterStateOrCorrupted::Item(IterState { chunk_index, key, rc, value }) => {
if Some(chunk_index) == end_chunk {
return Ok(false)
return Ok(false);
}
if chunk_index % step == 0 {
log::info!(target: "parity-db", "Chunk iteration at {}", chunk_index);
Expand Down Expand Up @@ -883,7 +885,7 @@ impl HashColumn {
let entries = source.entries(source_index, log.overlays())?;
for entry in entries.iter() {
if entry.is_empty() {
continue
continue;
}
// We only need key prefix to reindex.
let key = source.recover_key_prefix(source_index, *entry);
Expand Down Expand Up @@ -911,7 +913,7 @@ impl HashColumn {
table.unwrap().drop_file()?;
} else {
log::warn!(target: "parity-db", "Dropping invalid index {}", id);
return Ok(())
return Ok(());
}
log::debug!(target: "parity-db", "Dropped {}", id);
Ok(())
Expand Down Expand Up @@ -948,7 +950,7 @@ impl Column {
};

match change {
Operation::Reference(_) =>
Operation::Reference(_) => {
if ref_counted {
log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
tables.tables[tier].write_inc_ref(address.offset(), log)?;
Expand All @@ -958,16 +960,17 @@ impl Column {
Ok((Some(PlanOutcome::Written), None))
} else {
Ok((Some(PlanOutcome::Skipped), None))
},
}
},
Operation::Set(_, val) => {
if ref_counted {
log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
tables.tables[tier].write_inc_ref(address.offset(), log)?;
return Ok((Some(PlanOutcome::Written), None))
return Ok((Some(PlanOutcome::Written), None));
}
if tables.preimage {
// Replace is not supported
return Ok((Some(PlanOutcome::Skipped), None))
return Ok((Some(PlanOutcome::Skipped), None));
}

let (cval, target_tier) =
Expand Down
Loading

0 comments on commit 43331db

Please sign in to comment.