Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Keep all enacted blocks notify in order #8524

Merged
merged 17 commits into from
May 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ethcore/private-tx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ use ethcore::executed::{Executed};
use transaction::{SignedTransaction, Transaction, Action, UnverifiedTransaction};
use ethcore::{contract_address as ethcore_contract_address};
use ethcore::client::{
Client, ChainNotify, ChainMessageType, ClientIoMessage, BlockId, CallContract
Client, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage, BlockId, CallContract
};
use ethcore::account_provider::AccountProvider;
use ethcore::miner::{self, Miner, MinerService};
Expand Down Expand Up @@ -668,7 +668,7 @@ fn find_account_password(passwords: &Vec<String>, account_provider: &AccountProv
}

impl ChainNotify for Provider {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
if !imported.is_empty() {
trace!("New blocks imported, try to prune the queue");
if let Err(err) = self.process_queue() {
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/blockchain/import_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use ethereum_types::H256;
use blockchain::block_info::{BlockInfo, BlockLocation};

/// Import route for newly inserted block.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct ImportRoute {
/// Blocks that were invalidated by new block.
pub retracted: Vec<H256>,
Expand Down
88 changes: 86 additions & 2 deletions ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
use bytes::Bytes;
use ethereum_types::H256;
use transaction::UnverifiedTransaction;
use blockchain::ImportRoute;
use std::time::Duration;
use std::collections::HashMap;

/// Messages to broadcast via chain
pub enum ChainMessageType {
Expand All @@ -29,15 +31,97 @@ pub enum ChainMessageType {
SignedPrivateTransaction(Vec<u8>),
}

/// Route type to indicate whether it is enacted or retracted.
#[derive(Clone)]
pub enum ChainRouteType {
/// Enacted block
Enacted,
/// Retracted block
Retracted
}

/// A complete chain enacted retracted route.
#[derive(Default, Clone)]
pub struct ChainRoute {
route: Vec<(H256, ChainRouteType)>,
enacted: Vec<H256>,
retracted: Vec<H256>,
}

impl<'a> From<&'a [ImportRoute]> for ChainRoute {
fn from(import_results: &'a [ImportRoute]) -> ChainRoute {
ChainRoute::new(import_results.iter().flat_map(|route| {
route.retracted.iter().map(|h| (*h, ChainRouteType::Retracted))
.chain(route.enacted.iter().map(|h| (*h, ChainRouteType::Enacted)))
}).collect())
}
}

impl ChainRoute {
/// Create a new ChainRoute based on block hash and route type pairs.
pub fn new(route: Vec<(H256, ChainRouteType)>) -> Self {
let (enacted, retracted) = Self::to_enacted_retracted(&route);

Self { route, enacted, retracted }
}

/// Gather all non-duplicate enacted and retracted blocks.
fn to_enacted_retracted(route: &[(H256, ChainRouteType)]) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}

// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = route.iter().fold(HashMap::new(), |mut map, route| {
match &route.1 {
&ChainRouteType::Enacted => {
map.insert(route.0, true);
},
&ChainRouteType::Retracted => {
map.insert(route.0, false);
},
}
map
});

// Split to enacted retracted (using hashmap value)
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
// And convert tuples to keys
(map_to_vec(enacted), map_to_vec(retracted))
}

/// Consume route and return the enacted retracted form.
pub fn into_enacted_retracted(self) -> (Vec<H256>, Vec<H256>) {
(self.enacted, self.retracted)
}

/// All non-duplicate enacted blocks.
pub fn enacted(&self) -> &[H256] {
&self.enacted
}

/// All non-duplicate retracted blocks.
pub fn retracted(&self) -> &[H256] {
&self.retracted
}

/// All blocks in the route.
pub fn route(&self) -> &[(H256, ChainRouteType)] {
&self.route
}
}

/// Represents what has to be handled by actor listening to chain events
pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks.
fn new_blocks(
&self,
_imported: Vec<H256>,
_invalid: Vec<H256>,
_enacted: Vec<H256>,
_retracted: Vec<H256>,
_route: ChainRoute,
_sealed: Vec<H256>,
// Block bytes.
_proposed: Vec<Bytes>,
Expand Down
49 changes: 10 additions & 39 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{HashSet, HashMap, BTreeMap, BTreeSet, VecDeque};
use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque};
use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
Expand Down Expand Up @@ -45,7 +45,7 @@ use client::{
use client::{
BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient,
TraceFilter, CallAnalytics, BlockImportError, Mode,
ChainNotify, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType
ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType
};
use encoded;
use engines::{EthEngine, EpochTransition};
Expand Down Expand Up @@ -245,32 +245,6 @@ impl Importer {
})
}

fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}

// In ImportRoute we get all the blocks that have been enacted and retracted by single insert.
// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = import_results.iter().fold(HashMap::new(), |mut map, route| {
for hash in &route.enacted {
map.insert(hash.clone(), true);
}
for hash in &route.retracted {
map.insert(hash.clone(), false);
}
map
});

// Split to enacted retracted (using hashmap value)
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
// And convert tuples to keys
(map_to_vec(enacted), map_to_vec(retracted))
}

