Skip to content

Commit

Permalink
fix(gc): address slowness in forks cleaning (#3121)
Browse files Browse the repository at this point in the history
Currently we always iterate from `tail` to `gc_stop_height` when cleaning forks, which is needlessly slow especially at the epoch beginning. This caused validators to potentially miss blocks at the beginning of an epoch when epoch length is 43k. To address this, we change it so that we only go through all heights in epoch once. More specifically, we maintain a fork tail that represents the height of the current fork cleaning process and when `gc_stop_height` changes, it changes to `gc_stop_height`. Otherwise it decreases monotonically from `gc_stop_height` to `tail`. To make this fully work, we also need to prevent blocks with height smaller than `gc_stop_height` from being accepted, which I find no problem with.

Test plan
----------
* existing gc tests.
* `test_block_height_too_old`.
  • Loading branch information
bowenwang1996 committed Aug 11, 2020
1 parent c36b59c commit 8be8a00
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 26 deletions.
36 changes: 32 additions & 4 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ pub struct BlockEconomicsConfig {
pub max_gas_price: Balance,
}

/// Maximum number of height to go through at each step when cleaning forks during garbage collection.
const GC_FORK_CLEAN_STEP: u64 = 1000;

enum ApplyChunksMode {
ThisEpoch,
NextEpoch,
Expand Down Expand Up @@ -604,20 +607,38 @@ impl Chain {
_ => return Err(e),
},
};

if gc_stop_height > head.height {
return Err(ErrorKind::GCError(
"gc_stop_height cannot be larger than head.height".into(),
)
.into());
}
let prev_epoch_id = self.get_block_header(&head.prev_block_hash)?.epoch_id();
let epoch_change = prev_epoch_id != &head.epoch_id;
let fork_tail = if epoch_change {
// if head doesn't change on the epoch boundary, we may update fork tail several times
// but that is fine since it doesn't affect correctness and also we limit the number of
// heights that fork cleaning goes through so it doesn't slow down client either.
let mut chain_store_update = self.store.store_update();
chain_store_update.update_fork_tail(gc_stop_height);
chain_store_update.commit()?;
gc_stop_height
} else {
self.store.fork_tail()?
};
let mut gc_blocks_remaining = gc_blocks_limit;

// Forks Cleaning
for height in tail..gc_stop_height {
let stop_height = std::cmp::max(tail, fork_tail.saturating_sub(GC_FORK_CLEAN_STEP));
for height in (stop_height..fork_tail).rev() {
self.clear_forks_data(tries.clone(), height, &mut gc_blocks_remaining)?;
if gc_blocks_remaining == 0 {
return Ok(());
}
self.clear_forks_data(tries.clone(), height, &mut gc_blocks_remaining)?;
let mut chain_store_update = self.store.store_update();
chain_store_update.update_fork_tail(height);
chain_store_update.commit()?;
}

// Canonical Chain Clearing
Expand Down Expand Up @@ -2947,6 +2968,7 @@ impl<'a> ChainUpdate<'a> {
let prev_gas_price = prev.gas_price();
let prev_epoch_id = prev.epoch_id().clone();
let prev_random_value = *prev.random_value();
let prev_height = prev.height();

// Block is an orphan if we do not know about the previous full block.
if !is_next && !self.chain_store_update.block_exists(&prev_hash)? {
Expand All @@ -2955,8 +2977,14 @@ impl<'a> ChainUpdate<'a> {

// A heuristic to prevent block height to jump too fast towards BlockHeight::max and cause
// overflow-related problems
if block.header().height() > head.height + self.epoch_length * 20 {
return Err(ErrorKind::InvalidBlockHeight.into());
let block_height = block.header().height();
if block_height > head.height + self.epoch_length * 20 {
return Err(ErrorKind::InvalidBlockHeight(block_height).into());
}

// Do not accept old forks
if prev_height < self.runtime_adapter.get_gc_stop_height(&head.last_block_hash)? {
return Err(ErrorKind::InvalidBlockHeight(prev_height).into());
}

let (is_caught_up, needs_to_start_fetching_state) =
Expand Down
8 changes: 4 additions & 4 deletions chain/chain/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use near_primitives::errors::{EpochError, StorageError};
use near_primitives::hash::CryptoHash;
use near_primitives::serialize::to_base;
use near_primitives::sharding::{ChunkHash, ShardChunkHeader};
use near_primitives::types::ShardId;
use near_primitives::types::{BlockHeight, ShardId};

#[derive(Debug)]
pub struct Error {
Expand Down Expand Up @@ -42,8 +42,8 @@ pub enum ErrorKind {
#[fail(display = "Invalid Block Time: Too far in the future: {}", _0)]
InvalidBlockFutureTime(DateTime<Utc>),
/// Block height is invalid (not previous + 1).
#[fail(display = "Invalid Block Height")]
InvalidBlockHeight,
#[fail(display = "Invalid Block Height {}", _0)]
InvalidBlockHeight(BlockHeight),
/// Invalid block proposed signature.
#[fail(display = "Invalid Block Proposer Signature")]
InvalidBlockProposer,
Expand Down Expand Up @@ -240,7 +240,7 @@ impl Error {
| ErrorKind::DBNotFoundErr(_) => false,
ErrorKind::InvalidBlockPastTime(_, _)
| ErrorKind::InvalidBlockFutureTime(_)
| ErrorKind::InvalidBlockHeight
| ErrorKind::InvalidBlockHeight(_)
| ErrorKind::InvalidBlockProposer
| ErrorKind::InvalidChunk
| ErrorKind::InvalidChunkProofs(_)
Expand Down
42 changes: 37 additions & 5 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use near_store::{
ColReceiptIdToShardId, ColState, ColStateChanges, ColStateDlInfos, ColStateHeaders,
ColStateParts, ColTransactionRefCount, ColTransactionResult, ColTransactions, ColTrieChanges,
DBCol, KeyForStateChanges, ShardTries, Store, StoreUpdate, TrieChanges, WrappedTrieChanges,
CHUNK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY, LATEST_KNOWN_KEY,
SHOULD_COL_GC, SYNC_HEAD_KEY, TAIL_KEY,
CHUNK_TAIL_KEY, FORK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY,
LATEST_KNOWN_KEY, SHOULD_COL_GC, SYNC_HEAD_KEY, TAIL_KEY,
};

use crate::byzantine_assert;
Expand Down Expand Up @@ -85,6 +85,8 @@ pub trait ChainStoreAccess {
fn tail(&self) -> Result<BlockHeight, Error>;
/// The chain Chunks Tail height.
fn chunk_tail(&self) -> Result<BlockHeight, Error>;
/// Tail height of the fork cleaning process.
fn fork_tail(&self) -> Result<BlockHeight, Error>;
/// Head of the header chain (not the same thing as head_header).
fn header_head(&self) -> Result<Tip, Error>;
/// The "sync" head: last header we received from syncing.
Expand Down Expand Up @@ -182,8 +184,9 @@ pub trait ChainStoreAccess {
hash = *header.prev_hash();
header = self.get_block_header(&hash)?;
}
if header.height() < height {
return Err(ErrorKind::InvalidBlockHeight.into());
let header_height = header.height();
if header_height < height {
return Err(ErrorKind::InvalidBlockHeight(header_height).into());
}
self.get_block_header(&hash)
}
Expand Down Expand Up @@ -527,6 +530,13 @@ impl ChainStoreAccess for ChainStore {
.map_err(|e| e.into())
}

fn fork_tail(&self) -> Result<BlockHeight, Error> {
self.store
.get_ser(ColBlockMisc, FORK_TAIL_KEY)
.map(|option| option.unwrap_or_else(|| self.genesis_height))
.map_err(|e| e.into())
}

/// The "sync" head: last header we received from syncing.
fn sync_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.store.get_ser(ColBlockMisc, SYNC_HEAD_KEY), "SYNC_HEAD")
Expand Down Expand Up @@ -1138,6 +1148,7 @@ pub struct ChainStoreUpdate<'a> {
head: Option<Tip>,
tail: Option<BlockHeight>,
chunk_tail: Option<BlockHeight>,
fork_tail: Option<BlockHeight>,
header_head: Option<Tip>,
sync_head: Option<Tip>,
largest_target_height: Option<BlockHeight>,
Expand All @@ -1161,6 +1172,7 @@ impl<'a> ChainStoreUpdate<'a> {
head: None,
tail: None,
chunk_tail: None,
fork_tail: None,
header_head: None,
sync_head: None,
largest_target_height: None,
Expand Down Expand Up @@ -1249,6 +1261,15 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
}
}

/// Fork tail used by GC
fn fork_tail(&self) -> Result<BlockHeight, Error> {
if let Some(fork_tail) = &self.fork_tail {
Ok(fork_tail.clone())
} else {
self.chain_store.fork_tail()
}
}

/// The "sync" head: last header we received from syncing.
fn sync_head(&self) -> Result<Tip, Error> {
if let Some(sync_head) = &self.sync_head {
Expand Down Expand Up @@ -1932,18 +1953,28 @@ impl<'a> ChainStoreUpdate<'a> {
pub fn reset_tail(&mut self) {
self.tail = None;
self.chunk_tail = None;
self.fork_tail = None;
}

pub fn update_tail(&mut self, height: BlockHeight) {
self.tail = Some(height);
let genesis_height = self.get_genesis_height();
let chunk_tail = self.chunk_tail().unwrap_or_else(|_| genesis_height);
// When fork tail is behind tail, it doesn't hurt to set it to tail for consistency.
if self.fork_tail.unwrap_or(genesis_height) < height {
self.fork_tail = Some(height);
}

let chunk_tail = self.chunk_tail().unwrap_or(genesis_height);
if chunk_tail == genesis_height {
// For consistency, Chunk Tail should be set if Tail is set
self.chunk_tail = Some(self.get_genesis_height());
}
}

pub fn update_fork_tail(&mut self, height: BlockHeight) {
self.fork_tail = Some(height);
}

pub fn update_chunk_tail(&mut self, height: BlockHeight) {
self.chunk_tail = Some(height);
}
Expand Down Expand Up @@ -2366,6 +2397,7 @@ impl<'a> ChainStoreUpdate<'a> {
Self::write_col_misc(&mut store_update, HEAD_KEY, &mut self.head)?;
Self::write_col_misc(&mut store_update, TAIL_KEY, &mut self.tail)?;
Self::write_col_misc(&mut store_update, CHUNK_TAIL_KEY, &mut self.chunk_tail)?;
Self::write_col_misc(&mut store_update, FORK_TAIL_KEY, &mut self.fork_tail)?;
Self::write_col_misc(&mut store_update, SYNC_HEAD_KEY, &mut self.sync_head)?;
Self::write_col_misc(&mut store_update, HEADER_HEAD_KEY, &mut self.header_head)?;
Self::write_col_misc(
Expand Down
21 changes: 18 additions & 3 deletions chain/chain/src/store_validator/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use near_primitives::utils::{get_block_shard_id, index_to_bytes};
use near_store::{
ColBlock, ColBlockHeader, ColBlockHeight, ColBlockInfo, ColBlockMisc, ColBlockPerHeight,
ColChunkExtra, ColChunkHashesByHeight, ColChunks, ColOutcomesByBlockHash, ColStateHeaders,
ColTransactionResult, DBCol, TrieChanges, TrieIterator, CHUNK_TAIL_KEY, HEADER_HEAD_KEY,
HEAD_KEY, NUM_COLS, SHOULD_COL_GC, TAIL_KEY,
ColTransactionResult, DBCol, TrieChanges, TrieIterator, CHUNK_TAIL_KEY, FORK_TAIL_KEY,
HEADER_HEAD_KEY, HEAD_KEY, NUM_COLS, SHOULD_COL_GC, TAIL_KEY,
};

use crate::StoreValidator;
Expand Down Expand Up @@ -107,6 +107,7 @@ macro_rules! unwrap_or_err_db {
pub(crate) fn head_tail_validity(sv: &mut StoreValidator) -> Result<(), StoreValidatorError> {
let mut tail = sv.config.genesis_height;
let mut chunk_tail = sv.config.genesis_height;
let mut fork_tail = sv.config.genesis_height;
let tail_db = unwrap_or_err!(
sv.store.get_ser::<BlockHeight>(ColBlockMisc, TAIL_KEY),
"Can't get Tail from storage"
Expand All @@ -115,13 +116,21 @@ pub(crate) fn head_tail_validity(sv: &mut StoreValidator) -> Result<(), StoreVal
sv.store.get_ser::<BlockHeight>(ColBlockMisc, CHUNK_TAIL_KEY),
"Can't get Chunk Tail from storage"
);
let fork_tail_db = unwrap_or_err!(
sv.store.get_ser::<BlockHeight>(ColBlockMisc, FORK_TAIL_KEY),
"Can't get Chunk Tail from storage"
);
if tail_db.is_none() && chunk_tail_db.is_some() || tail_db.is_some() && chunk_tail_db.is_none()
{
err!("Tail is {:?} and Chunk Tail is {:?}", tail_db, chunk_tail_db);
}
if tail_db.is_some() && chunk_tail_db.is_some() {
if tail_db.is_some() && fork_tail_db.is_none() {
err!("Tail is {:?} but fork tail is None", tail_db);
}
if tail_db.is_some() {
tail = tail_db.unwrap();
chunk_tail = chunk_tail_db.unwrap();
fork_tail = fork_tail_db.unwrap();
}
let head = unwrap_or_err_db!(
sv.store.get_ser::<Tip>(ColBlockMisc, HEAD_KEY),
Expand All @@ -141,6 +150,12 @@ pub(crate) fn head_tail_validity(sv: &mut StoreValidator) -> Result<(), StoreVal
if tail > head.height {
err!("tail > head.height, {:?} > {:?}", tail, head);
}
if tail > fork_tail {
err!("tail > fork_tail, {} > {}", tail, fork_tail);
}
if fork_tail > head.height {
err!("fork tail > head.height, {} > {:?}", fork_tail, head);
}
if head.height > header_head.height {
err!("head.height > header_head.height, {:?} > {:?}", tail, head);
}
Expand Down
73 changes: 65 additions & 8 deletions chain/client/tests/process_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,20 +966,25 @@ fn test_invalid_gas_price() {
}

#[test]
fn test_invalid_height() {
fn test_invalid_height_too_large() {
let mut env = TestEnv::new(ChainGenesis::test(), 1, 1);
let b1 = env.clients[0].produce_block(1).unwrap().unwrap();
let _ = env.clients[0].process_block(b1.clone(), Provenance::PRODUCED);
let signer = InMemoryValidatorSigner::from_seed("test0", KeyType::ED25519, "test0");
let b2 = Block::empty_with_height(&b1, std::u64::MAX, &signer);
let (_, tip) = env.clients[0].process_block(b2, Provenance::NONE);
match tip {
Err(e) => match e.kind() {
ErrorKind::InvalidBlockHeight => {}
_ => assert!(false, "wrong error: {}", e),
},
_ => assert!(false, "succeeded, tip: {:?}", tip),
let (_, res) = env.clients[0].process_block(b2, Provenance::NONE);
assert!(matches!(res.unwrap_err().kind(), ErrorKind::InvalidBlockHeight(_)));
}

#[test]
fn test_invalid_height_too_old() {
let mut env = TestEnv::new(ChainGenesis::test(), 1, 1);
let b1 = env.clients[0].produce_block(1).unwrap().unwrap();
for i in 2..100 {
env.produce_block(0, i);
}
let (_, res) = env.clients[0].process_block(b1, Provenance::NONE);
assert!(matches!(res.unwrap_err().kind(), ErrorKind::InvalidBlockHeight(_)));
}

#[test]
Expand Down Expand Up @@ -1014,6 +1019,11 @@ fn test_gc_with_epoch_length_common(epoch_length: NumBlocks) {
for i in 1..=epoch_length * (NUM_EPOCHS_TO_KEEP_STORE_DATA + 1) {
let block = env.clients[0].produce_block(i).unwrap().unwrap();
env.process_block(0, block.clone(), Provenance::PRODUCED);
assert!(
env.clients[0].chain.store().fork_tail().unwrap()
<= env.clients[0].chain.store().tail().unwrap()
);

blocks.push(block);
}
for i in 1..=epoch_length * (NUM_EPOCHS_TO_KEEP_STORE_DATA + 1) {
Expand Down Expand Up @@ -1233,6 +1243,53 @@ fn test_gc_after_state_sync() {
assert!(env.clients[1].chain.clear_data(tries, 2).is_ok());
}

#[test]
fn test_gc_fork_tail() {
let epoch_length = 101;
let mut genesis = Genesis::test(vec!["test0", "test1"], 1);
genesis.config.epoch_length = epoch_length;
let runtimes: Vec<Arc<dyn RuntimeAdapter>> = vec![
Arc::new(neard::NightshadeRuntime::new(
Path::new("."),
create_test_store(),
Arc::new(genesis.clone()),
vec![],
vec![],
)),
Arc::new(neard::NightshadeRuntime::new(
Path::new("."),
create_test_store(),
Arc::new(genesis.clone()),
vec![],
vec![],
)),
];
let mut chain_genesis = ChainGenesis::test();
chain_genesis.epoch_length = epoch_length;
let mut env = TestEnv::new_with_runtime(chain_genesis.clone(), 2, 1, runtimes);
let b1 = env.clients[0].produce_block(1).unwrap().unwrap();
for i in 0..2 {
env.process_block(i, b1.clone(), Provenance::NONE);
}
// create 100 forks
for i in 2..102 {
let block = env.clients[0].produce_block(i).unwrap().unwrap();
env.process_block(1, block, Provenance::NONE);
}
for i in 102..epoch_length * NUM_EPOCHS_TO_KEEP_STORE_DATA + 5 {
let block = env.clients[0].produce_block(i).unwrap().unwrap();
for j in 0..2 {
env.process_block(j, block.clone(), Provenance::NONE);
}
}
let head = env.clients[1].chain.head().unwrap();
assert!(
env.clients[1].runtime_adapter.get_gc_stop_height(&head.last_block_hash).unwrap()
> epoch_length
);
assert_eq!(env.clients[1].chain.store().fork_tail().unwrap(), 3);
}

#[test]
fn test_tx_forwarding() {
let mut chain_genesis = ChainGenesis::test();
Expand Down
1 change: 1 addition & 0 deletions core/store/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ lazy_static! {
pub const HEAD_KEY: &[u8; 4] = b"HEAD";
pub const TAIL_KEY: &[u8; 4] = b"TAIL";
pub const CHUNK_TAIL_KEY: &[u8; 10] = b"CHUNK_TAIL";
pub const FORK_TAIL_KEY: &[u8; 9] = b"FORK_TAIL";
pub const SYNC_HEAD_KEY: &[u8; 9] = b"SYNC_HEAD";
pub const HEADER_HEAD_KEY: &[u8; 11] = b"HEADER_HEAD";
pub const LATEST_KNOWN_KEY: &[u8; 12] = b"LATEST_KNOWN";
Expand Down
4 changes: 2 additions & 2 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use cached::{Cached, SizedCache};

pub use db::DBCol::{self, *};
pub use db::{
CHUNK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY, LATEST_KNOWN_KEY,
NUM_COLS, SHOULD_COL_GC, SKIP_COL_GC, SYNC_HEAD_KEY, TAIL_KEY,
CHUNK_TAIL_KEY, FORK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY,
LATEST_KNOWN_KEY, NUM_COLS, SHOULD_COL_GC, SKIP_COL_GC, SYNC_HEAD_KEY, TAIL_KEY,
};
use near_crypto::PublicKey;
use near_primitives::account::{AccessKey, Account};
Expand Down

0 comments on commit 8be8a00

Please sign in to comment.