Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Optimize pending transactions filter #9026

Merged
merged 11 commits into from
Jul 4, 2018
49 changes: 43 additions & 6 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::time::{Instant, Duration};
use std::collections::{BTreeMap, HashSet, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashSet, HashMap};
use std::sync::Arc;

use ansi_term::Colour;
Expand Down Expand Up @@ -851,6 +851,37 @@ impl miner::MinerService for Miner {
self.transaction_queue.all_transactions()
}

fn pending_transaction_hashes<C>(&self, chain: &C) -> BTreeSet<H256> where
C: ChainInfo + Sync,
{
let chain_info = chain.chain_info();

let from_queue = || self.transaction_queue.pending_hashes(
|sender| self.nonce_cache.read().get(sender).cloned(),
);

let from_pending = || {
self.map_existing_pending_block(|sealing| {
sealing.transactions()
.iter()
.map(|signed| signed.hash())
.collect()
}, chain_info.best_block_number)
};

match self.options.pending_set {
PendingSet::AlwaysQueue => {
from_queue()
},
PendingSet::AlwaysSealing => {
from_pending().unwrap_or_default()
},
PendingSet::SealingOrElseQueue => {
from_pending().unwrap_or_else(from_queue)
},
}
}

fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: miner::PendingOrdering)
-> Vec<Arc<VerifiedTransaction>>
where
Expand Down Expand Up @@ -1065,8 +1096,12 @@ impl miner::MinerService for Miner {
// 2. We ignore blocks that are `invalid` because it doesn't have any meaning in terms of the transactions that
// are in those blocks

// Clear nonce cache
self.nonce_cache.write().clear();
let has_new_best_block = enacted.len() > 0;

if has_new_best_block {
// Clear nonce cache
self.nonce_cache.write().clear();
}

// First update gas limit in transaction queue and minimal gas price.
let gas_limit = *chain.best_block_header().gas_limit();
Expand All @@ -1091,10 +1126,12 @@ impl miner::MinerService for Miner {
});
}

// ...and at the end remove the old ones
self.transaction_queue.cull(client);
if has_new_best_block {
// ...and at the end remove the old ones
self.transaction_queue.cull(client);
}

if enacted.len() > 0 || (imported.len() > 0 && self.options.reseal_on_uncle) {
if has_new_best_block || (imported.len() > 0 && self.options.reseal_on_uncle) {
// Reset `next_allowed_reseal` in case a block is imported.
// Even if min_period is high, we will always attempt to create
// new pending block.
Expand Down
8 changes: 7 additions & 1 deletion ethcore/src/miner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringPa
pub use ethcore_miner::pool::PendingOrdering;

use std::sync::Arc;
use std::collections::BTreeMap;
use std::collections::{BTreeSet, BTreeMap};

use bytes::Bytes;
use ethereum_types::{H256, U256, Address};
Expand Down Expand Up @@ -164,6 +164,12 @@ pub trait MinerService : Send + Sync {
fn next_nonce<C>(&self, chain: &C, address: &Address) -> U256
where C: Nonce + Sync;

/// Get a set of all pending transaction hashes.
///
/// Depending on the settings may look in transaction pool or only in pending block.
fn pending_transaction_hashes<C>(&self, chain: &C) -> BTreeSet<H256> where
C: ChainInfo + Sync;

/// Get a list of all ready transactions either ordered by priority or unordered (cheaper).
///
/// Depending on the settings may look in transaction pool or only in pending block.
Expand Down
15 changes: 14 additions & 1 deletion miner/src/pool/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use std::{cmp, fmt};
use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize};
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashMap};

use ethereum_types::{H256, U256, Address};
use parking_lot::RwLock;
Expand Down Expand Up @@ -296,6 +296,19 @@ impl TransactionQueue {
self.pool.read().unordered_pending(ready).collect()
}

/// Computes unordered set of pending hashes.
///
/// Since strict nonce-checking is not required, you may get some false positive future transactions as well.
pub fn pending_hashes<N>(
&self,
nonce: N,
) -> BTreeSet<H256> where
N: Fn(&Address) -> Option<U256>,
{
let ready = ready::OptionalState::new(nonce);
self.pool.read().unordered_pending(ready).map(|tx| tx.hash).collect()
}

/// Returns current pending transactions ordered by priority.
///
/// NOTE: This may return a cached version of pending transaction set.
Expand Down
37 changes: 37 additions & 0 deletions miner/src/pool/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,43 @@ impl txpool::Ready<VerifiedTransaction> for Condition {
}
}

