Skip to content

Commit

Permalink
Streamline kvdb API and upgrade rocksdb to 0.19 (#661)
Browse files Browse the repository at this point in the history
* upgrade kvdb to 0.19 and tikv-jemallocator to 0.5

* EndOnErrorIterator

* fmt

* add a warning

* exhaustive match

* kvdb: more consistent API

* kvdb: expose KeyValuePair and fmt

* kvdb: changelog amends

* kvdb: change DBKeyValue

* kvdb-rocksdb: return an error on column OOB

* no warn in size_of as before

* small cleanup
  • Loading branch information
ordian authored Aug 15, 2022
1 parent 8941d3f commit 26d712d
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 83 deletions.
31 changes: 15 additions & 16 deletions kvdb-memorydb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use kvdb::{DBOp, DBTransaction, DBValue, KeyValueDB};
use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB};
use parity_util_mem::MallocSizeOf;
use parking_lot::RwLock;
use std::{
Expand All @@ -33,23 +33,24 @@ pub fn create(num_cols: u32) -> InMemory {
InMemory { columns: RwLock::new(cols) }
}

/// Builds the `io::Error` returned when `col` does not refer to an existing column.
fn invalid_column(col: u32) -> io::Error {
    let message = format!("No such column family: {:?}", col);
    io::Error::new(io::ErrorKind::Other, message)
}

impl KeyValueDB for InMemory {
fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
let columns = self.columns.read();
match columns.get(&col) {
None => Err(io::Error::new(io::ErrorKind::Other, format!("No such column family: {:?}", col))),
None => Err(invalid_column(col)),
Some(map) => Ok(map.get(key).cloned()),
}
}

fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
let columns = self.columns.read();
match columns.get(&col) {
None => None,
Some(map) => map
.iter()
.find(|&(ref k, _)| k.starts_with(prefix))
.map(|(_, v)| v.to_vec().into_boxed_slice()),
None => Err(invalid_column(col)),
Some(map) => Ok(map.iter().find(|&(ref k, _)| k.starts_with(prefix)).map(|(_, v)| v.to_vec())),
}
}

Expand Down Expand Up @@ -90,31 +91,29 @@ impl KeyValueDB for InMemory {
Ok(())
}

fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
match self.columns.read().get(&col) {
Some(map) => Box::new(
// TODO: worth optimizing at all?
map.clone()
.into_iter()
.map(|(k, v)| (k.into_boxed_slice(), v.into_boxed_slice())),
map.clone().into_iter().map(|(k, v)| Ok((k.into(), v))),
),
None => Box::new(None.into_iter()),
None => Box::new(std::iter::once(Err(invalid_column(col)))),
}
}

