Skip to content

Commit

Permalink
State sync for shard state between epochs (a.k.a. Catchup)
Browse files Browse the repository at this point in the history
1. Initiating state download (aka Catchup) on each first block of each epoch for the *next* epoch;
2. Reusing the state sync machinery we already have (and fixing various bugs in it) to actually sync state for chunks;
3. Immediately applying state transition for the next epoch if the state is downloaded, putting block into a queue if not;
4. Processing the queue once the state is downloaded;
5. Orhpaning the block if the state still not downloaded when the next epoch starts, unorphaning on (4);

Addresses #1046
  • Loading branch information
SkidanovAlex committed Jul 29, 2019
1 parent 2372f1b commit d1627b8
Show file tree
Hide file tree
Showing 17 changed files with 1,376 additions and 255 deletions.
425 changes: 369 additions & 56 deletions chain/chain/src/chain.rs

Large diffs are not rendered by default.

114 changes: 108 additions & 6 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ use near_primitives::transaction::{ReceiptTransaction, TransactionResult};
use near_primitives::types::{AccountId, BlockIndex, MerkleHash, ShardId};
use near_primitives::utils::index_to_bytes;
use near_store::{
read_with_cache, Store, StoreUpdate, WrappedTrieChanges, COL_BLOCK, COL_BLOCK_HEADER,
COL_BLOCK_INDEX, COL_BLOCK_MISC, COL_CHUNKS, COL_CHUNK_ONE_PARTS, COL_INCOMING_RECEIPTS,
COL_OUTGOING_RECEIPTS, COL_STATE_REF, COL_TRANSACTION_RESULT,
read_with_cache, Store, StoreUpdate, WrappedTrieChanges, COL_BLOCK, COL_BLOCKS_TO_CATCHUP,
COL_BLOCK_HEADER, COL_BLOCK_INDEX, COL_BLOCK_MISC, COL_CHUNKS, COL_CHUNK_ONE_PARTS,
COL_INCOMING_RECEIPTS, COL_OUTGOING_RECEIPTS, COL_STATE_DL_INFOS, COL_STATE_REF,
COL_TRANSACTION_RESULT,
};

use crate::error::{Error, ErrorKind};
use crate::types::{Block, BlockHeader, ShardFullChunkOrOnePart, Tip};
use crate::RuntimeAdapter;
use near_primitives::serialize::Decode;
use near_primitives::sharding::{ChunkHash, ChunkOnePart, ShardChunk, ShardChunkHeader};
use std::collections::hash_map::Entry;
use std::convert::TryFrom;

const HEAD_KEY: &[u8; 4] = b"HEAD";
const TAIL_KEY: &[u8; 4] = b"TAIL";
Expand All @@ -29,6 +32,15 @@ const HEADER_HEAD_KEY: &[u8; 11] = b"HEADER_HEAD";
/// lru cache size
const CACHE_SIZE: usize = 20;

/// Contains the information that is used to sync state for shards as epochs switch
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct StateSyncInfo {
/// The first block of the epoch for which syncing is happening
pub epoch_tail_hash: CryptoHash,
/// Shards to fetch state
pub shards: Vec<(ShardId, ChunkHash)>,
}

/// Accesses the chain store. Used to create atomic editable views that can be reverted.
pub trait ChainStoreAccess {
/// Returns underlaying store.
Expand Down Expand Up @@ -79,6 +91,8 @@ pub trait ChainStoreAccess {
) -> Result<&Vec<ReceiptTransaction>, Error>;
/// Returns transaction result for given tx hash.
fn get_transaction_result(&mut self, hash: &CryptoHash) -> Result<&TransactionResult, Error>;

fn get_blocks_to_catchup(&self, prev_hash: &CryptoHash) -> Result<Vec<CryptoHash>, Error>;
}

/// All chain-related database operations.
Expand Down Expand Up @@ -130,6 +144,15 @@ impl ChainStore {
pub fn store_update(&mut self) -> ChainStoreUpdate<Self> {
ChainStoreUpdate::new(self)
}

pub fn iterate_state_sync_infos(&self) -> Vec<(CryptoHash, StateSyncInfo)> {
self.store
.iter(COL_STATE_DL_INFOS)
.map(|(k, v)| {
(CryptoHash::try_from(k.as_ref()).unwrap(), Decode::decode(v.as_ref()).unwrap())
})
.collect()
}
}

