From 30c04f79e88588adb38c688788becdab200bfd57 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Mon, 10 Aug 2020 15:05:35 -0700 Subject: [PATCH] fix(store): Fix refcount logic slowness Use rocksdb merge operator for ColState. No longer need to atomically read + write on update. Fixes #3065 Test plan --------- sanity test manually check performance --- core/store/src/db.rs | 173 ++++++++++++++++++++++++++-- core/store/src/lib.rs | 11 ++ core/store/src/trie/mod.rs | 9 +- core/store/src/trie/shard_tries.rs | 19 +-- core/store/src/trie/trie_storage.rs | 26 ++++- 5 files changed, 207 insertions(+), 31 deletions(-) diff --git a/core/store/src/db.rs b/core/store/src/db.rs index ef92729c48b..0743e29891e 100644 --- a/core/store/src/db.rs +++ b/core/store/src/db.rs @@ -6,12 +6,14 @@ use std::sync::RwLock; use borsh::{BorshDeserialize, BorshSerialize}; use rocksdb::{ - BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Direction, IteratorMode, Options, - ReadOptions, WriteBatch, DB, + BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Direction, IteratorMode, + MergeOperands, Options, ReadOptions, WriteBatch, DB, }; use strum_macros::EnumIter; +use crate::trie::merge_refcounted_records; use near_primitives::version::DbVersion; +use rocksdb::compaction_filter::Decision; use std::marker::PhantomPinned; #[derive(Debug, Clone, PartialEq)] @@ -210,6 +212,7 @@ pub struct DBTransaction { pub enum DBOp { Insert { col: DBCol, key: Vec, value: Vec }, + UpdateRefcount { col: DBCol, key: Vec, value: Vec }, Delete { col: DBCol, key: Vec }, } @@ -222,6 +225,19 @@ impl DBTransaction { }); } + pub fn update_refcount, V: AsRef<[u8]>>( + &mut self, + col: DBCol, + key: K, + value: V, + ) { + self.ops.push(DBOp::UpdateRefcount { + col, + key: key.as_ref().to_owned(), + value: value.as_ref().to_owned(), + }); + } + pub fn delete>(&mut self, col: DBCol, key: K) { self.ops.push(DBOp::Delete { col, key: key.as_ref().to_owned() }); } @@ -259,7 +275,8 @@ pub trait Database: Sync + Send { impl Database for RocksDB { fn get(&self, col: DBCol, key: &[u8]) -> Result>, DBError> { let read_options = rocksdb_read_options(); - unsafe { Ok(self.db.get_cf_opt(&*self.cfs[col as usize], key, &read_options)?) } + let result = self.db.get_cf_opt(unsafe { &*self.cfs[col as usize] }, key, &read_options)?; + Ok(RocksDB::empty_value_filtering(col, result)) } fn iter<'a>(&'a self, col: DBCol) -> Box, Box<[u8]>)> + 'a> { @@ -267,7 +284,7 @@ impl Database for RocksDB { unsafe { let cf_handle = &*self.cfs[col as usize]; let iterator = self.db.iterator_cf_opt(cf_handle, read_options, IteratorMode::Start); - Box::new(iterator) + RocksDB::iter_empty_value_filtering(col, iterator) } } @@ -292,7 +309,7 @@ impl Database for RocksDB { IteratorMode::From(key_prefix, Direction::Forward), ) .take_while(move |(key, _value)| key.starts_with(key_prefix)); - Box::new(iterator) + RocksDB::iter_empty_value_filtering(col, iterator) } } @@ -303,6 +320,9 @@ impl Database for RocksDB { DBOp::Insert { col, key, value } => unsafe { batch.put_cf(&*self.cfs[col as usize], key, value); }, + DBOp::UpdateRefcount { col, key, value } => unsafe { + batch.merge_cf(&*self.cfs[col as usize], key, value); + }, DBOp::Delete { col, key } => unsafe { batch.delete_cf(&*self.cfs[col as usize], key); }, @@ -338,6 +358,15 @@ impl Database for TestDB { for op in transaction.ops { match op { DBOp::Insert { col, key, value } => db[col as usize].insert(key, value), + DBOp::UpdateRefcount { col, key, value } => { + let mut val = db[col as usize].get(&key).cloned().unwrap_or_default(); + merge_refcounted_records(&mut val, &value).unwrap(); + if val.len() != 0 { + db[col as usize].insert(key, val) + } else { + db[col as usize].remove(&key) + } + } DBOp::Delete { col, key } => db[col as usize].remove(&key), }; } @@ -380,13 +409,17 @@ fn rocksdb_block_based_options() -> BlockBasedOptions { block_opts } -fn rocksdb_column_options() -> Options { +fn rocksdb_column_options(col_index: usize) -> Options { let mut opts = Options::default(); opts.set_level_compaction_dynamic_level_bytes(true); opts.set_block_based_table_factory(&rocksdb_block_based_options()); opts.optimize_level_style_compaction(1024 * 1024 * 128); opts.set_target_file_size_base(1024 * 1024 * 64); opts.set_compression_per_level(&[]); + if col_index == DBCol::ColState as usize { + opts.set_merge_operator("refcount merge", RocksDB::refcount_merge, None); + opts.set_compaction_filter("empty value filter", RocksDB::empty_value_filter); + } opts } @@ -415,9 +448,9 @@ impl RocksDB { pub fn new>(path: P) -> Result { let options = rocksdb_options(); let cf_names: Vec<_> = (0..NUM_COLS).map(|col| format!("col{}", col)).collect(); - let cf_descriptors = cf_names - .iter() - .map(|cf_name| ColumnFamilyDescriptor::new(cf_name, rocksdb_column_options())); + let cf_descriptors = cf_names.iter().enumerate().map(|(col_index, cf_name)| { + ColumnFamilyDescriptor::new(cf_name, rocksdb_column_options(col_index)) + }); let db = DB::open_cf_descriptors(&options, path, cf_descriptors)?; let cfs = cf_names.iter().map(|n| db.cf_handle(n).unwrap() as *const ColumnFamily).collect(); @@ -431,3 +464,125 @@ impl TestDB { Self { db: RwLock::new(db) } } } + +impl RocksDB { + /// ColState has refcounted values. + /// Merge adds refcounts, zero refcount becomes empty value. + /// Empty values get filtered by get methods, and removed by compaction. + fn refcount_merge( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &mut MergeOperands, + ) -> Option> { + let mut result = vec![]; + if let Some(val) = existing_val { + // Error is only possible if decoding refcount fails (=value is between 1 and 3 bytes) + merge_refcounted_records(&mut result, val).unwrap(); + } + for val in operands { + // Error is only possible if decoding refcount fails (=value is between 1 and 3 bytes) + merge_refcounted_records(&mut result, val).unwrap(); + } + Some(result) + } + + fn empty_value_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision { + if value.len() == 0 { + Decision::Remove + } else { + Decision::Keep + } + } + + fn empty_value_filtering(column: DBCol, value: Option>) -> Option> { + if column == DBCol::ColState && Some(vec![]) == value { + None + } else { + value + } + } + + fn iter_empty_value_filtering<'a, I>( + column: DBCol, + iterator: I, + ) -> Box, Box<[u8]>)> + 'a> + where + I: Iterator, Box<[u8]>)> + 'a, + { + if column == DBCol::ColState { + Box::new(iterator.filter(|(_k, v)| v.len() != 0)) + } else { + Box::new(iterator) + } + } +} + +#[cfg(test)] +mod tests { + use crate::db::DBCol::ColState; + use crate::db::{rocksdb_read_options, DBError, Database, RocksDB}; + use crate::{create_store, DBCol}; + + impl RocksDB { + fn compact(&self, col: DBCol) { + self.db.compact_range_cf::<&[u8], &[u8]>( + unsafe { &*self.cfs[col as usize] }, + None, + None, + ); + } + + fn get_no_empty_filtering( + &self, + col: DBCol, + key: &[u8], + ) -> Result>, DBError> { + let read_options = rocksdb_read_options(); + let result = + self.db.get_cf_opt(unsafe { &*self.cfs[col as usize] }, key, &read_options)?; + Ok(result) + } + } + + #[test] + fn rocksdb_merge_sanity() { + let tmp_dir = tempfile::Builder::new().prefix("_test_snapshot_sanity").tempdir().unwrap(); + let store = create_store(tmp_dir.path().to_str().unwrap()); + assert_eq!(store.get(ColState, &[1]).unwrap(), None); + { + let mut store_update = store.store_update(); + store_update.update_refcount(ColState, &[1], &[1, 1, 0, 0, 0]); + store_update.commit().unwrap(); + } + { + let mut store_update = store.store_update(); + store_update.update_refcount(ColState, &[1], &[1, 1, 0, 0, 0]); + store_update.commit().unwrap(); + } + assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1, 2, 0, 0, 0])); + { + let mut store_update = store.store_update(); + store_update.update_refcount(ColState, &[1], &[1, 255, 255, 255, 255]); + store_update.commit().unwrap(); + } + assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1, 1, 0, 0, 0])); + { + let mut store_update = store.store_update(); + store_update.update_refcount(ColState, &[1], &[1, 255, 255, 255, 255]); + store_update.commit().unwrap(); + } + // Refcount goes to 0 -> get() returns None + assert_eq!(store.get(ColState, &[1]).unwrap(), None); + let ptr = (&*store.storage) as *const (dyn Database + 'static); + let rocksdb = unsafe { &*(ptr as *const RocksDB) }; + // Internally there is an empty value + assert_eq!(rocksdb.get_no_empty_filtering(ColState, &[1]).unwrap(), Some(vec![])); + + rocksdb.compact(ColState); + rocksdb.compact(ColState); + + // After compaction the empty value disappears + assert_eq!(rocksdb.get_no_empty_filtering(ColState, &[1]).unwrap(), None); + assert_eq!(store.get(ColState, &[1]).unwrap(), None); + } +} diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index ec20d074a30..396f296fa19 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -154,6 +154,10 @@ impl StoreUpdate { StoreUpdate { storage, transaction, tries: Some(tries) } } + pub fn update_refcount(&mut self, column: DBCol, key: &[u8], value: &[u8]) { + self.transaction.update_refcount(column, key, value) + } + pub fn set(&mut self, column: DBCol, key: &[u8], value: &[u8]) { self.transaction.put(column, key, value) } @@ -195,6 +199,9 @@ impl StoreUpdate { match op { DBOp::Insert { col, key, value } => self.transaction.put(col, &key, &value), DBOp::Delete { col, key } => self.transaction.delete(col, &key), + DBOp::UpdateRefcount { col, key, value } => { + self.transaction.update_refcount(col, &key, &value) + } } } } @@ -209,6 +216,7 @@ impl StoreUpdate { .map(|op| match op { DBOp::Insert { col, key, .. } => (*col as u8, key), DBOp::Delete { col, key } => (*col as u8, key), + DBOp::UpdateRefcount { col, key, .. } => (*col as u8, key), }) .collect::>() .len(), @@ -232,6 +240,9 @@ impl fmt::Debug for StoreUpdate { for op in self.transaction.ops.iter() { match op { DBOp::Insert { col, key, .. } => writeln!(f, " + {:?} {}", col, to_base(key))?, + DBOp::UpdateRefcount { col, key, .. } => { + writeln!(f, " +- {:?} {}", col, to_base(key))? + } DBOp::Delete { col, key } => writeln!(f, " - {:?} {}", col, to_base(key))?, } } diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs index f4b7a1fb981..39899a4da61 100644 --- a/core/store/src/trie/mod.rs +++ b/core/store/src/trie/mod.rs @@ -17,6 +17,7 @@ use crate::trie::insert_delete::NodesStorage; use crate::trie::iterator::TrieIterator; use crate::trie::nibble_slice::NibbleSlice; pub use crate::trie::shard_tries::{KeyForStateChanges, ShardTries, WrappedTrieChanges}; +pub(crate) use crate::trie::trie_storage::merge_refcounted_records; use crate::trie::trie_storage::{ TouchedNodesCounter, TrieCachingStorage, TrieMemoryPartialStorage, TrieRecordingStorage, TrieStorage, @@ -395,21 +396,21 @@ impl RawTrieNodeWithSize { } } -fn encode_trie_node_with_rc(data: &[u8], rc: u32) -> Vec { +fn encode_trie_node_with_rc(data: &[u8], rc: i32) -> Vec { let mut cursor = Cursor::new(Vec::with_capacity(data.len() + 4)); cursor.write_all(data).unwrap(); - cursor.write_u32::(rc).unwrap(); + cursor.write_i32::(rc).unwrap(); cursor.into_inner() } -fn decode_trie_node_with_rc(bytes: &[u8]) -> Result<(&[u8], u32), StorageError> { +fn decode_trie_node_with_rc(bytes: &[u8]) -> Result<(&[u8], i32), StorageError> { if bytes.len() < 4 { return Err(StorageError::StorageInconsistentState( "Decode node with RC failed".to_string(), )); } let mut cursor = Cursor::new(&bytes[bytes.len() - 4..]); - let rc = cursor.read_u32::().unwrap(); + let rc = cursor.read_i32::().unwrap(); Ok((&bytes[..bytes.len() - 4], rc)) } diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index e2117c81bcc..9fa68542019 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -70,18 +70,10 @@ impl ShardTries { store_update: &mut StoreUpdate, ) -> Result<(), StorageError> { store_update.tries = Some(tries.clone()); - let trie = tries.get_trie_for_shard(shard_id); - let storage = trie.storage.as_caching_storage().expect("Must be caching storage"); for (hash, value, rc) in deletions.iter() { - let storage_rc = storage.retrieve_rc(&hash)?; - assert!(*rc <= storage_rc); let key = TrieCachingStorage::get_key_from_shard_id_and_hash(shard_id, hash); - if *rc < storage_rc { - let bytes = encode_trie_node_with_rc(&value, storage_rc - rc); - store_update.set(DBCol::ColState, key.as_ref(), &bytes); - } else { - store_update.delete(DBCol::ColState, key.as_ref()); - } + let bytes = encode_trie_node_with_rc(&value, -(*rc as i32)); + store_update.update_refcount(DBCol::ColState, key.as_ref(), &bytes); } Ok(()) } @@ -92,14 +84,11 @@ impl ShardTries { shard_id: ShardId, store_update: &mut StoreUpdate, ) -> Result<(), StorageError> { - let trie = tries.get_trie_for_shard(shard_id); store_update.tries = Some(tries); - let storage = trie.storage.as_caching_storage().expect("Must be caching storage"); for (hash, value, rc) in insertions.iter() { - let storage_rc = storage.retrieve_rc(&hash)?; let key = TrieCachingStorage::get_key_from_shard_id_and_hash(shard_id, hash); - let bytes = encode_trie_node_with_rc(&value, storage_rc + rc); - store_update.set(DBCol::ColState, key.as_ref(), &bytes); + let bytes = encode_trie_node_with_rc(&value, *rc as i32); + store_update.update_refcount(DBCol::ColState, key.as_ref(), &bytes); } Ok(()) } diff --git a/core/store/src/trie/trie_storage.rs b/core/store/src/trie/trie_storage.rs index c7804011d67..a17e207d5fd 100644 --- a/core/store/src/trie/trie_storage.rs +++ b/core/store/src/trie/trie_storage.rs @@ -113,16 +113,36 @@ pub struct TrieCachingStorage { pub(crate) shard_id: ShardId, } +pub fn merge_refcounted_records(result: &mut Vec, val: &[u8]) -> Result<(), StorageError> { + if val.len() == 0 { + return Ok(()); + } + let add_rc = TrieCachingStorage::vec_to_rc(val)?; + if result.len() != 0 { + let result_rc = TrieCachingStorage::vec_to_rc(result)? + add_rc; + + debug_assert_eq!(result[0..(result.len() - 4)], val[0..(val.len() - 4)]); + let len = result.len(); + result[(len - 4)..].copy_from_slice(&result_rc.to_le_bytes()); + if result_rc == 0 { + *result = vec![]; + } + } else { + *result = val.to_vec(); + } + Ok(()) +} + impl TrieCachingStorage { pub fn new(store: Arc, cache: TrieCache, shard_id: ShardId) -> TrieCachingStorage { TrieCachingStorage { store, cache, shard_id } } - fn vec_to_rc(val: &Vec) -> Result { + fn vec_to_rc(val: &[u8]) -> Result { decode_trie_node_with_rc(&val).map(|(_bytes, rc)| rc) } - fn vec_to_bytes(val: &Vec) -> Result, StorageError> { + fn vec_to_bytes(val: &[u8]) -> Result, StorageError> { decode_trie_node_with_rc(&val).map(|(bytes, _rc)| bytes.to_vec()) } @@ -147,7 +167,7 @@ impl TrieCachingStorage { /// Get storage refcount, or 0 if hash is not present /// # Errors /// StorageError::StorageInternalError if the storage fails internally. - pub fn retrieve_rc(&self, hash: &CryptoHash) -> Result { + pub fn retrieve_rc(&self, hash: &CryptoHash) -> Result { // Ignore cache to be safe. retrieve_rc is used only when writing storage and cache is shared with readers. let key = Self::get_key_from_shard_id_and_hash(self.shard_id, hash); let val = self