diff --git a/bin/node/bench/src/construct.rs b/bin/node/bench/src/construct.rs index ca1a1c18f9ea9..1532e02bd3ef6 100644 --- a/bin/node/bench/src/construct.rs +++ b/bin/node/bench/src/construct.rs @@ -30,8 +30,8 @@ use std::{borrow::Cow, collections::HashMap, pin::Pin, sync::Arc}; use node_primitives::Block; use node_testing::bench::{BenchDb, BlockType, DatabaseType, KeyTypes, Profile}; use sc_transaction_pool_api::{ - ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, - TransactionSource, TransactionStatusStreamFor, TxHash, + ImportNotificationStream, PoolFuture, PoolStatus, TransactionFor, TransactionSource, + TransactionStatusStreamFor, TxHash, }; use sp_consensus::{Environment, Proposer}; use sp_inherents::InherentDataProvider; @@ -216,19 +216,6 @@ impl sc_transaction_pool_api::InPoolTransaction for PoolTransaction { #[derive(Clone, Debug)] pub struct Transactions(Vec>); -pub struct TransactionsIterator(std::vec::IntoIter>); - -impl Iterator for TransactionsIterator { - type Item = Arc; - - fn next(&mut self) -> Option { - self.0.next() - } -} - -impl ReadyTransactions for TransactionsIterator { - fn report_invalid(&mut self, _tx: &Self::Item) {} -} impl sc_transaction_pool_api::TransactionPool for Transactions { type Block = Block; @@ -270,17 +257,16 @@ impl sc_transaction_pool_api::TransactionPool for Transactions { _at: NumberFor, ) -> Pin< Box< - dyn Future< - Output = Box> + Send>, - > + Send, + dyn Future> + Send>> + + Send, >, > { - let iter: Box> + Send> = - Box::new(TransactionsIterator(self.0.clone().into_iter())); + let iter: Box> + Send> = + Box::new(self.0.clone().into_iter()); Box::pin(futures::future::ready(iter)) } - fn ready(&self) -> Box> + Send> { + fn ready(&self) -> Box> + Send> { unimplemented!() } diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index bbee60ae98dcf..e38bb11688f8b 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -344,7 +344,7 @@ where let mut t2 = futures_timer::Delay::new(deadline.saturating_duration_since((self.now)()) / 8).fuse(); - let mut pending_iterator = select! { + let pending_iterator = select! { res = t1 => res, _ = t2 => { log::warn!( @@ -363,7 +363,7 @@ where let mut transaction_pushed = false; let mut hit_block_size_limit = false; - while let Some(pending_tx) = pending_iterator.next() { + for pending_tx in pending_iterator { if (self.now)() > deadline { debug!( "Consensus deadline reached when pushing block transactions, \ @@ -378,7 +378,6 @@ where let block_size = block_builder.estimate_block_size(self.include_proof_in_block_size_estimation); if block_size + pending_tx_data.encoded_size() > block_size_limit { - pending_iterator.report_invalid(&pending_tx); if skipped < MAX_SKIPPED_TRANSACTIONS { skipped += 1; debug!( @@ -401,7 +400,6 @@ where debug!("[{:?}] Pushed to the block.", pending_tx_hash); }, Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => { - pending_iterator.report_invalid(&pending_tx); if skipped < MAX_SKIPPED_TRANSACTIONS { skipped += 1; debug!( @@ -414,7 +412,6 @@ where } }, Err(e) if skipped > 0 => { - pending_iterator.report_invalid(&pending_tx); trace!( "[{:?}] Ignoring invalid transaction when skipping: {}", pending_tx_hash, @@ -422,7 +419,6 @@ where ); }, Err(e) => { - pending_iterator.report_invalid(&pending_tx); debug!("[{:?}] Invalid transaction: {}", pending_tx_hash, e); unqueue_invalid.push(pending_tx_hash); }, diff --git a/client/transaction-pool/api/src/lib.rs b/client/transaction-pool/api/src/lib.rs index 8cfb538c1c6be..a907097a5d8db 100644 --- a/client/transaction-pool/api/src/lib.rs +++ b/client/transaction-pool/api/src/lib.rs @@ -224,14 +224,13 @@ pub trait TransactionPool: Send + Sync { at: NumberFor, ) -> Pin< Box< - dyn Future< - Output = Box> + Send>, - > + Send, + dyn Future> + Send>> + + Send, >, >; /// Get an iterator for ready transactions ordered by priority. - fn ready(&self) -> Box> + Send>; + fn ready(&self) -> Box> + Send>; // *** Block production /// Remove transactions identified by given hashes (and dependent transactions) from the pool. @@ -256,27 +255,6 @@ pub trait TransactionPool: Send + Sync { fn ready_transaction(&self, hash: &TxHash) -> Option>; } -/// An iterator of ready transactions. -/// -/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting -/// last-returned element as invalid. -/// -/// The implementation is then allowed, for performance reasons, to change the elements -/// returned next, by e.g. skipping elements that are known to depend on the reported -/// transaction, which yields them invalid as well. -pub trait ReadyTransactions: Iterator { - /// Report given transaction as invalid. - /// - /// This might affect subsequent elements returned by the iterator, so dependent transactions - /// are skipped for performance reasons. - fn report_invalid(&mut self, _tx: &Self::Item); -} - -/// A no-op implementation for an empty iterator. -impl ReadyTransactions for std::iter::Empty { - fn report_invalid(&mut self, _tx: &T) {} -} - /// Events that the transaction pool listens for. pub enum ChainEvent { /// New best block have been added to the chain diff --git a/client/transaction-pool/graph/Cargo.toml b/client/transaction-pool/graph/Cargo.toml new file mode 100644 index 0000000000000..b49cadc51c33c --- /dev/null +++ b/client/transaction-pool/graph/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "sc-transaction-graph" +version = "4.0.0-dev" +authors = ["Parity Technologies "] +edition = "2018" +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" +homepage = "https://substrate.dev" +repository = "https://github.com/paritytech/substrate/" +description = "Generic Transaction Pool" +readme = "README.md" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[dependencies] +derive_more = "0.99.2" +thiserror = "1.0.21" +futures = "0.3.9" +log = "0.4.8" +parking_lot = "0.11.1" +serde = { version = "1.0.101", features = ["derive"] } +sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } +sc-utils = { version = "4.0.0-dev", path = "../../utils" } +sp-core = { version = "4.0.0-dev", path = "../../../primitives/core" } +sp-runtime = { version = "4.0.0-dev", path = "../../../primitives/runtime" } +sp-transaction-pool = { version = "4.0.0-dev", path = "../../../primitives/transaction-pool" } +parity-util-mem = { version = "0.10.0", default-features = false, features = ["primitive-types"] } +linked-hash-map = "0.5.4" +retain_mut = "0.1.3" + +[dev-dependencies] +assert_matches = "1.3.0" +codec = { package = "parity-scale-codec", version = "2.0.0" } +substrate-test-runtime = { version = "2.0.0", path = "../../../test-utils/runtime" } +criterion = "0.3" + +[[bench]] +name = "basics" +harness = false diff --git a/client/transaction-pool/src/graph/base_pool.rs b/client/transaction-pool/src/graph/base_pool.rs index 2c8becdfb2f0b..890a87e82929d 100644 --- a/client/transaction-pool/src/graph/base_pool.rs +++ b/client/transaction-pool/src/graph/base_pool.rs @@ -36,7 +36,7 @@ use sp_runtime::{ use super::{ future::{FutureTransactions, WaitingTransaction}, - ready::{BestIterator, ReadyTransactions}, + ready::ReadyTransactions, }; /// Successful import result. @@ -355,7 +355,7 @@ impl BasePool BestIterator { + pub fn ready(&self) -> impl Iterator>> { self.ready.get() } diff --git a/client/transaction-pool/src/graph/ready.rs b/client/transaction-pool/src/graph/ready.rs index 99a034689ccd0..03689aeb32e6d 100644 --- a/client/transaction-pool/src/graph/ready.rs +++ b/client/transaction-pool/src/graph/ready.rs @@ -23,7 +23,7 @@ use std::{ sync::Arc, }; -use log::{debug, trace}; +use log::trace; use sc_transaction_pool_api::error; use serde::Serialize; use sp_runtime::{traits::Member, transaction_validity::TransactionTag as Tag}; @@ -156,16 +156,11 @@ impl ReadyTransactions { /// - transactions that are valid for a shorter time go first /// 4. Lastly we sort by the time in the queue /// - transactions that are longer in the queue go first - /// - /// The iterator is providing a way to report transactions that the receiver considers invalid. - /// In such case the entire subgraph of transactions that depend on the reported one will be - /// skipped. - pub fn get(&self) -> BestIterator { + pub fn get(&self) -> impl Iterator>> { BestIterator { all: self.ready.clone(), best: self.best.clone(), awaiting: Default::default(), - invalid: Default::default(), } } @@ -487,7 +482,6 @@ pub struct BestIterator { all: ReadOnlyTrackedMap>, awaiting: HashMap)>, best: BTreeSet>, - invalid: HashSet, } impl BestIterator { @@ -504,34 +498,6 @@ impl BestIterator { } } -impl sc_transaction_pool_api::ReadyTransactions - for BestIterator -{ - fn report_invalid(&mut self, tx: &Self::Item) { - BestIterator::report_invalid(self, tx) - } -} - -impl BestIterator { - /// Report given transaction as invalid. - /// - /// As a consequence, all values that depend on the invalid one will be skipped. - /// When given transaction is not in the pool it has no effect. - /// When invoked on a fully drained iterator it has no effect either. - pub fn report_invalid(&mut self, tx: &Arc>) { - if let Some(to_report) = self.all.read().get(&tx.hash) { - debug!( - target: "txpool", - "[{:?}] Reported as invalid. Will skip sub-chains while iterating.", - to_report.transaction.transaction.hash - ); - for hash in &to_report.unlocks { - self.invalid.insert(hash.clone()); - } - } - } -} - impl Iterator for BestIterator { type Item = Arc>; @@ -539,19 +505,8 @@ impl Iterator for BestIterator { loop { let best = self.best.iter().next_back()?.clone(); let best = self.best.take(&best)?; - let hash = &best.transaction.hash; - - // Check if the transaction was marked invalid. - if self.invalid.contains(hash) { - debug!( - target: "txpool", - "[{:?}] Skipping invalid child transaction while iterating.", - hash - ); - continue - } - let next = self.all.read().get(hash).cloned(); + let next = self.all.read().get(&best.transaction.hash).cloned(); let ready = match next { Some(ready) => ready, // The transaction is not in all, maybe it was removed in the meantime? @@ -680,13 +635,10 @@ mod tests { assert_eq!(ready.get().count(), 3); } - /// Populate the pool, with a graph that looks like so: - /// - /// tx1 -> tx2 \ - /// -> -> tx3 - /// -> tx4 -> tx5 -> tx6 - /// -> tx7 - fn populate_pool(ready: &mut ReadyTransactions>) { + #[test] + fn should_return_best_transactions_in_correct_order() { + // given + let mut ready = ReadyTransactions::default(); let mut tx1 = tx(1); tx1.requires.clear(); let mut tx2 = tx(2); @@ -697,17 +649,11 @@ mod tests { tx3.provides = vec![]; let mut tx4 = tx(4); tx4.requires = vec![tx1.provides[0].clone()]; - tx4.provides = vec![vec![107]]; - let mut tx5 = tx(5); - tx5.requires = vec![tx4.provides[0].clone()]; - tx5.provides = vec![vec![108]]; - let mut tx6 = tx(6); - tx6.requires = vec![tx5.provides[0].clone()]; - tx6.provides = vec![]; - let tx7 = Transaction { - data: vec![7], + tx4.provides = vec![]; + let tx5 = Transaction { + data: vec![5], bytes: 1, - hash: 7, + hash: 5, priority: 1, valid_till: u64::MAX, // use the max here for testing. requires: vec![tx1.provides[0].clone()], @@ -717,30 +663,20 @@ mod tests { }; // when - for tx in vec![tx1, tx2, tx3, tx7, tx4, tx5, tx6] { - import(ready, tx).unwrap(); + for tx in vec![tx1, tx2, tx3, tx4, tx5] { + import(&mut ready, tx).unwrap(); } + // then assert_eq!(ready.best.len(), 1); - } - - #[test] - fn should_return_best_transactions_in_correct_order() { - // given - let mut ready = ReadyTransactions::default(); - populate_pool(&mut ready); - // when let mut it = ready.get().map(|tx| tx.data[0]); - // then assert_eq!(it.next(), Some(1)); assert_eq!(it.next(), Some(2)); assert_eq!(it.next(), Some(3)); assert_eq!(it.next(), Some(4)); assert_eq!(it.next(), Some(5)); - assert_eq!(it.next(), Some(6)); - assert_eq!(it.next(), Some(7)); assert_eq!(it.next(), None); } @@ -789,26 +725,4 @@ mod tests { TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 2 } ); } - - #[test] - fn should_skip_invalid_transactions_while_iterating() { - // given - let mut ready = ReadyTransactions::default(); - populate_pool(&mut ready); - - // when - let mut it = ready.get(); - let data = |tx: &Arc>>| tx.data[0]; - - // then - assert_eq!(it.next().as_ref().map(data), Some(1)); - assert_eq!(it.next().as_ref().map(data), Some(2)); - assert_eq!(it.next().as_ref().map(data), Some(3)); - let tx4 = it.next(); - assert_eq!(tx4.as_ref().map(data), Some(4)); - // report 4 as invalid, which should skip 5 & 6. - it.report_invalid(&tx4.unwrap()); - assert_eq!(it.next().as_ref().map(data), Some(7)); - assert_eq!(it.next().as_ref().map(data), None); - } } diff --git a/client/transaction-pool/src/graph/validated_pool.rs b/client/transaction-pool/src/graph/validated_pool.rs index dba586adc846c..e4aad7f342b5b 100644 --- a/client/transaction-pool/src/graph/validated_pool.rs +++ b/client/transaction-pool/src/graph/validated_pool.rs @@ -25,7 +25,7 @@ use std::{ use futures::channel::mpsc::{channel, Sender}; use parking_lot::{Mutex, RwLock}; use retain_mut::RetainMut; -use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions}; +use sc_transaction_pool_api::{error, PoolStatus}; use serde::Serialize; use sp_runtime::{ generic::BlockId, @@ -630,7 +630,7 @@ impl ValidatedPool { } /// Get an iterator for ready transactions ordered by priority - pub fn ready(&self) -> impl ReadyTransactions> + Send { + pub fn ready(&self) -> impl Iterator> + Send { self.pool.read().ready() } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 4d355df22d821..6eb5bd2f332ec 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -56,8 +56,7 @@ use std::{ use graph::{ExtrinsicHash, IsValidator}; use sc_transaction_pool_api::{ ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus, - ReadyTransactions, TransactionFor, TransactionPool, TransactionSource, - TransactionStatusStreamFor, TxHash, + TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash, }; use sp_core::traits::SpawnEssentialNamed; use sp_runtime::{ @@ -70,7 +69,7 @@ use crate::metrics::MetricsLink as PrometheusMetrics; use prometheus_endpoint::Registry as PrometheusRegistry; type BoxedReadyIterator = - Box>> + Send>; + Box>> + Send>; type ReadyIteratorFor = BoxedReadyIterator, graph::ExtrinsicFor>;