Skip to content

Commit

Permalink
feat: simple flat storage (#7345)
Browse files Browse the repository at this point in the history
First step towards flat storage: #7327
Continue work here because #7295 was broken.

Here we start building it behind a protocol feature, without adding it to nightly. The goal is to support it on a localnet with a single node that never sends skip approvals, so that we can later extend it to more nodes and eventually implement a migration for mainnet.

Now, everyone should be able to build neard with this feature, set `max_block_production_delay` to something big and run localnet.

I haven't yet been able to make all tests pass with the feature enabled (e.g. `test_care_about_shard`), but I'm opening this for review because I don't think that affects the overall idea.

## Testing

* Check that encoding and decoding value reference gives expected results. `ValueRef` can be reused in other places in code
* Check that flat state is used in regular trie handler and not used in view trie handler
* Check that after block processing, getting value from regular trie and view trie gives the same result
  • Loading branch information
Longarithm authored Aug 9, 2022
1 parent 7e7cac0 commit 1d2a9e3
Show file tree
Hide file tree
Showing 18 changed files with 316 additions and 44 deletions.
1 change: 1 addition & 0 deletions chain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ protocol_feature_chunk_only_producers = [
"near-chain-configs/protocol_feature_chunk_only_producers",
"near-primitives/protocol_feature_chunk_only_producers",
]
protocol_feature_flat_state = ["near-store/protocol_feature_flat_state"]

nightly = [
"nightly_protocol",
Expand Down
4 changes: 4 additions & 0 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2440,6 +2440,10 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::CachedContractCode => {
unreachable!();
}
#[cfg(feature = "protocol_feature_flat_state")]
DBCol::FlatState => {
unreachable!();
}
}
self.inc_gc(col);
self.merge(store_update);
Expand Down
1 change: 1 addition & 0 deletions core/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod runtime;
pub mod sandbox_state_patch;
pub mod shard_layout;
pub mod sharding;
pub mod state;
pub mod state_part;
pub mod state_record;
pub mod syncing;
Expand Down
49 changes: 49 additions & 0 deletions core/primitives/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use byteorder::{LittleEndian, ReadBytesExt};
use near_primitives_core::hash::{hash, CryptoHash};
use std::io::{Cursor, Read};

/// State value reference. Used to charge fees for value length before retrieving the value itself.
pub struct ValueRef {
/// Value length in bytes.
pub length: u32,
/// Unique value hash.
pub hash: CryptoHash,
}

impl ValueRef {
    /// Creates the serialized value reference for `value`.
    ///
    /// The resulting array stores the value length as 4 little-endian bytes, followed by the
    /// 32 bytes of the value hash.
    ///
    /// The length is stored as a `u32`, so a longer value would be silently truncated by the
    /// cast below; debug builds assert that this never happens.
    /// TODO (#7327): consider passing hash here to avoid double computation
    pub fn create_serialized(value: &[u8]) -> [u8; 36] {
        debug_assert!(value.len() <= u32::MAX as usize, "value length does not fit into u32");
        let mut result = [0u8; 36];
        result[0..4].copy_from_slice(&(value.len() as u32).to_le_bytes());
        result[4..36].copy_from_slice(&hash(value).0);
        result
    }

    /// Decodes a value reference from the raw byte array produced by `create_serialized`.
    ///
    /// # Errors
    ///
    /// Returns the underlying read error if `bytes` is too short to hold the 4-byte length and
    /// the 32-byte hash, and an `InvalidData` error if any bytes are left over after them.
    /// TODO (#7327): use &[u8; 36] and get rid of Cursor
    pub fn decode(bytes: &[u8]) -> Result<Self, std::io::Error> {
        let mut cursor = Cursor::new(bytes);
        let value_length = cursor.read_u32::<LittleEndian>()?;
        let mut arr = [0; 32];
        cursor.read_exact(&mut arr)?;
        // A well-formed reference is consumed completely; leftover bytes mean the input is not
        // something `create_serialized` produced, so reject it rather than ignore the tail.
        if cursor.position() != bytes.len() as u64 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "unexpected trailing bytes after value reference",
            ));
        }
        let value_hash = CryptoHash(arr);
        Ok(ValueRef { length: value_length, hash: value_hash })
    }
}