fn iter_with_prefix<'a>(
&'a self,
col: u32,
prefix: &'a [u8],
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
match self.columns.read().get(&col) {
Some(map) => Box::new(
map.clone()
.into_iter()
.filter(move |&(ref k, _)| k.starts_with(prefix))
.map(|(k, v)| (k.into_boxed_slice(), v.into_boxed_slice())),
.map(|(k, v)| Ok((k.into(), v))),
),
None => Box::new(None.into_iter()),
None => Box::new(std::iter::once(Err(invalid_column(col)))),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions kvdb-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ parity-util-mem = { path = "../parity-util-mem", version = "0.11", default-featu
[target.'cfg(any(target_os = "openbsd", target_env = "msvc"))'.dependencies.rocksdb]
default-features = false
features = ["snappy"]
version = "0.18.0"
version = "0.19.0"

[target.'cfg(not(any(target_os = "openbsd", target_env = "msvc")))'.dependencies.rocksdb]
default-features = false
features = ["snappy", "jemalloc"]
version = "0.18.0"
version = "0.19.0"

[dev-dependencies]
alloc_counter = "0.0.4"
Expand Down
2 changes: 1 addition & 1 deletion kvdb-rocksdb/benches/bench_read_perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ fn iter(c: &mut Criterion) {
let (alloc_stats, _) = count_alloc(|| {
let start = Instant::now();
for _ in 0..iterations {
black_box(db.iter(0).next().unwrap());
black_box(db.iter(0).next().unwrap().unwrap());
}
elapsed = start.elapsed();
});
Expand Down
72 changes: 58 additions & 14 deletions kvdb-rocksdb/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,80 @@
//! To work around this we set an upper bound to the prefix successor.
//! See https://github.com/facebook/rocksdb/wiki/Prefix-Seek-API-Changes for details.
use crate::DBAndColumns;
use crate::{other_io_err, DBAndColumns, DBKeyValue};
use rocksdb::{DBIterator, Direction, IteratorMode, ReadOptions};
use std::io;

/// A tuple holding key and value data, used as the iterator item type.
pub type KeyValuePair = (Box<[u8]>, Box<[u8]>);

/// Instantiate iterators yielding `KeyValuePair`s.
/// Instantiate iterators yielding `io::Result<DBKeyValue>`s.
pub trait IterationHandler {
type Iterator: Iterator<Item = KeyValuePair>;
type Iterator: Iterator<Item = io::Result<DBKeyValue>>;

/// Create an `Iterator` over a `ColumnFamily` corresponding to the passed index. Takes
/// `ReadOptions` to allow configuration of the new iterator (see
/// https://github.com/facebook/rocksdb/blob/master/include/rocksdb/options.h#L1169).
fn iter(&self, col: u32, read_opts: ReadOptions) -> Self::Iterator;
fn iter(self, col: u32, read_opts: ReadOptions) -> Self::Iterator;
/// Create an `Iterator` over a `ColumnFamily` corresponding to the passed index. Takes
/// `ReadOptions` to allow configuration of the new iterator (see
/// https://github.com/facebook/rocksdb/blob/master/include/rocksdb/options.h#L1169).
/// The `Iterator` iterates over keys which start with the provided `prefix`.
fn iter_with_prefix(&self, col: u32, prefix: &[u8], read_opts: ReadOptions) -> Self::Iterator;
fn iter_with_prefix(self, col: u32, prefix: &[u8], read_opts: ReadOptions) -> Self::Iterator;
}

impl<'a> IterationHandler for &'a DBAndColumns {
type Iterator = DBIterator<'a>;
type Iterator = EitherIter<KvdbAdapter<DBIterator<'a>>, std::iter::Once<io::Result<DBKeyValue>>>;

fn iter(self, col: u32, read_opts: ReadOptions) -> Self::Iterator {
match self.cf(col as usize) {
Ok(cf) => EitherIter::A(KvdbAdapter(self.db.iterator_cf_opt(cf, read_opts, IteratorMode::Start))),
Err(e) => EitherIter::B(std::iter::once(Err(e))),
}
}

fn iter_with_prefix(self, col: u32, prefix: &[u8], read_opts: ReadOptions) -> Self::Iterator {
match self.cf(col as usize) {
Ok(cf) => EitherIter::A(KvdbAdapter(self.db.iterator_cf_opt(
cf,
read_opts,
IteratorMode::From(prefix, Direction::Forward),
))),
Err(e) => EitherIter::B(std::iter::once(Err(e))),
}
}
}

fn iter(&self, col: u32, read_opts: ReadOptions) -> Self::Iterator {
self.db.iterator_cf_opt(self.cf(col as usize), read_opts, IteratorMode::Start)
/// Small enum to avoid boxing iterators.
///
/// Holds one of two iterator types that yield the same item type and
/// delegates iteration to whichever variant is present.
pub enum EitherIter<A, B> {
    /// The first alternative iterator.
    A(A),
    /// The second alternative iterator.
    B(B),
}

impl<A, B, I> Iterator for EitherIter<A, B>
where
    A: Iterator<Item = I>,
    B: Iterator<Item = I>,
{
    type Item = I;

    /// Advances whichever inner iterator is held and returns its next item.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::A(a) => a.next(),
            Self::B(b) => b.next(),
        }
    }

    /// Forwards the inner iterator's bounds. Without this override the trait
    /// default `(0, None)` would hide any length information the wrapped
    /// iterator has, e.g. from `collect` or `extend`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            Self::A(a) => a.size_hint(),
            Self::B(b) => b.size_hint(),
        }
    }
}

