Skip to content

Commit

Permalink
fix(txpool): respect propagate setting in the full tx stream (paradig…
Browse files Browse the repository at this point in the history
…mxyz#4362)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
  • Loading branch information
alexfertel and mattsse authored Aug 29, 2023
1 parent b754194 commit 03afe37
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 54 deletions.
16 changes: 8 additions & 8 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ pub use crate::{
traits::{
AllPoolTransactions, BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount,
EthBlobTransactionSidecar, EthPoolTransaction, EthPooledTransaction,
GetPooledTransactionLimit, NewTransactionEvent, PendingTransactionListenerKind, PoolSize,
PoolTransaction, PropagateKind, PropagatedTransactions, TransactionOrigin, TransactionPool,
GetPooledTransactionLimit, NewTransactionEvent, PoolSize, PoolTransaction, PropagateKind,
PropagatedTransactions, TransactionListenerKind, TransactionOrigin, TransactionPool,
TransactionPoolExt,
},
validate::{
Expand Down Expand Up @@ -374,15 +374,15 @@ where
self.pool.add_all_transactions_event_listener()
}

fn pending_transactions_listener_for(
&self,
kind: PendingTransactionListenerKind,
) -> Receiver<TxHash> {
fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash> {
self.pool.add_pending_listener(kind)
}

fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
self.pool.add_new_transaction_listener()
fn new_transactions_listener_for(
&self,
kind: TransactionListenerKind,
) -> Receiver<NewTransactionEvent<Self::Transaction>> {
self.pool.add_new_transaction_listener(kind)
}

fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
Expand Down
11 changes: 9 additions & 2 deletions crates/transaction-pool/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use crate::{
blobstore::BlobStoreError,
error::PoolError,
traits::{GetPooledTransactionLimit, PendingTransactionListenerKind},
traits::{GetPooledTransactionLimit, TransactionListenerKind},
validate::ValidTransaction,
AllPoolTransactions, AllTransactionsEvents, BestTransactions, BlockInfo, EthPooledTransaction,
NewTransactionEvent, PoolResult, PoolSize, PoolTransaction, PropagatedTransactions,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl TransactionPool for NoopTransactionPool {

fn pending_transactions_listener_for(
&self,
_kind: PendingTransactionListenerKind,
_kind: TransactionListenerKind,
) -> Receiver<TxHash> {
mpsc::channel(1).1
}
Expand All @@ -92,6 +92,13 @@ impl TransactionPool for NoopTransactionPool {
mpsc::channel(1).1
}

fn new_transactions_listener_for(
&self,
_kind: TransactionListenerKind,
) -> Receiver<NewTransactionEvent<Self::Transaction>> {
mpsc::channel(1).1
}

fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
vec![]
}
Expand Down
72 changes: 43 additions & 29 deletions crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Transaction Pool internals.
//!
//! Incoming transactions validated are before they enter the pool first. The validation outcome can
//! Incoming transactions are validated before they enter the pool first. The validation outcome can
//! have 3 states:
//!
//! 1. Transaction can _never_ be valid
Expand Down Expand Up @@ -103,7 +103,7 @@ use crate::{
blobstore::BlobStore,
metrics::BlobStoreMetrics,
pool::txpool::UpdateOutcome,
traits::{GetPooledTransactionLimit, PendingTransactionListenerKind},
traits::{GetPooledTransactionLimit, TransactionListenerKind},
validate::ValidTransaction,
};
pub use listener::{AllTransactionsEvents, TransactionEvents};
Expand Down Expand Up @@ -137,7 +137,7 @@ where
/// Listeners for new pending transactions.
pending_transaction_listener: Mutex<Vec<PendingTransactionListener>>,
/// Listeners for new transactions added to the pool.
transaction_listener: Mutex<Vec<mpsc::Sender<NewTransactionEvent<T::Transaction>>>>,
transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
/// Metrics for the blob store
blob_store_metrics: BlobStoreMetrics,
}
Expand Down Expand Up @@ -222,10 +222,7 @@ where

/// Adds a new transaction listener to the pool that gets notified about every new _pending_
/// transaction inserted into the pool
pub fn add_pending_listener(
&self,
kind: PendingTransactionListenerKind,
) -> mpsc::Receiver<TxHash> {
pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
const TX_LISTENER_BUFFER_SIZE: usize = 2048;
let (sender, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
let listener = PendingTransactionListener { sender, kind };
Expand All @@ -236,10 +233,12 @@ where
/// Adds a new transaction listener to the pool that gets notified about every new transaction.
pub fn add_new_transaction_listener(
&self,
kind: TransactionListenerKind,
) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
const TX_LISTENER_BUFFER_SIZE: usize = 1024;
let (tx, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
self.transaction_listener.lock().push(tx);
let (sender, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
let listener = TransactionListener { sender, kind };
self.transaction_listener.lock().push(listener);
rx
}

Expand Down Expand Up @@ -517,18 +516,25 @@ where
/// Notify all listeners about a newly inserted pending transaction.
fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
let mut transaction_listeners = self.transaction_listener.lock();
transaction_listeners.retain_mut(|listener| {
if listener.kind.is_propagate_only() && !event.transaction.propagate {
// only emit this hash to listeners that are only allowed to receive propagate only
// transactions, such as network
return !listener.sender.is_closed()
}

transaction_listeners.retain_mut(|listener| match listener.try_send(event.clone()) {
Ok(()) => true,
Err(err) => {
if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"skipping transaction on full transaction listener",
);
true
} else {
false
match listener.sender.try_send(event.clone()) {
Ok(()) => true,
Err(err) => {
if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"skipping transaction on full transaction listener",
);
true
} else {
false
}
}
}
});
Expand Down Expand Up @@ -742,7 +748,15 @@ impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
struct PendingTransactionListener {
sender: mpsc::Sender<TxHash>,
/// Whether to include transactions that should not be propagated over the network.
kind: PendingTransactionListenerKind,
kind: TransactionListenerKind,
}

/// An active listener for new pending transactions.
#[derive(Debug)]
struct TransactionListener<T: PoolTransaction> {
sender: mpsc::Sender<NewTransactionEvent<T>>,
/// Whether to include transactions that should not be propagated over the network.
kind: TransactionListenerKind,
}

/// Tracks an added transaction and all graph changes caused by adding it.
Expand All @@ -754,19 +768,19 @@ pub struct AddedPendingTransaction<T: PoolTransaction> {
replaced: Option<Arc<ValidPoolTransaction<T>>>,
/// transactions promoted to the pending queue
promoted: Vec<Arc<ValidPoolTransaction<T>>>,
/// transaction that failed and became discarded
/// transactions that failed and became discarded
discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}

impl<T: PoolTransaction> AddedPendingTransaction<T> {
/// Returns all transactions that were promoted to the pending pool and adhere to the given
/// [PendingTransactionListenerKind].
/// [TransactionListenerKind].
///
/// If the kind is [PendingTransactionListenerKind::PropagateOnly], then only transactions that
/// If the kind is [TransactionListenerKind::PropagateOnly], then only transactions that
/// are allowed to be propagated are returned.
pub(crate) fn pending_transactions(
&self,
kind: PendingTransactionListenerKind,
kind: TransactionListenerKind,
) -> impl Iterator<Item = H256> + '_ {
let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
PendingTransactionIter { kind, iter }
Expand All @@ -779,7 +793,7 @@ impl<T: PoolTransaction> AddedPendingTransaction<T> {
}

pub(crate) struct PendingTransactionIter<Iter> {
kind: PendingTransactionListenerKind,
kind: TransactionListenerKind,
iter: Iter,
}

Expand Down Expand Up @@ -876,13 +890,13 @@ pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {

impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
/// Returns all transactions that were promoted to the pending pool and adhere to the given
/// [PendingTransactionListenerKind].
/// [TransactionListenerKind].
///
/// If the kind is [PendingTransactionListenerKind::PropagateOnly], then only transactions that
/// If the kind is [TransactionListenerKind::PropagateOnly], then only transactions that
/// are allowed to be propagated are returned.
pub(crate) fn pending_transactions(
&self,
kind: PendingTransactionListenerKind,
kind: TransactionListenerKind,
) -> impl Iterator<Item = H256> + '_ {
let iter = self.promoted.iter();
PendingTransactionIter { kind, iter }
Expand Down
32 changes: 20 additions & 12 deletions crates/transaction-pool/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,24 @@ pub trait TransactionPool: Send + Sync + Clone {
///
/// Consumer: RPC/P2P
fn pending_transactions_listener(&self) -> Receiver<TxHash> {
self.pending_transactions_listener_for(PendingTransactionListenerKind::PropagateOnly)
self.pending_transactions_listener_for(TransactionListenerKind::PropagateOnly)
}

/// Returns a new Stream that yields transactions hashes for new __pending__ transactions
/// inserted into the pool depending on the given [PendingTransactionListenerKind] argument.
fn pending_transactions_listener_for(
&self,
kind: PendingTransactionListenerKind,
) -> Receiver<TxHash>;
/// inserted into the pool depending on the given [TransactionListenerKind] argument.
fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash>;

/// Returns a new stream that yields new valid transactions added to the pool.
fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>>;
fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly)
}

/// Returns a new stream that yields new valid transactions added to the pool
/// depending on the given [TransactionListenerKind] argument.
fn new_transactions_listener_for(
&self,
kind: TransactionListenerKind,
) -> Receiver<NewTransactionEvent<Self::Transaction>>;

/// Returns a new Stream that yields new transactions added to the pending sub-pool.
///
Expand All @@ -138,7 +144,10 @@ pub trait TransactionPool: Send + Sync + Clone {
fn new_pending_pool_transactions_listener(
&self,
) -> NewSubpoolTransactionStream<Self::Transaction> {
NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Pending)
NewSubpoolTransactionStream::new(
self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly),
SubPool::Pending,
)
}

/// Returns a new Stream that yields new transactions added to the basefee sub-pool.
Expand Down Expand Up @@ -326,12 +335,11 @@ pub trait TransactionPoolExt: TransactionPool {
fn delete_blobs(&self, txs: Vec<H256>);
}

/// Determines what kind of new pending transactions should be emitted by a stream of pending
/// transactions.
/// Determines what kind of new transactions should be emitted by a stream of transactions.
///
/// This gives control whether to include transactions that are allowed to be propagated.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PendingTransactionListenerKind {
pub enum TransactionListenerKind {
/// Any new pending transactions
All,
/// Only transactions that are allowed to be propagated.
Expand All @@ -340,7 +348,7 @@ pub enum PendingTransactionListenerKind {
PropagateOnly,
}

impl PendingTransactionListenerKind {
impl TransactionListenerKind {
/// Returns true if we're only interested in transactions that are allowed to be propagated.
#[inline]
pub fn is_propagate_only(&self) -> bool {
Expand Down
29 changes: 26 additions & 3 deletions crates/transaction-pool/tests/it/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use assert_matches::assert_matches;
use reth_transaction_pool::{
noop::MockTransactionValidator,
test_utils::{testing_pool, testing_pool_with_validator, MockTransactionFactory},
FullTransactionEvent, PendingTransactionListenerKind, TransactionEvent, TransactionOrigin,
FullTransactionEvent, TransactionEvent, TransactionListenerKind, TransactionOrigin,
TransactionPool,
};
use std::{future::poll_fn, task::Poll};
Expand Down Expand Up @@ -48,8 +48,7 @@ async fn txpool_listener_propagate_only() {
let transaction = mock_tx_factory.create_eip1559();
let expected = *transaction.hash();
let mut listener_network = txpool.pending_transactions_listener();
let mut listener_all =
txpool.pending_transactions_listener_for(PendingTransactionListenerKind::All);
let mut listener_all = txpool.pending_transactions_listener_for(TransactionListenerKind::All);
let result =
txpool.add_transaction(TransactionOrigin::Local, transaction.transaction.clone()).await;
assert!(result.is_ok());
Expand All @@ -64,3 +63,27 @@ async fn txpool_listener_propagate_only() {
})
.await;
}

#[tokio::test(flavor = "multi_thread")]
async fn txpool_listener_new_propagate_only() {
let txpool = testing_pool_with_validator(MockTransactionValidator::no_propagate_local());
let mut mock_tx_factory = MockTransactionFactory::default();
let transaction = mock_tx_factory.create_eip1559();
let expected = *transaction.hash();
let mut listener_network = txpool.new_transactions_listener();
let mut listener_all = txpool.new_transactions_listener_for(TransactionListenerKind::All);
let result =
txpool.add_transaction(TransactionOrigin::Local, transaction.transaction.clone()).await;
assert!(result.is_ok());

let inserted = listener_all.recv().await.unwrap();
let actual = *inserted.transaction.hash();
assert_eq!(actual, expected);

poll_fn(|cx| {
// no propagation
assert!(listener_network.poll_recv(cx).is_pending());
Poll::Ready(())
})
.await;
}

0 comments on commit 03afe37

Please sign in to comment.