#[cfg(test)]
mod tests {
    use crate::state::ValueRef;
    use near_primitives_core::hash::hash;

    /// Encoding a value and decoding the result yields the value's length and hash.
    #[test]
    fn test_encode_decode() {
        let value = vec![1, 2, 3];
        let value_ref_ser = ValueRef::create_serialized(&value);
        let value_ref = ValueRef::decode(&value_ref_ser).unwrap();
        assert_eq!(value_ref.length, value.len() as u32);
        assert_eq!(value_ref.hash, hash(&value));
    }

    /// The serialized form is the little-endian length followed by the raw hash bytes.
    #[test]
    fn test_serialized_layout() {
        let value = vec![1, 2, 3];
        let value_ref_ser = ValueRef::create_serialized(&value);
        assert_eq!(&value_ref_ser[0..4], &3u32.to_le_bytes()[..]);
        assert_eq!(&value_ref_ser[4..36], &hash(&value).0[..]);
    }

    /// Inputs too short to hold both the length and the hash are rejected.
    #[test]
    fn test_decode_truncated() {
        let value_ref_ser = ValueRef::create_serialized(&[1, 2, 3]);
        assert!(ValueRef::decode(&value_ref_ser[..35]).is_err());
        assert!(ValueRef::decode(&value_ref_ser[..3]).is_err());
        assert!(ValueRef::decode(&[]).is_err());
    }
}
6 changes: 6 additions & 0 deletions core/primitives/src/state_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,11 @@ pub fn state_record_to_account_id(state_record: &StateRecord) -> &AccountId {
}

/// Returns `true` if the first byte of `key` is `col::CONTRACT_CODE`.
///
/// Callers are expected to pass a non-empty key, which is checked by a debug assertion. In
/// builds where that assertion is compiled out, an empty key is reported as not matching
/// instead of panicking on an out-of-bounds index.
pub fn is_contract_code_key(key: &[u8]) -> bool {
    debug_assert!(!key.is_empty());
    key.first().map_or(false, |&first| first == col::CONTRACT_CODE)
}

/// Returns `true` if the first byte of `key` is `col::DELAYED_RECEIPT` or
/// `col::DELAYED_RECEIPT_INDICES`.
///
/// Callers are expected to pass a non-empty key, which is checked by a debug assertion. In
/// builds where that assertion is compiled out, an empty key is reported as not matching
/// instead of panicking on an out-of-bounds index.
pub fn is_delayed_receipt_key(key: &[u8]) -> bool {
    debug_assert!(!key.is_empty());
    key.first().map_or(false, |&first| {
        first == col::DELAYED_RECEIPT || first == col::DELAYED_RECEIPT_INDICES
    })
}
2 changes: 2 additions & 0 deletions core/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ no_cache = []
single_thread_rocksdb = [] # Deactivate RocksDB IO background threads
test_features = []
protocol_feature_chunk_only_producers = []
protocol_feature_flat_state = []

