Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Recently rejected cache for transaction queue (#9005)
Browse files Browse the repository at this point in the history
* Store recently rejected transactions.

* Don't cache AlreadyImported rejections.

* Make the size of transaction verification queue dependent on pool size.

* Add a test for recently rejected.

* Fix logging for recently rejected.

* Make rejection cache smaller.

* obsolete test removed

* obsolete test removed

* Construct cache with_capacity.
  • Loading branch information
tomusdrw authored and andresilva committed Jul 2, 2018
1 parent 9caa868 commit 78e0012
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 58 deletions.
5 changes: 2 additions & 3 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ pub use verification::queue::QueueInfo as BlockQueueInfo;

use_contract!(registry, "Registry", "res/contracts/registrar.json");

const MAX_TX_QUEUE_SIZE: usize = 4096;
const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096;
// Max number of blocks imported at once.
const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4;
Expand Down Expand Up @@ -760,13 +759,12 @@ impl Client {
tracedb: tracedb,
engine: engine,
pruning: config.pruning.clone(),
config: config,
db: RwLock::new(db.clone()),
state_db: RwLock::new(state_db),
report: RwLock::new(Default::default()),
io_channel: Mutex::new(message_channel),
notify: RwLock::new(Vec::new()),
queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE),
queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
queued_ancient_blocks: Default::default(),
ancient_blocks_import_lock: Default::default(),
Expand All @@ -779,6 +777,7 @@ impl Client {
registrar_address,
exit_handler: Mutex::new(None),
importer,
config,
});

// prune old states.
Expand Down
41 changes: 29 additions & 12 deletions ethcore/src/client/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,6 @@ pub enum Mode {
Off,
}

impl Default for Mode {
fn default() -> Self {
Mode::Active
}
}

impl Display for Mode {
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
match *self {
Expand All @@ -88,7 +82,7 @@ impl Display for Mode {
}

/// Client configuration. Includes configs for all sub-systems.
#[derive(Debug, PartialEq, Default, Clone)]
#[derive(Debug, PartialEq, Clone)]
pub struct ClientConfig {
/// Block queue configuration.
pub queue: QueueConfig,
Expand Down Expand Up @@ -126,8 +120,36 @@ pub struct ClientConfig {
pub history_mem: usize,
/// Check seal valididity on block import
pub check_seal: bool,
/// Maximal number of transactions queued for verification in a separate thread.
pub transaction_verification_queue_size: usize,
}

impl Default for ClientConfig {
fn default() -> Self {
let mb = 1024 * 1024;
ClientConfig {
queue: Default::default(),
blockchain: Default::default(),
tracing: Default::default(),
vm_type: Default::default(),
fat_db: false,
pruning: journaldb::Algorithm::OverlayRecent,
name: "default".into(),
db_cache_size: None,
db_compaction: Default::default(),
db_wal: true,
mode: Mode::Active,
spec_name: "".into(),
verifier_type: VerifierType::Canon,
state_cache_size: 1 * mb,
jump_table_size: 1 * mb,
history: 64,
history_mem: 32 * mb,
check_seal: true,
transaction_verification_queue_size: 8192,
}
}
}
#[cfg(test)]
mod test {
use super::{DatabaseCompactionProfile, Mode};
Expand All @@ -143,9 +165,4 @@ mod test {
assert_eq!(DatabaseCompactionProfile::SSD, "ssd".parse().unwrap());
assert_eq!(DatabaseCompactionProfile::HDD, "hdd".parse().unwrap());
}

#[test]
fn test_mode_default() {
assert_eq!(Mode::default(), Mode::Active);
}
}
6 changes: 0 additions & 6 deletions ethcore/src/verification/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ pub enum VerifierType {
Noop,
}

impl Default for VerifierType {
fn default() -> Self {
VerifierType::Canon
}
}

/// Create a new verifier based on type.
pub fn new<C: BlockInfo + CallContract>(v: VerifierType) -> Box<Verifier<C>> {
match v {
Expand Down
33 changes: 17 additions & 16 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,20 +662,29 @@ impl ChainSync {
None
}
).collect();
let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)|
self.active_peers.contains(&peer_id)
).map(|v| *v).collect();

random::new().shuffle(&mut peers); //TODO: sort by rating
// prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));
trace!(
target: "sync",
"Syncing with peers: {} active, {} confirmed, {} total",
self.active_peers.len(), confirmed_peers.len(), self.peers.len()
);
for (peer_id, _) in peers {
self.sync_peer(io, peer_id, false);

if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
} else if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
} else {
let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)|
self.active_peers.contains(&peer_id)
).map(|v| *v).collect();

