Skip to content

Commit

Permalink
Merge pull request #4 from Arindam2407/parallel_sender_recovery_chunk…
Browse files Browse the repository at this point in the history
…_impl

Updated reth-primitives src/transactions/mod.rs
  • Loading branch information
Arindam2407 authored Nov 23, 2023
2 parents 53c04fa + 887b69d commit ef45cad
Showing 1 changed file with 19 additions and 47 deletions.
66 changes: 19 additions & 47 deletions crates/primitives/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use alloy_rlp::{
use bytes::{Buf, BytesMut};
use derive_more::{AsRef, Deref};
use once_cell::sync::Lazy;
use rayon::prelude::*;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use reth_codecs::{add_arbitrary_tests, derive_arbitrary, Compact};
use serde::{Deserialize, Serialize};
use std::{mem, sync::mpsc, thread};
use std::mem;

pub use access_list::{AccessList, AccessListItem};
pub use eip1559::TxEip1559;
Expand Down Expand Up @@ -833,16 +833,6 @@ impl TransactionSignedNoHash {
self.signature.recover_signer(signature_hash)
}

/// Recover signer helper for parallel impl.
///
/// Returns `None` if the transaction's signature is invalid, see also [Self::recover_signer].
pub fn recover_signer_helper(&self, rlp_buf: &mut Vec<u8>) -> Option<Address> {
let tx = self;
tx.transaction.encode_without_signature(rlp_buf);

tx.signature.recover_signer(keccak256(rlp_buf))
}

/// Converts into a transaction type with its hash: [`TransactionSigned`].
pub fn with_hash(self) -> TransactionSigned {
self.into()
Expand All @@ -862,15 +852,13 @@ impl TransactionSignedNoHash {
let mut recovered_signers: Vec<Address> = Vec::new();
let mut channels = Vec::new();
rayon::scope(|s| {
let (chunk_size, chunks) =
if num_txes < (rayon::current_num_threads() * rayon::current_num_threads()) {
(num_txes, 2)
} else {
let chunk_size = num_txes /
(rayon::current_num_threads() * rayon::current_num_threads());
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let (chunk_size, chunks) = if num_txes < 16 {
(num_txes, 2)
} else {
let chunk_size = num_txes / (num_txes / 16);
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let mut iter = txes.into_iter();
(0..chunks).for_each(|i| {
let chunk: Vec<&TransactionSignedNoHash> = if i == chunks - 1 {
Expand All @@ -884,10 +872,8 @@ impl TransactionSignedNoHash {
// This task will send the results through the channel after it recovered
// the senders.
s.spawn(move |_| {
let mut rlp_buf = Vec::with_capacity(128);
for tx in chunk {
rlp_buf.clear();
let recovery_result = tx.recover_signer_helper(&mut rlp_buf);
let recovery_result = tx.recover_signer();
let _ = recovered_senders_tx.send(recovery_result);
}
});
Expand Down Expand Up @@ -1052,16 +1038,6 @@ impl TransactionSigned {
self.signature.recover_signer(signature_hash)
}

/// Recover signer helper for parallel impl.
///
/// Returns `None` if the transaction's signature is invalid, see also [Self::recover_signer].
pub fn recover_signer_helper(&self, rlp_buf: &mut Vec<u8>) -> Option<Address> {
let tx = self;
tx.transaction.encode_without_signature(rlp_buf);

tx.signature.recover_signer(keccak256(rlp_buf))
}

/// Recovers a list of signers from a transaction list iterator
///
/// Returns `None`, if some transaction's signature is invalid, see also
Expand All @@ -1076,15 +1052,13 @@ impl TransactionSigned {
let mut recovered_signers: Vec<Address> = Vec::new();
let mut channels = Vec::new();
rayon::scope(|s| {
let (chunk_size, chunks) =
if num_txes < (rayon::current_num_threads() * rayon::current_num_threads()) {
(num_txes, 2)
} else {
let chunk_size = num_txes /
(rayon::current_num_threads() * rayon::current_num_threads());
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let (chunk_size, chunks) = if num_txes < 16 {
(num_txes, 2)
} else {
let chunk_size = num_txes / (num_txes / 16);
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let mut iter = txes.into_iter();
(0..chunks).for_each(|i| {
let chunk: Vec<&TransactionSigned> = if i == chunks - 1 {
Expand All @@ -1098,10 +1072,8 @@ impl TransactionSigned {
// This task will send the results through the channel after it recovered
// the senders.
s.spawn(move |_| {
let mut rlp_buf = Vec::with_capacity(128);
for tx in chunk {
rlp_buf.clear();
let recovery_result = tx.recover_signer_helper(&mut rlp_buf);
let recovery_result = tx.recover_signer();
let _ = recovered_senders_tx.send(recovery_result);
}
});
Expand Down Expand Up @@ -1363,7 +1335,7 @@ impl TransactionSigned {
/// To decode EIP-4844 transactions in `eth_sendRawTransaction`, use
/// [PooledTransactionsElement::decode_enveloped].
pub fn decode_enveloped(data: &mut &[u8]) -> alloy_rlp::Result<Self> {
if data.is_empty() {
if data.is_empty() {
return Err(RlpError::InputTooShort)
}

Expand Down

0 comments on commit ef45cad

Please sign in to comment.