Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

fn queue_transactions accepts UnverifiedTransactions #10887

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ethcore/light/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ pub trait Handler: Send + Sync {
/// Called when a peer makes an announcement.
fn on_announcement(&self, _ctx: &dyn EventContext, _announcement: &Announcement) { }
/// Called when a peer requests relay of some transactions.
fn on_transactions(&self, _ctx: &dyn EventContext, _relay: &[UnverifiedTransaction]) { }
fn on_transactions(&self, _ctx: &dyn EventContext, _relay: Vec<UnverifiedTransaction>) { }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a regression. Luckily it's only a light client, so we shouldn't have many transactions but you may end up cloning a lot of data.

/// Called when a peer responds to requests.
/// Responses not guaranteed to contain valid data and are not yet checked against
/// the requests they correspond to.
Expand Down Expand Up @@ -1138,7 +1138,7 @@ impl LightProtocol {
peer,
io,
proto: self,
}, &txs);
}, txs.clone());
}

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

use bytes::Bytes;
use ethereum_types::{H256, U256};
use types::transaction::UnverifiedTransaction;
use blockchain::ImportRoute;
use std::time::Duration;
use std::collections::HashMap;
Expand Down Expand Up @@ -185,7 +184,7 @@ pub trait ChainNotify : Send + Sync {

/// fires when new transactions are received from a peer
fn transactions_received(&self,
_txs: &[UnverifiedTransaction],
_txs: &[H256],
_peer_id: usize,
) {
// does nothing by default
Expand Down
18 changes: 9 additions & 9 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2217,22 +2217,22 @@ impl BlockChainClient for Client {
}

impl IoClient for Client {
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
fn queue_transactions(&self, transactions: Vec<UnverifiedTransaction>, peer_id: usize) {
trace_time!("queue_transactions");
let len = transactions.len();
self.queue_transactions.queue(&self.io_channel.read(), len, move |client| {
trace_time!("import_queued_transactions");

let txs: Vec<UnverifiedTransaction> = transactions
.iter()
.filter_map(|bytes| client.engine.decode_transaction(bytes).ok())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this verification is performed by a miner in import_external_transactions

Copy link
Collaborator

@ordian ordian Jul 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we previous ignored invalid rlps and now we return an err in on_peer_transactions? This seems like an improvement to me. But maybe it was done on purpose?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we previous ignored invalid rlps and now we return an err in on_peer_transactions?

Yes

This seems like an improvement to me. But maybe it was done on purpose?

No, it was a regression introduced very recently

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This closure is invoked in a IoWorker thread pool and the whole thing was done to prevent blocking networking task for useless stuff like decoding transactions.

.collect();

let hashes: Vec<_> = transactions.iter().map(UnverifiedTransaction::hash).collect();
client.notify(|notify| {
notify.transactions_received(&txs, peer_id);
notify.transactions_received(&hashes, peer_id);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should move this notification after miner imports the transactions? cause those transactions may be invalid

Copy link
Collaborator

@ordian ordian Jul 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIR this notification only updates last_sent_transactions of a peer, so that we don't propagate the same transaction that the peer sent us

});

client.importer.miner.import_external_transactions(client, txs);
// TODO: queue_transactions.queue, should accept FnOnce, so it can capture outer variables
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you open an issue?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to do it initially, but it requires reworking IoWorkers. Currently every IoMessage is passed to every handler. We just have one handler for ClientIoMessage, I guess we could maybe hack around it with some internal mutability though.

for res in client.importer.miner.import_external_transactions(client, transactions.clone()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unacceptable, it means you clone ~4MB a second.

if let Err(e) = res {
debug!(target: "client", "Import of external transaction has failed: {}", e);
}
}
}).unwrap_or_else(|e| {
debug!(target: "client", "Ignoring {} transactions: {}", len, e);
});
Expand Down
9 changes: 4 additions & 5 deletions ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ use itertools::Itertools;
use kvdb::DBValue;
use kvdb_memorydb;
use parking_lot::RwLock;
use rlp::{Rlp, RlpStream};
use rlp::RlpStream;
use rustc_hex::FromHex;
use types::transaction::{self, Transaction, LocalizedTransaction, SignedTransaction, Action};
use types::transaction::{self, Transaction, LocalizedTransaction, SignedTransaction, Action, UnverifiedTransaction};
use types::BlockNumber;
use types::basic_account::BasicAccount;
use types::encoded;
Expand Down Expand Up @@ -903,10 +903,9 @@ impl BlockChainClient for TestBlockChainClient {
}

impl IoClient for TestBlockChainClient {
fn queue_transactions(&self, transactions: Vec<Bytes>, _peer_id: usize) {
fn queue_transactions(&self, transactions: Vec<UnverifiedTransaction>, _peer_id: usize) {
// import right here
let txs = transactions.into_iter().filter_map(|bytes| Rlp::new(&bytes).as_val().ok()).collect();
self.miner.import_external_transactions(self, txs);
self.miner.import_external_transactions(self, transactions);
}

fn queue_ancient_block(&self, unverified: Unverified, _r: Bytes) -> EthcoreResult<H256> {
Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/client/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use ethereum_types::{H256, U256, Address};
use evm::Schedule;
use itertools::Itertools;
use kvdb::DBValue;
use types::transaction::{self, LocalizedTransaction, SignedTransaction};
use types::transaction::{self, LocalizedTransaction, SignedTransaction, UnverifiedTransaction};
use types::BlockNumber;
use types::basic_account::BasicAccount;
use types::block_status::BlockStatus;
Expand Down Expand Up @@ -184,7 +184,7 @@ pub trait EngineInfo {
/// IO operations that should off-load heavy work to another thread.
pub trait IoClient: Sync + Send {
/// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize);
fn queue_transactions(&self, transactions: Vec<UnverifiedTransaction>, peer_id: usize);

/// Queue block import with transaction receipts. Does no sealing and transaction validation.
fn queue_ancient_block(&self, block_bytes: Unverified, receipts_bytes: Bytes) -> EthcoreResult<H256>;
Expand Down
10 changes: 7 additions & 3 deletions ethcore/src/engines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,9 +541,13 @@ pub trait Engine: Sync + Send {
self.machine().verify_transaction_basic(t, header)
}

/// Performs pre-validation of RLP decoded transaction before other processing
fn decode_transaction(&self, transaction: &[u8]) -> Result<UnverifiedTransaction, transaction::Error> {
self.machine().decode_transaction(transaction)
/// Verifies len of transaction RLP
fn verify_transaction_len(&self, transaction: &[u8]) -> Result<(), transaction::Error> {
if transaction.len() > self.params().max_transaction_size {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_transaction_size is configurable since #8417 but I don't know if this is really needed... maybe we could just remove it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we can't just remove it. We need a way to tell users that their transaction is not being accepted to the pool and will never be propagated. We can obviously hardcode that into a miner, but it feels like a regression.

return Err(transaction::Error::TooBig)
}

Ok(())
}
}

Expand Down
11 changes: 0 additions & 11 deletions ethcore/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::cmp;
use std::sync::Arc;

use ethereum_types::{U256, H256, Address};
use rlp::Rlp;
use types::transaction::{self, SYSTEM_ADDRESS, UNSIGNED_SENDER, UnverifiedTransaction, SignedTransaction};
use types::BlockNumber;
use types::header::Header;
Expand Down Expand Up @@ -385,16 +384,6 @@ impl Machine {
Ok(())
}

/// Performs pre-validation of RLP decoded transaction before other processing
pub fn decode_transaction(&self, transaction: &[u8]) -> Result<UnverifiedTransaction, transaction::Error> {
let rlp = Rlp::new(&transaction);
if rlp.as_raw().len() > self.params().max_transaction_size {
debug!("Rejected oversized transaction of {} bytes", rlp.as_raw().len());
return Err(transaction::Error::TooBig)
}
rlp.as_val().map_err(|e| transaction::Error::InvalidRlp(e.to_string()))
}

/// Get the balance, in base units, associated with an account.
/// Extracts data from the live block.
pub fn balance(&self, live: &ExecutedBlock, address: &Address) -> Result<U256, Error> {
Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/miner/pool_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ impl<'a, C: 'a> pool::client::Client for PoolClient<'a, C> where
}
}

fn decode_transaction(&self, transaction: &[u8]) -> Result<UnverifiedTransaction, transaction::Error> {
self.engine.decode_transaction(transaction)
fn verify_transaction_len(&self, transaction: &[u8]) -> Result<(), transaction::Error> {
self.engine.verify_transaction_len(transaction)
}
}

Expand Down
7 changes: 3 additions & 4 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ use parity_runtime::Executor;
use std::sync::atomic::{AtomicBool, Ordering};
use network::IpFilter;
use private_tx::PrivateTxHandler;
use types::transaction::UnverifiedTransaction;

use super::light_sync::SyncInfo;

Expand Down Expand Up @@ -603,7 +602,7 @@ impl ChainNotify for EthSync {
});
}

fn transactions_received(&self, txs: &[UnverifiedTransaction], peer_id: PeerId) {
fn transactions_received(&self, txs: &[H256], peer_id: PeerId) {
let mut sync = self.eth_handler.sync.write();
sync.transactions_received(txs, peer_id);
}
Expand All @@ -614,9 +613,9 @@ impl ChainNotify for EthSync {
struct TxRelay(Arc<dyn BlockChainClient>);

impl LightHandler for TxRelay {
fn on_transactions(&self, ctx: &dyn EventContext, relay: &[::types::transaction::UnverifiedTransaction]) {
fn on_transactions(&self, ctx: &dyn EventContext, relay: Vec<::types::transaction::UnverifiedTransaction>) {
trace!(target: "pip", "Relaying {} transactions from peer {}", relay.len(), ctx.peer());
self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx)).collect(), ctx.peer())
self.0.queue_transactions(relay, ctx.peer())
}
}

Expand Down
3 changes: 1 addition & 2 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,7 @@ impl SyncHandler {
trace!(target: "sync", "{:02} -> Transactions ({} entries)", peer_id, item_count);
let mut transactions = Vec::with_capacity(item_count);
for i in 0 .. item_count {
let rlp = r.at(i)?;
let tx = rlp.as_raw().to_vec();
let tx = r.val_at(i)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole point of passing Bytes was to avoid doing any intensive work (like hashing transactions) synchronously in the network IoHandlers.
The whole refactoring was done after a careful profiling of issues that happens on the mainnet where network lock could be acquired for a very long time.

transactions.push(tx);
}
io.chain().queue_transactions(transactions, peer_id);
Expand Down
6 changes: 3 additions & 3 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ use snapshot::{Snapshot};
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask};
use private_tx::PrivateTxHandler;
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
use types::transaction::UnverifiedTransaction;
use types::BlockNumber;

use self::handler::SyncHandler;
Expand Down Expand Up @@ -701,9 +700,9 @@ impl ChainSync {
}

/// Updates transactions were received by a peer
pub fn transactions_received(&mut self, txs: &[UnverifiedTransaction], peer_id: PeerId) {
pub fn transactions_received(&mut self, txs: &[H256], peer_id: PeerId) {
if let Some(peer_info) = self.peers.get_mut(&peer_id) {
peer_info.last_sent_transactions.extend(txs.iter().map(|tx| tx.hash()));
peer_info.last_sent_transactions.extend(txs);
}
}

Expand Down Expand Up @@ -1373,6 +1372,7 @@ pub mod tests {
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo};
use ethcore::miner::{MinerService, PendingOrdering};
use types::header::Header;
use types::transaction::UnverifiedTransaction;

pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes {
let mut header = Header::new();
Expand Down
10 changes: 6 additions & 4 deletions ethcore/sync/src/chain/propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,14 @@ impl SyncPropagator {

#[cfg(test)]
mod tests {
use ethcore::client::{BlockInfo, ChainInfo, EachBlockWith, TestBlockChainClient};
use std::collections::VecDeque;
use parking_lot::RwLock;
use rlp::{Rlp};
use std::collections::{VecDeque};
use tests::helpers::{TestIo};

use ethcore::client::{BlockInfo, ChainInfo, EachBlockWith, TestBlockChainClient};
use rlp::Rlp;
use tests::helpers::TestIo;
use tests::snapshot::TestSnapshotService;
use types::transaction::UnverifiedTransaction;

use super::{*, super::{*, tests::*}};

Expand Down
5 changes: 2 additions & 3 deletions miner/src/pool/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ pub trait Client: fmt::Debug + Sync {
/// Classify transaction (check if transaction is filtered by some contracts).
fn transaction_type(&self, tx: &transaction::SignedTransaction) -> TransactionType;

/// Performs pre-validation of RLP decoded transaction
fn decode_transaction(&self, transaction: &[u8])
-> Result<transaction::UnverifiedTransaction, transaction::Error>;
/// Verifies len of transaction RLP
fn verify_transaction_len(&self, transaction: &[u8]) -> Result<(), transaction::Error>;
}

/// State nonce client
Expand Down
10 changes: 4 additions & 6 deletions miner/src/pool/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use std::sync::{atomic, Arc};

use ethereum_types::{U256, H256, Address};
use rlp::Rlp;
use types::transaction::{self, Transaction, SignedTransaction, UnverifiedTransaction};

use pool;
Expand Down Expand Up @@ -131,14 +130,13 @@ impl pool::client::Client for TestClient {
}
}

fn decode_transaction(&self, transaction: &[u8]) -> Result<UnverifiedTransaction, transaction::Error> {
let rlp = Rlp::new(&transaction);
if rlp.as_raw().len() > self.max_transaction_size {
fn verify_transaction_len(&self, transaction: &[u8]) -> Result<(), transaction::Error> {
if transaction.len() > self.max_transaction_size {
return Err(transaction::Error::TooBig)
}
rlp.as_val().map_err(|e| transaction::Error::InvalidRlp(e.to_string()))
}

Ok(())
}
}

impl pool::client::NonceClient for TestClient {
Expand Down
2 changes: 1 addition & 1 deletion miner/src/pool/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C, ::pool::scoring::N
};

// Verify RLP payload
if let Err(err) = self.client.decode_transaction(&transaction.rlp_bytes()) {
if let Err(err) = self.client.verify_transaction_len(&transaction.rlp_bytes()) {
debug!(target: "txqueue", "[{:?}] Rejected transaction's rlp payload", err);
return Err(err)
}
Expand Down