nightly_protocol = []
nightly = [
"nightly_protocol",
Expand Down
7 changes: 7 additions & 0 deletions core/store/src/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ pub enum DBCol {
/// - *Rows*: BlockShardId (BlockHash || ShardId) - 40 bytes
/// - *Column type*: StateChangesForSplitStates
StateChangesForSplitStates = 49,
/// Flat state contents. Used to get a `ValueRef` by trie key without traversing the trie.
/// - *Rows*: serialized TrieKey (Vec<u8>)
/// - *Column type*: ValueRef
#[cfg(feature = "protocol_feature_flat_state")]
FlatState = 50,
}

impl DBCol {
Expand Down Expand Up @@ -406,6 +411,8 @@ impl fmt::Display for DBCol {
Self::EpochValidatorInfo => "epoch validator info",
Self::HeaderHashesByHeight => "header hashes indexed by their height",
Self::StateChangesForSplitStates => "state changes indexed by block hash and shard id",
#[cfg(feature = "protocol_feature_flat_state")]
Self::FlatState => "flat state",
};
write!(f, "{}", desc)
}
Expand Down
64 changes: 64 additions & 0 deletions core/store/src/flat_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! Contains flat state optimization logic.
//!
//! The state of the contract is a key-value map, `Map<Vec<u8>, Vec<u8>>`.
//! In the database, we store this map as a trie, which allows us to construct succinct proofs that a certain key/value
//! belongs to contract's state. Using a trie has a drawback -- reading a single key/value requires traversing the trie
//! from the root, loading many nodes from the database.
//! To optimize this, we want to use flat state: alongside the trie, we store a mapping from keys to value
//! references so that, if you don't need a proof, you can do a db lookup in just two db accesses - one to get value
//! reference, one to get value itself.
//! TODO (#7327): consider inlining small values, so we could use only one db access.

#[cfg(feature = "protocol_feature_flat_state")]
use crate::DBCol;
use crate::Store;
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::state::ValueRef;

/// Struct for getting value references from the flat storage.
/// Used to speed up `get` and `get_ref` trie methods.
/// It should store all trie keys for state on top of chain head, except delayed receipt keys, because they are the
/// same for each shard and they are requested only once during applying chunk.
/// TODO (#7327): implement flat state deltas to support forks
/// TODO (#7327): store on top of final head (or earlier) so updates will only go forward
#[derive(Clone)]
pub struct FlatState {
    /// Store handle used to read serialized value references from `DBCol::FlatState`.
    // The field is only read when `protocol_feature_flat_state` is enabled; without the feature
    // nothing touches it, hence the `dead_code` allowance.
    #[allow(dead_code)]
    store: Store,
}

impl FlatState {
    /// Creates a flat state handle that reads from `store`.
    /// Only compiled when `protocol_feature_flat_state` is enabled.
    #[cfg(feature = "protocol_feature_flat_state")]
    pub fn new(store: Store) -> Self {
        FlatState { store }
    }

    /// Reads the raw bytes stored under `key` in `DBCol::FlatState`, if any.
    /// Any error reported by the store is mapped to `StorageError::StorageInternalError`.
    /// Without `protocol_feature_flat_state` the constructor `new` is compiled out, so this
    /// path is treated as unreachable.
    // `key` is unused when the feature is disabled.
    #[allow(unused_variables)]
    fn get_raw_ref(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StorageError> {
        #[cfg(not(feature = "protocol_feature_flat_state"))]
        unreachable!();
        #[cfg(feature = "protocol_feature_flat_state")]
        return self
            .store
            .get(DBCol::FlatState, key)
            .map_err(|_| StorageError::StorageInternalError);
    }

    /// Looks up the value reference for a raw trie key. The state root is currently ignored:
    /// flat state is assumed to hold data for exactly the requested root.
    /// Only references are kept here; the values themselves live in `DBCol::State`, which avoids
    /// duplication and lets users be charged for the value length before the value is loaded.
    /// Returns `Ok(None)` when the key is absent; a stored entry that fails to decode is reported
    /// as `StorageError::StorageInternalError`.
    /// TODO (#7327): support different roots (or block hashes).
    pub fn get_ref(
        &self,
        _root: &CryptoHash,
        key: &[u8],
    ) -> Result<Option<ValueRef>, StorageError> {
        self.get_raw_ref(key)?
            .map(|raw| ValueRef::decode(&raw).map_err(|_| StorageError::StorageInternalError))
            .transpose()
    }
}
1 change: 1 addition & 0 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub use crate::trie::{
mod columns;
mod config;
pub mod db;
pub mod flat_state;
mod metrics;
pub mod migrations;
pub mod test_utils;
Expand Down
40 changes: 25 additions & 15 deletions core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use near_primitives::challenge::PartialState;
use near_primitives::contract::ContractCode;
use near_primitives::hash::{hash, CryptoHash};
pub use near_primitives::shard_layout::ShardUId;
use near_primitives::state::ValueRef;
use near_primitives::state_record::is_delayed_receipt_key;
use near_primitives::types::{StateRoot, StateRootNode};

use crate::flat_state::FlatState;
use crate::trie::insert_delete::NodesStorage;
use crate::trie::iterator::TrieIterator;
use crate::trie::nibble_slice::NibbleSlice;
Expand Down Expand Up @@ -403,7 +406,8 @@ impl RawTrieNodeWithSize {
}

pub struct Trie {
pub(crate) storage: Box<dyn TrieStorage>,
pub storage: Box<dyn TrieStorage>,
pub flat_state: Option<FlatState>,
}

/// Stores reference count change for some key-value pair in DB.
Expand Down Expand Up @@ -468,8 +472,8 @@ pub struct ApplyStatePartResult {
impl Trie {
pub const EMPTY_ROOT: StateRoot = StateRoot::new();

pub fn new(store: Box<dyn TrieStorage>) -> Self {
Trie { storage: store }
pub fn new(storage: Box<dyn TrieStorage>, flat_state: Option<FlatState>) -> Self {
Trie { storage, flat_state }
}

pub fn recording_reads(&self) -> Self {
Expand All @@ -480,7 +484,7 @@ impl Trie {
shard_uid: storage.shard_uid,
recorded: RefCell::new(Default::default()),
};
Trie { storage: Box::new(storage) }
Trie { storage: Box::new(storage), flat_state: None }
}

pub fn recorded_storage(&self) -> Option<PartialStorage> {
Expand All @@ -499,6 +503,7 @@ impl Trie {
recorded_storage,
visited_nodes: Default::default(),
}),
flat_state: None,
}
}

Expand Down Expand Up @@ -612,7 +617,7 @@ impl Trie {
&self,
root: &CryptoHash,
mut key: NibbleSlice<'_>,
) -> Result<Option<(u32, CryptoHash)>, StorageError> {
) -> Result<Option<ValueRef>, StorageError> {
let mut hash = *root;
loop {
let node = match self.retrieve_raw_node(&hash)? {
Expand All @@ -622,7 +627,7 @@ impl Trie {
match node {
RawTrieNode::Leaf(existing_key, value_length, value_hash) => {
if NibbleSlice::from_encoded(&existing_key).0 == key {
return Ok(Some((value_length, value_hash)));
return Ok(Some(ValueRef { length: value_length, hash: value_hash }));
} else {
return Ok(None);
}
Expand All @@ -640,7 +645,10 @@ impl Trie {
if key.is_empty() {
match value {
Some((value_length, value_hash)) => {
return Ok(Some((value_length, value_hash)));
return Ok(Some(ValueRef {
length: value_length,
hash: value_hash,
}));
}
None => return Ok(None),
}
Expand All @@ -658,18 +666,20 @@ impl Trie {
}
}

pub fn get_ref(
&self,
root: &CryptoHash,
key: &[u8],
) -> Result<Option<(u32, CryptoHash)>, StorageError> {
let key = NibbleSlice::new(key);
self.lookup(root, key)
pub fn get_ref(&self, root: &CryptoHash, key: &[u8]) -> Result<Option<ValueRef>, StorageError> {
let is_delayed = is_delayed_receipt_key(key);
match &self.flat_state {
Some(flat_state) if !is_delayed => flat_state.get_ref(root, &key),
_ => {
let key = NibbleSlice::new(key);
self.lookup(root, key)
}
}
}

pub fn get(&self, root: &CryptoHash, key: &[u8]) -> Result<Option<Vec<u8>>, StorageError> {
match self.get_ref(root, key)? {
Some((_length, hash)) => {
Some(ValueRef { hash, .. }) => {
self.storage.retrieve_raw_bytes(&hash).map(|bytes| Some(bytes.to_vec()))
}
None => Ok(None),
Expand Down
Loading

0 comments on commit 1d2a9e3

Please sign in to comment.