/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, client: &Client) -> usize {

Expand Down Expand Up @@ -336,18 +310,17 @@ impl Importer {

{
if !imported_blocks.is_empty() && is_empty {
let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);
let route = ChainRoute::from(import_results.as_ref());

if is_empty {
self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, &enacted, &retracted, false);
self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, route.enacted(), route.retracted(), false);
}

client.notify(|notify| {
notify.new_blocks(
imported_blocks.clone(),
invalid_blocks.clone(),
enacted.clone(),
retracted.clone(),
route.clone(),
Vec::new(),
proposed_blocks.clone(),
duration,
Expand Down Expand Up @@ -1421,7 +1394,7 @@ impl ImportBlock for Client {
}

fn import_block_with_receipts(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError> {
let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?;
let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?;
{
// check block order
if self.chain.read().is_known(&header.hash()) {
Expand Down Expand Up @@ -2155,14 +2128,13 @@ impl ImportSealedBlock for Client {
self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
route
};
let (enacted, retracted) = self.importer.calculate_enacted_retracted(&[route]);
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted, true);
let route = ChainRoute::from([route].as_ref());
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), true);
self.notify(|notify| {
notify.new_blocks(
vec![h.clone()],
vec![],
enacted.clone(),
retracted.clone(),
route.clone(),
vec![h.clone()],
vec![],
start.elapsed(),
Expand All @@ -2180,8 +2152,7 @@ impl BroadcastProposalBlock for Client {
notify.new_blocks(
vec![],
vec![],
vec![],
vec![],
ChainRoute::default(),
vec![],
vec![block.rlp_bytes()],
DURATION_ZERO,
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use self::error::Error;
pub use self::evm_test_client::{EvmTestClient, EvmTestError, TransactResult};
pub use self::io_message::ClientIoMessage;
pub use self::test_client::{TestBlockChainClient, EachBlockWith};
pub use self::chain_notify::{ChainNotify, ChainMessageType};
pub use self::chain_notify::{ChainNotify, ChainRoute, ChainRouteType, ChainMessageType};
pub use self::traits::{
Nonce, Balance, ChainInfo, BlockInfo, ReopenBlock, PrepareOpenBlock, CallContract, TransactionInfo, RegistryInfo, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock,
StateOrBlock, StateClient, Call, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter
Expand Down
10 changes: 4 additions & 6 deletions ethcore/src/snapshot/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Watcher for snapshot-related chain events.

use parking_lot::Mutex;
use client::{BlockInfo, Client, ChainNotify, ClientIoMessage};
use client::{BlockInfo, Client, ChainNotify, ChainRoute, ClientIoMessage};
use ids::BlockId;

use io::IoChannel;
Expand Down Expand Up @@ -103,8 +103,7 @@ impl ChainNotify for Watcher {
&self,
imported: Vec<H256>,
_: Vec<H256>,
_: Vec<H256>,
_: Vec<H256>,
_: ChainRoute,
_: Vec<H256>,
_: Vec<Bytes>,
_duration: Duration)
Expand All @@ -131,7 +130,7 @@ impl ChainNotify for Watcher {
mod tests {
use super::{Broadcast, Oracle, Watcher};

use client::ChainNotify;
use client::{ChainNotify, ChainRoute};

use ethereum_types::{H256, U256};

Expand Down Expand Up @@ -174,8 +173,7 @@ mod tests {
watcher.new_blocks(
hashes,
vec![],
vec![],
vec![],
ChainRoute::default(),
vec![],
vec![],
DURATION_ZERO,
Expand Down
9 changes: 4 additions & 5 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, Protocol
use ethereum_types::{H256, H512, U256};
use io::{TimerToken};
use ethcore::ethstore::ethkey::Secret;
use ethcore::client::{BlockChainClient, ChainNotify, ChainMessageType};
use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageType};
use ethcore::snapshot::SnapshotService;
use ethcore::header::BlockNumber;
use sync_io::NetSyncIo;
Expand Down Expand Up @@ -409,8 +409,7 @@ impl ChainNotify for EthSync {
fn new_blocks(&self,
imported: Vec<H256>,
invalid: Vec<H256>,
enacted: Vec<H256>,
retracted: Vec<H256>,
route: ChainRoute,
sealed: Vec<H256>,
proposed: Vec<Bytes>,
_duration: Duration)
Expand All @@ -424,8 +423,8 @@ impl ChainNotify for EthSync {
&mut sync_io,
&imported,
&invalid,
&enacted,
&retracted,
route.enacted(),
route.retracted(),
&sealed,
&proposed);
});
Expand Down
7 changes: 4 additions & 3 deletions ethcore/sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use bytes::Bytes;
use network::{self, PeerId, ProtocolId, PacketId, SessionInfo};
use tests::snapshot::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient,
ClientConfig, ChainNotify, ChainMessageType, ClientIoMessage};
ClientConfig, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage};
use ethcore::header::BlockNumber;
use ethcore::snapshot::SnapshotService;
use ethcore::spec::Spec;
Expand Down Expand Up @@ -535,12 +535,13 @@ impl ChainNotify for EthPeer<EthcoreClient> {
fn new_blocks(&self,
imported: Vec<H256>,
invalid: Vec<H256>,
enacted: Vec<H256>,
retracted: Vec<H256>,
route: ChainRoute,
sealed: Vec<H256>,
proposed: Vec<Bytes>,
_duration: Duration)
{
let (enacted, retracted) = route.into_enacted_retracted();

self.new_blocks_queue.write().push_back(NewBlockMessage {
imported,
invalid,
Expand Down
4 changes: 2 additions & 2 deletions parity/informant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::{Instant, Duration};
use atty;
use ethcore::client::{
BlockId, BlockChainClient, ChainInfo, BlockInfo, BlockChainInfo,
BlockQueueInfo, ChainNotify, ClientReport, Client, ClientIoMessage
BlockQueueInfo, ChainNotify, ChainRoute, ClientReport, Client, ClientIoMessage
};
use ethcore::header::BlockNumber;
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
Expand Down Expand Up @@ -351,7 +351,7 @@ impl<T: InformantData> Informant<T> {
}

impl ChainNotify for Informant<FullNodeInformantData> {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, duration: Duration) {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, duration: Duration) {
let mut last_import = self.last_import.lock();
let client = &self.target.client;

Expand Down
Loading