/// Readiness checker that only relies on nonce cache (does actually go to state).
///
/// Checks readiness of transactions by comparing the nonce to state nonce. If nonce
/// isn't found in provided state nonce store, defaults to the tx nonce and updates
/// the nonce store. Useful for using with a state nonce cache when false positives are allowed.
pub struct OptionalState<C> {
nonces: HashMap<Address, U256>,
state: C,
}

impl<C> OptionalState<C> {
pub fn new(state: C) -> Self {
OptionalState {
nonces: Default::default(),
state,
}
}
}

impl<C: Fn(&Address) -> Option<U256>> txpool::Ready<VerifiedTransaction> for OptionalState<C> {
fn is_ready(&mut self, tx: &VerifiedTransaction) -> txpool::Readiness {
let sender = tx.sender();
let state = &self.state;
let nonce = self.nonces.entry(*sender).or_insert_with(|| {
state(sender).unwrap_or_else(|| tx.transaction.nonce)
});
match tx.transaction.nonce.cmp(nonce) {
cmp::Ordering::Greater => txpool::Readiness::Future,
cmp::Ordering::Less => txpool::Readiness::Stale,
cmp::Ordering::Equal => {
*nonce = *nonce + 1.into();
txpool::Readiness::Ready
},
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/v1/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ mod subscription_manager;
pub use self::dispatch::{Dispatcher, FullDispatcher};
pub use self::network_settings::NetworkSettings;
pub use self::poll_manager::PollManager;
pub use self::poll_filter::{PollFilter, limit_logs};
pub use self::poll_filter::{PollFilter, SyncPollFilter, limit_logs};
pub use self::requests::{
TransactionRequest, FilledTransactionRequest, ConfirmationRequest, ConfirmationPayload, CallRequest,
};
Expand Down
28 changes: 25 additions & 3 deletions rpc/src/v1/helpers/poll_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,41 @@

//! Helper type with all filter state data.

use std::collections::HashSet;
use std::{
collections::{BTreeSet, HashSet},
sync::Arc,
};
use ethereum_types::H256;
use parking_lot::Mutex;
use v1::types::{Filter, Log};

pub type BlockNumber = u64;

/// Thread-safe filter state.
#[derive(Clone)]
pub struct SyncPollFilter(Arc<Mutex<PollFilter>>);

impl SyncPollFilter {
/// New `SyncPollFilter`
pub fn new(f: PollFilter) -> Self {
SyncPollFilter(Arc::new(Mutex::new(f)))
}

/// Modify underlying filter
pub fn modify<F, R>(&self, f: F) -> R where
F: FnOnce(&mut PollFilter) -> R,
{
f(&mut self.0.lock())
}
}

/// Filter state.
#[derive(Clone)]
pub enum PollFilter {
/// Number of last block which client was notified about.
Block(BlockNumber),
/// Hashes of all transactions which client was notified about.
PendingTransaction(Vec<H256>),
/// Hashes of all pending transactions the client knows about.
PendingTransaction(BTreeSet<H256>),
/// Number of From block number, last seen block hash, pending logs and log filter itself.
Logs(BlockNumber, Option<H256>, HashSet<Log>, Filter)
}
Expand Down
4 changes: 1 addition & 3 deletions rpc/src/v1/impls/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,9 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
}

fn block_transaction_count_by_number(&self, num: BlockNumber) -> BoxFuture<Option<RpcU256>> {
let block_number = self.client.chain_info().best_block_number;

Box::new(future::ok(match num {
BlockNumber::Pending =>
self.miner.pending_transactions(block_number).map(|x| x.len().into()),
Some(self.miner.pending_transaction_hashes(&*self.client).len().into()),
_ =>
self.client.block(block_number_to_id(num)).map(|block| block.transactions_count().into())
}))
Expand Down
Loading