/// A simple wrapper that adheres to the `kvdb` interface.
///
/// Wraps an inner iterator `T`; its `Iterator` implementation converts each
/// rocksdb item into an `io::Result<DBKeyValue>`.
pub struct KvdbAdapter<T>(T);

impl<T> Iterator for KvdbAdapter<T>
where
T: Iterator<Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>>,
{
type Item = io::Result<DBKeyValue>;

fn iter_with_prefix(&self, col: u32, prefix: &[u8], read_opts: ReadOptions) -> Self::Iterator {
self.db
.iterator_cf_opt(self.cf(col as usize), read_opts, IteratorMode::From(prefix, Direction::Forward))
fn next(&mut self) -> Option<Self::Item> {
self.0
.next()
.map(|r| r.map_err(other_io_err).map(|(k, v)| (k.into_vec().into(), v.into())))
}
}
74 changes: 44 additions & 30 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use rocksdb::{
BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Error, Options, ReadOptions, WriteBatch, WriteOptions, DB,
};

use crate::iter::KeyValuePair;
use kvdb::{DBOp, DBTransaction, DBValue, KeyValueDB};
use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB};
use log::warn;

#[cfg(target_os = "linux")]
Expand All @@ -40,6 +39,10 @@ where
io::Error::new(io::ErrorKind::Other, e)
}

/// Builds the `io::Error` reported when `col` does not refer to an existing column family.
fn invalid_column(col: u32) -> io::Error {
    let message = format!("No such column family: {:?}", col);
    other_io_err(message)
}

// Used for memory budget.
type MiB = usize;

