Skip to content

Commit

Permalink
Validator proposals fix (#1154)
Browse files Browse the repository at this point in the history
* Fix issue that validator proposals were not propagated to Block::produce

* Speed up sync_state_stake_change and remove extra print
  • Loading branch information
ilblackdragon authored Aug 9, 2019
1 parent b40c2b3 commit d54d962
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 16 additions & 3 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use log::{debug, info};

use near_primitives::hash::CryptoHash;
use near_primitives::transaction::{ReceiptTransaction, TransactionResult};
use near_primitives::types::{BlockIndex, MerkleHash, ShardId};
use near_primitives::types::{BlockIndex, MerkleHash, ShardId, ValidatorStake};
use near_store::Store;

use crate::error::{Error, ErrorKind};
Expand Down Expand Up @@ -157,6 +157,7 @@ impl Chain {
.map_err(|err| ErrorKind::Other(err.to_string()))?;
store_update
.save_post_state_root(&genesis.hash(), &genesis.header.prev_state_root);
store_update.save_post_validator_proposals(&genesis.hash(), vec![]);
store_update.save_block_header(genesis.header.clone());
store_update.save_block(genesis.clone());
store_update.save_receipt(&genesis.header.hash(), vec![]);
Expand Down Expand Up @@ -532,6 +533,14 @@ impl Chain {
self.store.get_transaction_result(hash)
}

#[inline]
pub fn get_post_validator_proposals(
&mut self,
hash: &CryptoHash,
) -> Result<&Vec<ValidatorStake>, Error> {
self.store.get_post_validator_proposals(hash)
}

/// Returns underlying ChainStore.
#[inline]
pub fn store(&self) -> &ChainStore {
Expand Down Expand Up @@ -671,6 +680,11 @@ impl<'a> ChainUpdate<'a> {
)
.map_err(|e| ErrorKind::Other(e.to_string()))?;

// Save state root after applying transactions.
self.chain_store_update.save_post_state_root(&block.hash(), &state_root);
self.chain_store_update
.save_post_validator_proposals(&block.hash(), validator_proposals.clone());

// If block checks out, record validator proposals for given block.
self.runtime_adapter
.add_validator_proposals(
Expand All @@ -684,8 +698,7 @@ impl<'a> ChainUpdate<'a> {
.map_err(|err| ErrorKind::Other(err.to_string()))?;

self.chain_store_update.save_trie_changes(trie_changes);
// Save state root after applying transactions.
self.chain_store_update.save_post_state_root(&block.hash(), &state_root);

// Save resulting receipts.
// TODO: currently only taking into account one shard.
self.chain_store_update
Expand Down
52 changes: 51 additions & 1 deletion chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use log::debug;

use near_primitives::hash::CryptoHash;
use near_primitives::transaction::{ReceiptTransaction, TransactionResult};
use near_primitives::types::{BlockIndex, MerkleHash};
use near_primitives::types::{BlockIndex, MerkleHash, ValidatorStake};
use near_primitives::utils::index_to_bytes;
use near_store::{
read_with_cache, Store, StoreUpdate, WrappedTrieChanges, COL_BLOCK, COL_BLOCK_HEADER,
COL_BLOCK_INDEX, COL_BLOCK_MISC, COL_RECEIPTS, COL_STATE_REF, COL_TRANSACTION_RESULT,
COL_VALIDATOR_PROPOSALS,
};

use crate::error::{Error, ErrorKind};
Expand Down Expand Up @@ -47,6 +48,11 @@ pub trait ChainStoreAccess {
fn get_previous_header(&mut self, header: &BlockHeader) -> Result<&BlockHeader, Error>;
/// Get state root hash after applying header with given hash.
fn get_post_state_root(&mut self, h: &CryptoHash) -> Result<&MerkleHash, Error>;
/// Get validator proposals.
fn get_post_validator_proposals(
&mut self,
h: &CryptoHash,
) -> Result<&Vec<ValidatorStake>, Error>;
/// Get block header.
fn get_block_header(&mut self, h: &CryptoHash) -> Result<&BlockHeader, Error>;
/// Returns hash of the block on the main chain for given height.
Expand All @@ -66,6 +72,8 @@ pub struct ChainStore {
blocks: SizedCache<Vec<u8>, Block>,
/// Cache with state roots.
post_state_roots: SizedCache<Vec<u8>, MerkleHash>,
/// Cache with validator proposals.
post_validator_proposals: SizedCache<Vec<u8>, Vec<ValidatorStake>>,
// Cache with index to hash on the main chain.
// block_index: SizedCache<Vec<u8>, CryptoHash>,
/// Cache with receipts.
Expand All @@ -89,6 +97,7 @@ impl ChainStore {
blocks: SizedCache::with_size(CACHE_SIZE),
headers: SizedCache::with_size(CACHE_SIZE),
post_state_roots: SizedCache::with_size(CACHE_SIZE),
post_validator_proposals: SizedCache::with_size(CACHE_SIZE),
// block_index: SizedCache::with_size(CACHE_SIZE),
receipts: SizedCache::with_size(CACHE_SIZE),
transaction_results: SizedCache::with_size(CACHE_SIZE),
Expand Down Expand Up @@ -155,6 +164,21 @@ impl ChainStoreAccess for ChainStore {
)
}

fn get_post_validator_proposals(
&mut self,
h: &CryptoHash,
) -> Result<&Vec<ValidatorStake>, Error> {
option_to_not_found(
read_with_cache(
&*self.store,
COL_VALIDATOR_PROPOSALS,
&mut self.post_validator_proposals,
h.as_ref(),
),
&format!("VALIDATOR ROOT: {}", h),
)
}

/// Get block header.
fn get_block_header(&mut self, h: &CryptoHash) -> Result<&BlockHeader, Error> {
option_to_not_found(
Expand Down Expand Up @@ -211,6 +235,7 @@ pub struct ChainStoreUpdate<'a, T> {
deleted_blocks: HashSet<CryptoHash>,
headers: HashMap<CryptoHash, BlockHeader>,
post_state_roots: HashMap<CryptoHash, MerkleHash>,
post_validator_proposals: HashMap<CryptoHash, Vec<ValidatorStake>>,
block_index: HashMap<BlockIndex, Option<CryptoHash>>,
receipts: HashMap<CryptoHash, Vec<ReceiptTransaction>>,
transaction_results: HashMap<CryptoHash, TransactionResult>,
Expand All @@ -231,6 +256,7 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> {
headers: HashMap::default(),
block_index: HashMap::default(),
post_state_roots: HashMap::default(),
post_validator_proposals: HashMap::default(),
receipts: HashMap::default(),
transaction_results: HashMap::default(),
head: None,
Expand Down Expand Up @@ -315,6 +341,17 @@ impl<'a, T: ChainStoreAccess> ChainStoreAccess for ChainStoreUpdate<'a, T> {
}
}

fn get_post_validator_proposals(
&mut self,
hash: &CryptoHash,
) -> Result<&Vec<ValidatorStake>, Error> {
if let Some(post_validator_proposals) = self.post_validator_proposals.get(hash) {
Ok(post_validator_proposals)
} else {
self.chain_store.get_post_validator_proposals(hash)
}
}

/// Get block header.
fn get_block_header(&mut self, hash: &CryptoHash) -> Result<&BlockHeader, Error> {
if let Some(header) = self.headers.get(hash) {
Expand Down Expand Up @@ -410,6 +447,14 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> {
self.post_state_roots.insert(*hash, *state_root);
}

pub fn save_post_validator_proposals(
&mut self,
hash: &CryptoHash,
validator_proposals: Vec<ValidatorStake>,
) {
self.post_validator_proposals.insert(*hash, validator_proposals);
}

pub fn delete_block(&mut self, hash: &CryptoHash) {
self.deleted_blocks.insert(*hash);
}
Expand Down Expand Up @@ -497,6 +542,11 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> {
.set_ser(COL_STATE_REF, hash.as_ref(), &state_root)
.map_err::<Error, _>(|e| e.into())?;
}
for (hash, validator_proposals) in self.post_validator_proposals.drain() {
store_update
.set_ser(COL_VALIDATOR_PROPOSALS, hash.as_ref(), &validator_proposals)
.map_err::<Error, _>(|e| e.into())?;
}
for (height, hash) in self.block_index.drain() {
if let Some(hash) = hash {
store_update
Expand Down
22 changes: 14 additions & 8 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use actix::{
WrapFuture,
};
use chrono::{DateTime, Utc};
use futures::Future;
use log::{debug, error, info, warn};

use near_chain::{
Expand All @@ -27,6 +28,7 @@ use near_network::{
use near_pool::TransactionPool;
use near_primitives::crypto::signature::{verify, Signature};
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::rpc::ValidatorInfo;
use near_primitives::transaction::{ReceiptTransaction, SignedTransaction};
use near_primitives::types::{AccountId, BlockIndex, ShardId};
use near_primitives::unwrap_or_return;
Expand All @@ -39,8 +41,6 @@ use crate::types::{
BlockProducer, ClientConfig, Error, ShardSyncStatus, Status, StatusSyncInfo, SyncStatus,
};
use crate::{sync, StatusResponse};
use futures::Future;
use near_primitives::rpc::ValidatorInfo;

pub struct ClientActor {
config: ClientConfig,
Expand Down Expand Up @@ -675,6 +675,10 @@ impl ClientActor {
return Ok(());
}

// Get validator proposals.
let validator_proposals =
self.chain.get_post_validator_proposals(&head.last_block_hash)?.clone();

let prev_header = self.chain.get_block_header(&head.last_block_hash)?;

// Take transactions from the pool.
Expand All @@ -693,7 +697,7 @@ impl ClientActor {
epoch_hash,
transactions,
self.approvals.drain().collect(),
vec![],
validator_proposals,
block_producer.signer.clone(),
);

Expand Down Expand Up @@ -744,9 +748,9 @@ impl ClientActor {
if self.sync_status.is_syncing() {
// While syncing, we may receive blocks that are older or from next epochs.
// This leads to Old Block or EpochOutOfBounds errors.
info!(target: "client", "Error on receival of block: {}", err);
// info!(target: "client", "Error on receival of block: {}", err);
} else {
error!(target: "client", "Error on receival of block: {}", err);
// error!(target: "client", "Error on receival of block: {}", err);
}
NetworkClientResponses::NoResponse
}
Expand Down Expand Up @@ -778,7 +782,7 @@ impl ClientActor {
}
// Some error that worth surfacing.
Err(ref e) if e.is_error() => {
error!(target: "client", "Error on receival of header: {}", e);
// error!(target: "client", "Error on receival of header: {}", e);
return NetworkClientResponses::NoResponse;
}
// Got an error when trying to process the block header, but it's not due to
Expand Down Expand Up @@ -958,9 +962,11 @@ impl ClientActor {
highest_height,
&self.network_info.most_weight_peers
));
// Only body / state sync if header height is latest.
// Only body / state sync if header height is close to the latest.
let header_head = unwrap_or_run_later!(self.chain.header_head());
if header_head.height == highest_height {
if highest_height <= self.config.block_header_fetch_horizon
|| header_head.height >= highest_height - self.config.block_header_fetch_horizon
{
// Sync state if already running sync state or if block sync is too far.
let sync_state = match self.sync_status {
SyncStatus::StateSync(_, _) => true,
Expand Down
8 changes: 6 additions & 2 deletions chain/client/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use near_primitives::serialize::to_base;
use near_telemetry::{telemetry, TelemetryActor};

use crate::types::{BlockProducer, ShardSyncStatus, SyncStatus};
use std::cmp::min;

/// A helper that prints information about current chain and reports to telemetry.
pub struct InfoHelper {
Expand Down Expand Up @@ -134,8 +135,11 @@ fn display_sync_status(sync_status: &SyncStatus, head: &Tip) -> String {
SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height),
SyncStatus::NoSync => format!("#{:>8} {}", head.height, head.last_block_hash),
SyncStatus::HeaderSync { current_height, highest_height } => {
let percent =
if *highest_height == 0 { 0 } else { current_height * 100 / highest_height };
let percent = if *highest_height == 0 {
0
} else {
min(current_height, highest_height) * 100 / highest_height
};
format!("#{:>8} Downloading headers {}%", head.height, percent)
}
SyncStatus::BodySync { current_height, highest_height } => {
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use near_primitives::unwrap_or_return;
use crate::types::{ShardSyncStatus, SyncStatus};

/// Maximum number of block headers send over the network.
pub const MAX_BLOCK_HEADERS: u64 = 256;
pub const MAX_BLOCK_HEADERS: u64 = 512;

const BLOCK_HEADER_PROGRESS_TIMEOUT: i64 = 2;

Expand Down
4 changes: 4 additions & 0 deletions chain/client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ pub struct ClientConfig {
pub block_fetch_horizon: BlockIndex,
/// Horizon to step from the latest block when fetching state.
pub state_fetch_horizon: BlockIndex,
/// Behind this horizon header fetch kicks in.
pub block_header_fetch_horizon: BlockIndex,
}

impl ClientConfig {
Expand All @@ -124,6 +126,7 @@ impl ClientConfig {
announce_account_horizon: 5,
block_fetch_horizon: 50,
state_fetch_horizon: 5,
block_header_fetch_horizon: 50,
}
}
}
Expand All @@ -150,6 +153,7 @@ impl ClientConfig {
announce_account_horizon: 5,
block_fetch_horizon: 50,
state_fetch_horizon: 5,
block_header_fetch_horizon: 50,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ pub const COL_PEERS: Option<u32> = Some(8);
pub const COL_PROPOSALS: Option<u32> = Some(9);
pub const COL_VALIDATORS: Option<u32> = Some(10);
pub const COL_LAST_EPOCH_PROPOSALS: Option<u32> = Some(11);
const NUM_COLS: u32 = 12;
pub const COL_VALIDATOR_PROPOSALS: Option<u32> = Some(12);
const NUM_COLS: u32 = 13;

pub struct Store {
storage: Arc<dyn KeyValueDB>,
Expand Down
1 change: 1 addition & 0 deletions near/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ near-telemetry = { path = "../chain/telemetry" }

[dev-dependencies]
tempdir = "0.3"
tokio = "0.1"
1 change: 1 addition & 0 deletions near/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ impl NearConfig {
// TODO(1047): this should be adjusted depending on the speed of sync of state.
block_fetch_horizon: 50,
state_fetch_horizon: 5,
block_header_fetch_horizon: 50,
},
network_config: NetworkConfig {
public_key: network_key_pair.public_key,
Expand Down
Loading

0 comments on commit d54d962

Please sign in to comment.