impl ChainStoreAccess for ChainStore {
Expand Down Expand Up @@ -203,7 +226,10 @@ impl ChainStoreAccess for ChainStore {
let chunk_hash = chunk_header.chunk_hash();
if me.as_ref().map_or_else(
|| false,
|me| runtime_adapter.cares_about_shard(me, parent_hash, shard_id),
|me| {
runtime_adapter.cares_about_shard(me, parent_hash, shard_id)
|| runtime_adapter.will_care_about_shard(me, parent_hash, shard_id)
},
) {
let entry = self.chunks.entry(chunk_hash.clone());
match entry {
Expand Down Expand Up @@ -247,7 +273,10 @@ impl ChainStoreAccess for ChainStore {
ret.push(ShardFullChunkOrOnePart::NoChunk);
} else if me.as_ref().map_or_else(
|| false,
|me| runtime_adapter.cares_about_shard(&me, parent_hash, shard_id),
|me| {
runtime_adapter.cares_about_shard(&me, parent_hash, shard_id)
|| runtime_adapter.will_care_about_shard(&me, parent_hash, shard_id)
},
) {
ret.push(ShardFullChunkOrOnePart::FullChunk(
self.chunks.get(&chunk_header.chunk_hash()).unwrap(),
Expand Down Expand Up @@ -349,9 +378,13 @@ impl ChainStoreAccess for ChainStore {
&format!("TRANSACTION: {}", hash),
)
}

fn get_blocks_to_catchup(&self, hash: &CryptoHash) -> Result<Vec<CryptoHash>, Error> {
Ok(self.store.get_ser(COL_BLOCKS_TO_CATCHUP, hash.as_ref())?.unwrap_or_else(|| vec![]))
}
}

/// Provides layer to update chain without touching underlaying database.
/// Provides layer to update chain without touching the underlying database.
/// This serves few purposes, main one is that even if executable exists/fails during update the database is in consistent state.
pub struct ChainStoreUpdate<'a, T> {
chain_store: &'a mut T,
Expand All @@ -370,6 +403,10 @@ pub struct ChainStoreUpdate<'a, T> {
header_head: Option<Tip>,
sync_head: Option<Tip>,
trie_changes: Option<WrappedTrieChanges>,
add_blocks_to_catchup: Vec<(CryptoHash, CryptoHash)>,
remove_blocks_to_catchup: Vec<CryptoHash>,
add_state_dl_infos: Vec<StateSyncInfo>,
remove_state_dl_infos: Vec<CryptoHash>,
}

impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> {
Expand All @@ -390,6 +427,10 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> {
header_head: None,
sync_head: None,
trie_changes: None,
add_blocks_to_catchup: vec![],
remove_blocks_to_catchup: vec![],
add_state_dl_infos: vec![],
remove_state_dl_infos: vec![],
}
}

Expand Down Expand Up @@ -556,6 +597,14 @@ impl<'a, T: ChainStoreAccess> ChainStoreAccess for ChainStoreUpdate<'a, T> {
fn get_chunk(&mut self, header: &ShardChunkHeader) -> Result<&ShardChunk, Error> {
self.chain_store.get_chunk(header)
}

fn get_blocks_to_catchup(&self, prev_hash: &CryptoHash) -> Result<Vec<CryptoHash>, Error> {
// Make sure we never request a block to catchup after altering the data structure
assert_eq!(self.add_blocks_to_catchup.len(), 0);
assert_eq!(self.remove_blocks_to_catchup.len(), 0);

self.chain_store.get_blocks_to_catchup(prev_hash)
}
}

impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> {
Expand Down Expand Up @@ -685,6 +734,21 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> {
self.trie_changes = Some(trie_changes);
}

pub fn add_block_to_catchup(&mut self, prev_hash: CryptoHash, block_hash: CryptoHash) {
self.add_blocks_to_catchup.push((prev_hash, block_hash));
}

pub fn remove_block_to_catchup(&mut self, prev_hash: CryptoHash) {
self.remove_blocks_to_catchup.push(prev_hash);
}

pub fn add_state_dl_info(&mut self, info: StateSyncInfo) {
self.add_state_dl_infos.push(info);
}
pub fn remove_state_dl_info(&mut self, hash: CryptoHash) {
self.remove_state_dl_infos.push(hash);
}

/// Merge another StoreUpdate into this one
pub fn merge(&mut self, store_update: StoreUpdate) {
self.store_updates.push(store_update);
Expand Down Expand Up @@ -758,6 +822,44 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> {
.map_err(|err| ErrorKind::Other(err.to_string()))?;
// TODO: save deletions separately for garbage collection.
}
let mut affected_catchup_blocks = HashSet::new();
for hash in self.remove_blocks_to_catchup {
assert!(!affected_catchup_blocks.contains(&hash));
if affected_catchup_blocks.contains(&hash) {
return Err(ErrorKind::Other(
"Multiple changes to the store affect the same catchup block".to_string(),
)
.into());
}
affected_catchup_blocks.insert(hash);

store_update.delete(COL_BLOCKS_TO_CATCHUP, hash.as_ref());
}
for (prev_hash, new_hash) in self.add_blocks_to_catchup {
assert!(!affected_catchup_blocks.contains(&prev_hash));
if affected_catchup_blocks.contains(&prev_hash) {
return Err(ErrorKind::Other(
"Multiple changes to the store affect the same catchup block".to_string(),
)
.into());
}
affected_catchup_blocks.insert(prev_hash);

let mut prev_table =
self.chain_store.get_blocks_to_catchup(&prev_hash).unwrap_or(vec![]);
prev_table.push(new_hash);
store_update.set_ser(COL_BLOCKS_TO_CATCHUP, prev_hash.as_ref(), &prev_table)?;
}
for state_dl_info in self.add_state_dl_infos {
store_update.set_ser(
COL_STATE_DL_INFOS,
state_dl_info.epoch_tail_hash.as_ref(),
&state_dl_info,
)?;
}
for hash in self.remove_state_dl_infos {
store_update.delete(COL_STATE_DL_INFOS, hash.as_ref());
}
for other in self.store_updates {
store_update.merge(other);
}
Expand Down
Loading

0 comments on commit d1627b8

Please sign in to comment.