Expand Down Expand Up @@ -254,11 +257,12 @@ impl MallocSizeOf for DBAndColumns {
fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
let mut total = self.column_names.size_of(ops)
// we have at least one column always, so we can call property on it
+ self.db
.property_int_value_cf(self.cf(0), "rocksdb.block-cache-usage")
+ self.cf(0).map(|cf| self.db
.property_int_value_cf(cf, "rocksdb.block-cache-usage")
.unwrap_or(Some(0))
.map(|x| x as usize)
.unwrap_or(0);
.unwrap_or(0)
).unwrap_or(0);

for v in 0..self.column_names.len() {
total += self.static_property_or_warn(v, "rocksdb.estimate-table-readers-mem");
Expand All @@ -270,14 +274,22 @@ impl MallocSizeOf for DBAndColumns {
}

impl DBAndColumns {
fn cf(&self, i: usize) -> &ColumnFamily {
fn cf(&self, i: usize) -> io::Result<&ColumnFamily> {
let name = self.column_names.get(i).ok_or_else(|| invalid_column(i as u32))?;
self.db
.cf_handle(&self.column_names[i])
.expect("the specified column name is correct; qed")
.cf_handle(&name)
.ok_or_else(|| other_io_err(format!("invalid column name: {name}")))
}

fn static_property_or_warn(&self, col: usize, prop: &str) -> usize {
match self.db.property_int_value_cf(self.cf(col), prop) {
let cf = match self.cf(col) {
Ok(cf) => cf,
Err(_) => {
warn!("RocksDB column index out of range: {}", col);
return 0
},
};
match self.db.property_int_value_cf(cf, prop) {
Ok(Some(v)) => v as usize,
_ => {
warn!("Cannot read expected static property of RocksDb database: {}", prop);
Expand Down Expand Up @@ -513,7 +525,8 @@ impl Database {
let mut stats_total_bytes = 0;

for op in ops {
let cf = cfs.cf(op.col() as usize);
let col = op.col();
let cf = cfs.cf(col as usize)?;

match op {
DBOp::Insert { col: _, key, value } => {
Expand All @@ -531,13 +544,9 @@ impl Database {
let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]);
batch.delete_range_cf(cf, &prefix[..], &end_range[..]);
if no_end {
use crate::iter::IterationHandler as _;

let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] };
// We call `iter_with_prefix` directly on `cfs` to avoid taking a lock twice
// See https://github.com/paritytech/parity-common/pull/396.
let read_opts = generate_read_options();
for (key, _) in cfs.iter_with_prefix(col, prefix, read_opts) {
for result in self.iter_with_prefix(col, prefix) {
let (key, _) = result?;
batch.delete_cf(cf, &key[..]);
}
}
Expand All @@ -552,13 +561,11 @@ impl Database {
/// Get value by key.
pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
let cfs = &self.inner;
if cfs.column_names.get(col as usize).is_none() {
return Err(other_io_err("column index is out of bounds"))
}
let cf = cfs.cf(col as usize)?;
self.stats.tally_reads(1);
let value = cfs
.db
.get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts)
.get_pinned_cf_opt(cf, key, &self.read_opts)
.map(|r| r.map(|v| v.to_vec()))
.map_err(other_io_err);

Expand All @@ -572,28 +579,31 @@ impl Database {
}

/// Get value by partial key. Prefix size should match configured prefix size.
pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
self.iter_with_prefix(col, prefix).next().map(|(_, v)| v)
pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
self.iter_with_prefix(col, prefix)
.next()
.transpose()
.map(|m| m.map(|(_k, v)| v))
}

/// Iterator over the data in the given database column index.
/// Will hold a lock until the iterator is dropped
/// preventing the database from being closed.
pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = KeyValuePair> + 'a {
pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
let read_opts = generate_read_options();
iter::IterationHandler::iter(&&self.inner, col, read_opts)
iter::IterationHandler::iter(&self.inner, col, read_opts)
}

/// Iterator over data in the `col` database column index matching the given prefix.
/// Will hold a lock until the iterator is dropped
/// preventing the database from being closed.
fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = iter::KeyValuePair> + 'a {
fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
let mut read_opts = generate_read_options();
// rocksdb doesn't work with an empty upper bound
if let Some(end_prefix) = kvdb::end_prefix(prefix) {
read_opts.set_iterate_upper_bound(end_prefix);
}
iter::IterationHandler::iter_with_prefix(&&self.inner, col, prefix, read_opts)
iter::IterationHandler::iter_with_prefix(&self.inner, col, prefix, read_opts)
}

/// The number of column families in the db.
Expand All @@ -605,7 +615,7 @@ impl Database {
pub fn num_keys(&self, col: u32) -> io::Result<u64> {
const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
let cfs = &self.inner;
let cf = cfs.cf(col as usize);
let cf = cfs.cf(col as usize)?;
match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) {
Ok(estimate) => Ok(estimate.unwrap_or_default()),
Err(err_string) => Err(other_io_err(err_string)),
Expand Down Expand Up @@ -673,20 +683,24 @@ impl KeyValueDB for Database {
Database::get(self, col, key)
}

fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
Database::get_by_prefix(self, col, prefix)
}

fn write(&self, transaction: DBTransaction) -> io::Result<()> {
Database::write(self, transaction)
}

fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = KeyValuePair> + 'a> {
fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
let unboxed = Database::iter(self, col);
Box::new(unboxed.into_iter())
}

fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> Box<dyn Iterator<Item = KeyValuePair> + 'a> {
fn iter_with_prefix<'a>(
&'a self,
col: u32,
prefix: &'a [u8],
) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
let unboxed = Database::iter_with_prefix(self, col, prefix);
Box::new(unboxed.into_iter())
}
Expand Down
Loading

0 comments on commit 26d712d

Please sign in to comment.