fix(store): Fix refcount logic slowness
Use rocksdb merge operator for ColState. No longer need to atomically
read + write on update.

Fixes #3065

Test plan
---------
sanity test
manually check performance
mikhailOK committed Aug 10, 2020
1 parent 7ee16b5 commit 30c04f7
Showing 5 changed files with 207 additions and 31 deletions.
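Context for the diff: both the new DBOp::UpdateRefcount path and the RocksDB merge operator below lean on merge_refcounted_records, which lives in core/store/src/trie/trie_storage.rs and is not among the hunks shown. As a minimal sketch of the semantics the rest of the diff assumes — each non-empty value is a payload followed by a little-endian i32 refcount (see encode_trie_node_with_rc further down), and a refcount sum of zero collapses to the empty value — such a helper could look like this (an illustration, not the committed implementation):

// Sketch only: merge two refcounted records in place. Assumes each non-empty
// value is `payload ++ i32 refcount (little-endian)`; the real helper is in
// trie_storage.rs and is re-exported in trie/mod.rs below.
fn merge_refcounted_records(result: &mut Vec<u8>, val: &[u8]) -> Result<(), String> {
    fn decode_rc(bytes: &[u8]) -> Result<i32, String> {
        if bytes.len() < 4 {
            // Mirrors the "value is between 1 and 3 bytes" error case noted in db.rs.
            return Err("value too short to hold a refcount".to_string());
        }
        let mut rc = [0u8; 4];
        rc.copy_from_slice(&bytes[bytes.len() - 4..]);
        Ok(i32::from_le_bytes(rc))
    }
    if val.is_empty() {
        return Ok(());
    }
    if result.is_empty() {
        result.extend_from_slice(val);
        return Ok(());
    }
    let rc = decode_rc(result)? + decode_rc(val)?;
    if rc == 0 {
        // Zero refcount becomes the empty value; the compaction filter removes it later.
        result.clear();
    } else {
        let payload_len = result.len() - 4;
        result.truncate(payload_len);
        result.extend_from_slice(&rc.to_le_bytes());
    }
    Ok(())
}

Under this encoding, merging two +1 records [1, 1, 0, 0, 0] yields [1, 2, 0, 0, 0] — exactly what the rocksdb_merge_sanity test below asserts.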
173 changes: 164 additions & 9 deletions core/store/src/db.rs
@@ -6,12 +6,14 @@ use std::sync::RwLock;
use borsh::{BorshDeserialize, BorshSerialize};

use rocksdb::{
-    BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Direction, IteratorMode, Options,
-    ReadOptions, WriteBatch, DB,
+    BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Direction, IteratorMode,
+    MergeOperands, Options, ReadOptions, WriteBatch, DB,
};
use strum_macros::EnumIter;

use crate::trie::merge_refcounted_records;
use near_primitives::version::DbVersion;
use rocksdb::compaction_filter::Decision;
use std::marker::PhantomPinned;

#[derive(Debug, Clone, PartialEq)]
@@ -210,6 +212,7 @@ pub struct DBTransaction {

pub enum DBOp {
    Insert { col: DBCol, key: Vec<u8>, value: Vec<u8> },
    UpdateRefcount { col: DBCol, key: Vec<u8>, value: Vec<u8> },
    Delete { col: DBCol, key: Vec<u8> },
}

@@ -222,6 +225,19 @@ impl DBTransaction {
        });
    }

    pub fn update_refcount<K: AsRef<[u8]>, V: AsRef<[u8]>>(
        &mut self,
        col: DBCol,
        key: K,
        value: V,
    ) {
        self.ops.push(DBOp::UpdateRefcount {
            col,
            key: key.as_ref().to_owned(),
            value: value.as_ref().to_owned(),
        });
    }

    pub fn delete<K: AsRef<[u8]>>(&mut self, col: DBCol, key: K) {
        self.ops.push(DBOp::Delete { col, key: key.as_ref().to_owned() });
    }
@@ -259,15 +275,16 @@ pub trait Database: Sync + Send {
impl Database for RocksDB {
    fn get(&self, col: DBCol, key: &[u8]) -> Result<Option<Vec<u8>>, DBError> {
        let read_options = rocksdb_read_options();
-       unsafe { Ok(self.db.get_cf_opt(&*self.cfs[col as usize], key, &read_options)?) }
+       let result = self.db.get_cf_opt(unsafe { &*self.cfs[col as usize] }, key, &read_options)?;
+       Ok(RocksDB::empty_value_filtering(col, result))
    }

    fn iter<'a>(&'a self, col: DBCol) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
        let read_options = rocksdb_read_options();
        unsafe {
            let cf_handle = &*self.cfs[col as usize];
            let iterator = self.db.iterator_cf_opt(cf_handle, read_options, IteratorMode::Start);
-           Box::new(iterator)
+           RocksDB::iter_empty_value_filtering(col, iterator)
        }
    }

@@ -292,7 +309,7 @@ impl Database for RocksDB {
                    IteratorMode::From(key_prefix, Direction::Forward),
                )
                .take_while(move |(key, _value)| key.starts_with(key_prefix));
-           Box::new(iterator)
+           RocksDB::iter_empty_value_filtering(col, iterator)
        }
    }

@@ -303,6 +320,9 @@ impl Database for RocksDB {
                DBOp::Insert { col, key, value } => unsafe {
                    batch.put_cf(&*self.cfs[col as usize], key, value);
                },
                DBOp::UpdateRefcount { col, key, value } => unsafe {
                    batch.merge_cf(&*self.cfs[col as usize], key, value);
                },
                DBOp::Delete { col, key } => unsafe {
                    batch.delete_cf(&*self.cfs[col as usize], key);
                },
@@ -338,6 +358,15 @@ impl Database for TestDB {
        for op in transaction.ops {
            match op {
                DBOp::Insert { col, key, value } => db[col as usize].insert(key, value),
                DBOp::UpdateRefcount { col, key, value } => {
                    let mut val = db[col as usize].get(&key).cloned().unwrap_or_default();
                    merge_refcounted_records(&mut val, &value).unwrap();
                    if val.len() != 0 {
                        db[col as usize].insert(key, val)
                    } else {
                        db[col as usize].remove(&key)
                    }
                }
                DBOp::Delete { col, key } => db[col as usize].remove(&key),
            };
        }
@@ -380,13 +409,17 @@ fn rocksdb_block_based_options() -> BlockBasedOptions {
    block_opts
}

-fn rocksdb_column_options() -> Options {
+fn rocksdb_column_options(col_index: usize) -> Options {
    let mut opts = Options::default();
    opts.set_level_compaction_dynamic_level_bytes(true);
    opts.set_block_based_table_factory(&rocksdb_block_based_options());
    opts.optimize_level_style_compaction(1024 * 1024 * 128);
    opts.set_target_file_size_base(1024 * 1024 * 64);
    opts.set_compression_per_level(&[]);
    if col_index == DBCol::ColState as usize {
        opts.set_merge_operator("refcount merge", RocksDB::refcount_merge, None);
        opts.set_compaction_filter("empty value filter", RocksDB::empty_value_filter);
    }
    opts
}

@@ -415,9 +448,9 @@ impl RocksDB {
    pub fn new<P: AsRef<std::path::Path>>(path: P) -> Result<Self, DBError> {
        let options = rocksdb_options();
        let cf_names: Vec<_> = (0..NUM_COLS).map(|col| format!("col{}", col)).collect();
-       let cf_descriptors = cf_names
-           .iter()
-           .map(|cf_name| ColumnFamilyDescriptor::new(cf_name, rocksdb_column_options()));
+       let cf_descriptors = cf_names.iter().enumerate().map(|(col_index, cf_name)| {
+           ColumnFamilyDescriptor::new(cf_name, rocksdb_column_options(col_index))
+       });
        let db = DB::open_cf_descriptors(&options, path, cf_descriptors)?;
        let cfs =
            cf_names.iter().map(|n| db.cf_handle(n).unwrap() as *const ColumnFamily).collect();
@@ -431,3 +464,125 @@ impl TestDB {
        Self { db: RwLock::new(db) }
    }
}

impl RocksDB {
    /// ColState has refcounted values.
    /// Merge adds refcounts, zero refcount becomes empty value.
    /// Empty values get filtered by get methods, and removed by compaction.
    fn refcount_merge(
        _new_key: &[u8],
        existing_val: Option<&[u8]>,
        operands: &mut MergeOperands,
    ) -> Option<Vec<u8>> {
        let mut result = vec![];
        if let Some(val) = existing_val {
            // An error is only possible if decoding the refcount fails (i.e. the value is between 1 and 3 bytes)
            merge_refcounted_records(&mut result, val).unwrap();
        }
        for val in operands {
            // An error is only possible if decoding the refcount fails (i.e. the value is between 1 and 3 bytes)
            merge_refcounted_records(&mut result, val).unwrap();
        }
        Some(result)
    }

    fn empty_value_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
        if value.len() == 0 {
            Decision::Remove
        } else {
            Decision::Keep
        }
    }

    fn empty_value_filtering(column: DBCol, value: Option<Vec<u8>>) -> Option<Vec<u8>> {
        if column == DBCol::ColState && Some(vec![]) == value {
            None
        } else {
            value
        }
    }

    fn iter_empty_value_filtering<'a, I>(
        column: DBCol,
        iterator: I,
    ) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a>
    where
        I: Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a,
    {
        if column == DBCol::ColState {
            Box::new(iterator.filter(|(_k, v)| v.len() != 0))
        } else {
            Box::new(iterator)
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::db::DBCol::ColState;
    use crate::db::{rocksdb_read_options, DBError, Database, RocksDB};
    use crate::{create_store, DBCol};

    impl RocksDB {
        fn compact(&self, col: DBCol) {
            self.db.compact_range_cf::<&[u8], &[u8]>(
                unsafe { &*self.cfs[col as usize] },
                None,
                None,
            );
        }

        fn get_no_empty_filtering(
            &self,
            col: DBCol,
            key: &[u8],
        ) -> Result<Option<Vec<u8>>, DBError> {
            let read_options = rocksdb_read_options();
            let result =
                self.db.get_cf_opt(unsafe { &*self.cfs[col as usize] }, key, &read_options)?;
            Ok(result)
        }
    }

    #[test]
    fn rocksdb_merge_sanity() {
        let tmp_dir = tempfile::Builder::new().prefix("_test_snapshot_sanity").tempdir().unwrap();
        let store = create_store(tmp_dir.path().to_str().unwrap());
        assert_eq!(store.get(ColState, &[1]).unwrap(), None);
        {
            let mut store_update = store.store_update();
            store_update.update_refcount(ColState, &[1], &[1, 1, 0, 0, 0]);
            store_update.commit().unwrap();
        }
        {
            let mut store_update = store.store_update();
            store_update.update_refcount(ColState, &[1], &[1, 1, 0, 0, 0]);
            store_update.commit().unwrap();
        }
        assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1, 2, 0, 0, 0]));
        {
            let mut store_update = store.store_update();
            store_update.update_refcount(ColState, &[1], &[1, 255, 255, 255, 255]);
            store_update.commit().unwrap();
        }
        assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1, 1, 0, 0, 0]));
        {
            let mut store_update = store.store_update();
            store_update.update_refcount(ColState, &[1], &[1, 255, 255, 255, 255]);
            store_update.commit().unwrap();
        }
        // Refcount goes to 0 -> get() returns None
        assert_eq!(store.get(ColState, &[1]).unwrap(), None);
        let ptr = (&*store.storage) as *const (dyn Database + 'static);
        let rocksdb = unsafe { &*(ptr as *const RocksDB) };
        // Internally there is an empty value
        assert_eq!(rocksdb.get_no_empty_filtering(ColState, &[1]).unwrap(), Some(vec![]));

        rocksdb.compact(ColState);
        rocksdb.compact(ColState);

        // After compaction the empty value disappears
        assert_eq!(rocksdb.get_no_empty_filtering(ColState, &[1]).unwrap(), None);
        assert_eq!(store.get(ColState, &[1]).unwrap(), None);
    }
}
11 changes: 11 additions & 0 deletions core/store/src/lib.rs
@@ -154,6 +154,10 @@ impl StoreUpdate {
        StoreUpdate { storage, transaction, tries: Some(tries) }
    }

    pub fn update_refcount(&mut self, column: DBCol, key: &[u8], value: &[u8]) {
        self.transaction.update_refcount(column, key, value)
    }

    pub fn set(&mut self, column: DBCol, key: &[u8], value: &[u8]) {
        self.transaction.put(column, key, value)
    }
@@ -195,6 +199,9 @@ impl StoreUpdate {
            match op {
                DBOp::Insert { col, key, value } => self.transaction.put(col, &key, &value),
                DBOp::Delete { col, key } => self.transaction.delete(col, &key),
                DBOp::UpdateRefcount { col, key, value } => {
                    self.transaction.update_refcount(col, &key, &value)
                }
            }
        }
    }
@@ -209,6 +216,7 @@ impl StoreUpdate {
            .map(|op| match op {
                DBOp::Insert { col, key, .. } => (*col as u8, key),
                DBOp::Delete { col, key } => (*col as u8, key),
                DBOp::UpdateRefcount { col, key, .. } => (*col as u8, key),
            })
            .collect::<std::collections::HashSet<_>>()
            .len(),
@@ -232,6 +240,9 @@ impl fmt::Debug for StoreUpdate {
        for op in self.transaction.ops.iter() {
            match op {
                DBOp::Insert { col, key, .. } => writeln!(f, " + {:?} {}", col, to_base(key))?,
                DBOp::UpdateRefcount { col, key, .. } => {
                    writeln!(f, " +- {:?} {}", col, to_base(key))?
                }
                DBOp::Delete { col, key } => writeln!(f, " - {:?} {}", col, to_base(key))?,
            }
        }
9 changes: 5 additions & 4 deletions core/store/src/trie/mod.rs
@@ -17,6 +17,7 @@ use crate::trie::insert_delete::NodesStorage;
use crate::trie::iterator::TrieIterator;
use crate::trie::nibble_slice::NibbleSlice;
pub use crate::trie::shard_tries::{KeyForStateChanges, ShardTries, WrappedTrieChanges};
pub(crate) use crate::trie::trie_storage::merge_refcounted_records;
use crate::trie::trie_storage::{
    TouchedNodesCounter, TrieCachingStorage, TrieMemoryPartialStorage, TrieRecordingStorage,
    TrieStorage,
@@ -395,21 +396,21 @@ impl RawTrieNodeWithSize {
    }
}

-fn encode_trie_node_with_rc(data: &[u8], rc: u32) -> Vec<u8> {
+fn encode_trie_node_with_rc(data: &[u8], rc: i32) -> Vec<u8> {
    let mut cursor = Cursor::new(Vec::with_capacity(data.len() + 4));
    cursor.write_all(data).unwrap();
-   cursor.write_u32::<LittleEndian>(rc).unwrap();
+   cursor.write_i32::<LittleEndian>(rc).unwrap();
    cursor.into_inner()
}

-fn decode_trie_node_with_rc(bytes: &[u8]) -> Result<(&[u8], u32), StorageError> {
+fn decode_trie_node_with_rc(bytes: &[u8]) -> Result<(&[u8], i32), StorageError> {
    if bytes.len() < 4 {
        return Err(StorageError::StorageInconsistentState(
            "Decode node with RC failed".to_string(),
        ));
    }
    let mut cursor = Cursor::new(&bytes[bytes.len() - 4..]);
-   let rc = cursor.read_u32::<LittleEndian>().unwrap();
+   let rc = cursor.read_i32::<LittleEndian>().unwrap();
    Ok((&bytes[..bytes.len() - 4], rc))
}
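Switching the refcount from u32 to i32 is what lets apply_deletions in shard_tries.rs (next file) encode a deletion as a negative delta for the merge operator. A quick hypothetical round-trip check, not part of the diff:

// Hypothetical usage: a deletion of 3 references round-trips as rc = -3.
let encoded = encode_trie_node_with_rc(b"node-bytes", -3);
let (payload, rc) = decode_trie_node_with_rc(&encoded).unwrap();
assert_eq!(payload, &b"node-bytes"[..]);
assert_eq!(rc, -3);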

19 changes: 4 additions & 15 deletions core/store/src/trie/shard_tries.rs
@@ -70,18 +70,10 @@ impl ShardTries {
        store_update: &mut StoreUpdate,
    ) -> Result<(), StorageError> {
        store_update.tries = Some(tries.clone());
-       let trie = tries.get_trie_for_shard(shard_id);
-       let storage = trie.storage.as_caching_storage().expect("Must be caching storage");
        for (hash, value, rc) in deletions.iter() {
-           let storage_rc = storage.retrieve_rc(&hash)?;
-           assert!(*rc <= storage_rc);
            let key = TrieCachingStorage::get_key_from_shard_id_and_hash(shard_id, hash);
-           if *rc < storage_rc {
-               let bytes = encode_trie_node_with_rc(&value, storage_rc - rc);
-               store_update.set(DBCol::ColState, key.as_ref(), &bytes);
-           } else {
-               store_update.delete(DBCol::ColState, key.as_ref());
-           }
+           let bytes = encode_trie_node_with_rc(&value, -(*rc as i32));
+           store_update.update_refcount(DBCol::ColState, key.as_ref(), &bytes);
        }
        Ok(())
    }
@@ -92,14 +84,11 @@ impl ShardTries {
        shard_id: ShardId,
        store_update: &mut StoreUpdate,
    ) -> Result<(), StorageError> {
-       let trie = tries.get_trie_for_shard(shard_id);
        store_update.tries = Some(tries);
-       let storage = trie.storage.as_caching_storage().expect("Must be caching storage");
        for (hash, value, rc) in insertions.iter() {
-           let storage_rc = storage.retrieve_rc(&hash)?;
            let key = TrieCachingStorage::get_key_from_shard_id_and_hash(shard_id, hash);
-           let bytes = encode_trie_node_with_rc(&value, storage_rc + rc);
-           store_update.set(DBCol::ColState, key.as_ref(), &bytes);
+           let bytes = encode_trie_node_with_rc(&value, *rc as i32);
+           store_update.update_refcount(DBCol::ColState, key.as_ref(), &bytes);
        }
        Ok(())
    }