random::new().shuffle(&mut peers); //TODO: sort by rating
// prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));

for (peer_id, _) in peers {
self.sync_peer(io, peer_id, false);
}
}

if
Expand Down Expand Up @@ -710,14 +719,6 @@ impl ChainSync {
trace!(target: "sync", "Skipping busy peer {}", peer_id);
return;
}
if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
return;
}
if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
return;
}
(peer.latest_hash.clone(), peer.difficulty.clone(), peer.snapshot_number.as_ref().cloned().unwrap_or(0), peer.snapshot_hash.as_ref().cloned())
} else {
return;
Expand Down
82 changes: 73 additions & 9 deletions miner/src/pool/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use std::{cmp, fmt};
use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};

use ethereum_types::{H256, U256, Address};
use parking_lot::RwLock;
Expand Down Expand Up @@ -138,6 +138,50 @@ impl CachedPending {
}
}

#[derive(Debug)]
struct RecentlyRejected {
inner: RwLock<HashMap<H256, transaction::Error>>,
limit: usize,
}

impl RecentlyRejected {
fn new(limit: usize) -> Self {
RecentlyRejected {
limit,
inner: RwLock::new(HashMap::with_capacity(MIN_REJECTED_CACHE_SIZE)),
}
}

fn clear(&self) {
self.inner.write().clear();
}

fn get(&self, hash: &H256) -> Option<transaction::Error> {
self.inner.read().get(hash).cloned()
}

fn insert(&self, hash: H256, err: &transaction::Error) {
if self.inner.read().contains_key(&hash) {
return;
}

let mut inner = self.inner.write();
inner.insert(hash, err.clone());

// clean up
if inner.len() > self.limit {
// randomly remove half of the entries
let to_remove: Vec<_> = inner.keys().take(self.limit / 2).cloned().collect();
for key in to_remove {
inner.remove(&key);
}
}
}
}

/// Minimal size of rejection cache, by default it's equal to queue size.
const MIN_REJECTED_CACHE_SIZE: usize = 2048;

/// Ethereum Transaction Queue
///
/// Responsible for:
Expand All @@ -150,6 +194,7 @@ pub struct TransactionQueue {
pool: RwLock<Pool>,
options: RwLock<verifier::Options>,
cached_pending: RwLock<CachedPending>,
recently_rejected: RecentlyRejected,
}

impl TransactionQueue {
Expand All @@ -159,11 +204,13 @@ impl TransactionQueue {
verification_options: verifier::Options,
strategy: PrioritizationStrategy,
) -> Self {
let max_count = limits.max_count;
TransactionQueue {
insertion_id: Default::default(),
pool: RwLock::new(txpool::Pool::new(Default::default(), scoring::NonceAndGasPrice(strategy), limits)),
options: RwLock::new(verification_options),
cached_pending: RwLock::new(CachedPending::none()),
recently_rejected: RecentlyRejected::new(cmp::max(MIN_REJECTED_CACHE_SIZE, max_count / 4)),
}
}

Expand Down Expand Up @@ -195,26 +242,42 @@ impl TransactionQueue {
None
}
};

let verifier = verifier::Verifier::new(
client,
options,
self.insertion_id.clone(),
transaction_to_replace,
);

let results = transactions
.into_iter()
.map(|transaction| {
if self.pool.read().find(&transaction.hash()).is_some() {
bail!(transaction::Error::AlreadyImported)
let hash = transaction.hash();

if self.pool.read().find(&hash).is_some() {
return Err(transaction::Error::AlreadyImported);
}

verifier.verify_transaction(transaction)
if let Some(err) = self.recently_rejected.get(&hash) {
trace!(target: "txqueue", "[{:?}] Rejecting recently rejected: {:?}", hash, err);
return Err(err);
}

let imported = verifier
.verify_transaction(transaction)
.and_then(|verified| {
self.pool.write().import(verified).map_err(convert_error)
});

match imported {
Ok(_) => Ok(()),
Err(err) => {
self.recently_rejected.insert(hash, &err);
Err(err)
},
}
})
.map(|result| result.and_then(|verified| {
self.pool.write().import(verified)
.map(|_imported| ())
.map_err(convert_error)
}))
.collect::<Vec<_>>();

// Notify about imported transactions.
Expand Down Expand Up @@ -342,6 +405,7 @@ impl TransactionQueue {

let state_readiness = ready::State::new(client, stale_id, nonce_cap);

self.recently_rejected.clear();
let removed = self.pool.write().cull(None, state_readiness);
debug!(target: "txqueue", "Removed {} stalled transactions. {}", removed, self.status());
}
Expand Down
41 changes: 41 additions & 0 deletions miner/src/pool/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,47 @@ fn should_avoid_verifying_transaction_already_in_pool() {
}

#[test]
fn should_avoid_reverifying_recently_rejected_transactions() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 1,
max_per_sender: 2,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
);

let client = TestClient::new();
let tx1 = Tx::gas_price(10_000).signed().unverified();

let res = txq.import(client.clone(), vec![tx1.clone()]);
assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance {
balance: 0xf67c.into(),
cost: 0xc8458e4.into(),
})]);
assert_eq!(txq.status().status.transaction_count, 0);
assert!(client.was_verification_triggered());

// when
let client = TestClient::new();
let res = txq.import(client.clone(), vec![tx1]);
assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance {
balance: 0xf67c.into(),
cost: 0xc8458e4.into(),
})]);
assert!(!client.was_verification_triggered());

// then
assert_eq!(txq.status().status.transaction_count, 0);
}


fn should_reject_early_in_case_gas_price_is_less_than_min_effective() {
// given
let txq = TransactionQueue::new(
Expand Down
2 changes: 1 addition & 1 deletion parity/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> {
algorithm,
cmd.pruning_history,
cmd.pruning_memory,
cmd.check_seal
cmd.check_seal,
);

client_config.queue.verifier_settings = cmd.verifier_settings;
Expand Down
2 changes: 2 additions & 0 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
// fetch service
let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;

let txpool_size = cmd.miner_options.pool_limits.max_count;
// create miner
let miner = Arc::new(Miner::new(
cmd.miner_options,
Expand Down Expand Up @@ -574,6 +575,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
);

client_config.queue.verifier_settings = cmd.verifier_settings;
client_config.transaction_verification_queue_size = ::std::cmp::max(2048, txpool_size / 4);

// set up bootnodes
let mut net_conf = cmd.net_conf;
Expand Down
2 changes: 1 addition & 1 deletion parity/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl SnapshotCommand {
algorithm,
self.pruning_history,
self.pruning_memory,
true
true,
);

let restoration_db_handler = db::restoration_db_handler(&client_path, &client_config);
Expand Down
2 changes: 1 addition & 1 deletion parity/user_defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Default for UserDefaults {
fn default() -> Self {
UserDefaults {
is_first_launch: true,
pruning: Algorithm::default(),
pruning: Algorithm::OverlayRecent,
tracing: false,
fat_db: false,
mode: Mode::Active,
Expand Down
Loading

0 comments on commit 78e0012

Please sign in to comment.