Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to avoid broadcasting full blob txs #6835

Merged
merged 8 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
- Don't enable the BFT mining coordinator when running sub commands such as `blocks export` [#6675](https://github.com/hyperledger/besu/pull/6675)
- In JSON-RPC return optional `v` fields for type 1 and type 2 transactions [#6762](https://github.com/hyperledger/besu/pull/6762)
- Fix Shanghai/QBFT block import bug when syncing new nodes [#6765](https://github.com/hyperledger/besu/pull/6765)
- Fix to avoid broadcasting full blob txs, instead of only the tx announcement, to a subset of nodes [#6835](https://github.com/hyperledger/besu/pull/6835)

### Download Links

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.function.UnaryOperator;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class EthSendRawTransactionAcceptanceTest extends AcceptanceTestBase {
Expand All @@ -45,22 +44,27 @@ public void setUp() throws Exception {
strictNode = besu.createArchiveNode("strictNode", configureNode((true)));
miningNode = besu.createMinerNode("strictMiningNode", configureNode((true)));
cluster.start(lenientNode, strictNode, miningNode);
// verify all nodes are done syncing so the tx pool will be enabled
lenientNode.verify(eth.syncingStatus(false));
strictNode.verify(eth.syncingStatus(false));
miningNode.verify(eth.syncingStatus(false));

// verify nodes are fully connected otherwise tx could not be propagated
jflo marked this conversation as resolved.
Show resolved Hide resolved
lenientNode.verify(net.awaitPeerCount(2));
strictNode.verify(net.awaitPeerCount(2));
miningNode.verify(net.awaitPeerCount(2));

// verify that the miner started producing blocks and all other nodes are syncing from it
waitForBlockHeight(miningNode, 1);
final var minerChainHead = miningNode.execute(ethTransactions.block());
lenientNode.verify(blockchain.minimumHeight(minerChainHead.getNumber().longValue()));
strictNode.verify(blockchain.minimumHeight(minerChainHead.getNumber().longValue()));
}

@Test
public void shouldSendSuccessfullyToLenientNodeWithoutChainId() {
  // Build a legacy tx that deliberately omits the chain id
  final TransferTransaction tx = createTransactionWithoutChainId();
  final String rawTx = tx.signedTransactionData();
  final String txHash = tx.transactionHash();

  // The lenient node accepts raw txs without a chain id
  lenientNode.verify(eth.expectSuccessfulEthRawTransaction(rawTx));

  // Tx should be included on-chain
  miningNode.verify(eth.expectSuccessfulTransactionReceipt(txHash));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,19 @@ public void setUp() throws Exception {
minerNode = besu.createMinerNode("miner-node1");
archiveNode = besu.createArchiveNode("full-node1");
cluster.start(minerNode, archiveNode);

// verify nodes are fully connected otherwise tx could not be propagated
minerNode.verify(net.awaitPeerCount(1));
archiveNode.verify(net.awaitPeerCount(1));

accountOne = accounts.createAccount("account-one");
minerWebSocket = new WebSocket(vertx, minerNode.getConfiguration());
archiveWebSocket = new WebSocket(vertx, archiveNode.getConfiguration());
// verify all nodes are done syncing so the tx pool will be enabled
archiveNode.verify(eth.syncingStatus(false));
minerNode.verify(eth.syncingStatus(false));

// verify that the miner started producing blocks and all other nodes are syncing from it
waitForBlockHeight(minerNode, 1);
final var minerChainHead = minerNode.execute(ethTransactions.block());
archiveNode.verify(blockchain.minimumHeight(minerChainHead.getNumber().longValue()));
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void sendTransactionHashesToPeer(final EthPeer peer) {
final Capability capability = peer.getConnection().capability(EthProtocol.NAME);
for (final List<Transaction> txBatch :
Iterables.partition(
transactionTracker.claimTransactionsToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) {
transactionTracker.claimTransactionHashesToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) {
try {
final List<Hash> txHashes = toHashList(txBatch);
LOG.atTrace()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionsToSend = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionHashesToSend = new ConcurrentHashMap<>();

/**
 * Clears all per-peer tracking state: seen transactions, queued full-transaction sends and
 * queued hash announcements.
 */
public void reset() {
  seenTransactions.clear();
  transactionsToSend.clear();
  transactionHashesToSend.clear();
}

public synchronized void markTransactionsAsSeen(
Expand All @@ -55,6 +57,15 @@ public synchronized void addToPeerSendQueue(final EthPeer peer, final Transactio
}
}

/**
 * Queues a transaction so that only its hash will be announced to the given peer. The
 * transaction is skipped entirely if the peer is already known to have seen it.
 *
 * @param peer the peer the hash announcement is destined for
 * @param transaction the transaction whose hash should be announced
 */
public synchronized void addToPeerHashSendQueue(
    final EthPeer peer, final Transaction transaction) {
  if (hasPeerSeenTransaction(peer, transaction)) {
    return;
  }
  final Set<Transaction> queuedForPeer =
      transactionHashesToSend.computeIfAbsent(peer, unused -> createTransactionsSet());
  queuedForPeer.add(transaction);
}

/**
 * Returns the peers that currently have full transactions queued for sending.
 *
 * <p>NOTE(review): only covers the full-transaction queue; peers that have entries solely in
 * {@code transactionHashesToSend} are not returned — confirm this is intentional.
 */
public Iterable<EthPeer> getEthPeersWithUnsentTransactions() {
  return transactionsToSend.keySet();
}
Expand All @@ -69,6 +80,16 @@ public synchronized Set<Transaction> claimTransactionsToSendToPeer(final EthPeer
}
}

public synchronized Set<Transaction> claimTransactionHashesToSendToPeer(final EthPeer peer) {
final Set<Transaction> transactionHashesToSend = this.transactionHashesToSend.remove(peer);
if (transactionHashesToSend != null) {
markTransactionHashesAsSeen(peer, toHashList(transactionHashesToSend));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should work, but i wonder if it would be more correct to mark them as seen after we know they've been sent to the peer. we could restore them to the cache in the exception handling of NewPooledTransactionHashesMessageSender:63

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, will review next

return transactionHashesToSend;
} else {
return emptySet();
}
}

/**
 * Checks whether any tracked peer has already seen the given transaction hash.
 *
 * @param txHash the transaction hash to look up
 * @return true if at least one peer has seen the hash
 */
public boolean hasSeenTransaction(final Hash txHash) {
  for (final Set<Hash> seenByPeer : seenTransactions.values()) {
    if (seenByPeer.contains(txHash)) {
      return true;
    }
  }
  return false;
}
Expand Down Expand Up @@ -100,5 +121,6 @@ protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {
/**
 * Drops all tracking state for a peer when it disconnects, so per-peer entries do not
 * accumulate for peers that are gone.
 *
 * @param peer the peer that disconnected
 */
public void onDisconnect(final EthPeer peer) {
  seenTransactions.remove(peer);
  transactionsToSend.remove(peer);
  transactionHashesToSend.remove(peer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,16 +49,33 @@ public class TransactionBroadcaster implements TransactionBatchAddedListener {
private final TransactionsMessageSender transactionsMessageSender;
private final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
private final EthContext ethContext;
private final Random random;

/**
 * Creates a broadcaster with a non-deterministic random source for peer selection.
 *
 * @param ethContext the eth context used to schedule send tasks
 * @param transactionTracker tracks per-peer send queues and seen transactions
 * @param transactionsMessageSender sender for full transaction messages
 * @param newPooledTransactionHashesMessageSender sender for hash announcement messages
 */
public TransactionBroadcaster(
    final EthContext ethContext,
    final PeerTransactionTracker transactionTracker,
    final TransactionsMessageSender transactionsMessageSender,
    final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender) {
  // delegate with a null seed: the seeded constructor then uses an unseeded Random
  this(
      ethContext,
      transactionTracker,
      transactionsMessageSender,
      newPooledTransactionHashesMessageSender,
      null);
}

/**
 * Test-visible constructor that accepts an optional seed so the peer-shuffling randomness can be
 * made deterministic in tests.
 *
 * @param ethContext the eth context used to schedule send tasks
 * @param transactionTracker tracks per-peer send queues and seen transactions
 * @param transactionsMessageSender sender for full transaction messages
 * @param newPooledTransactionHashesMessageSender sender for hash announcement messages
 * @param seed random seed, or null for a non-deterministic random source
 */
@VisibleForTesting
protected TransactionBroadcaster(
    final EthContext ethContext,
    final PeerTransactionTracker transactionTracker,
    final TransactionsMessageSender transactionsMessageSender,
    final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender,
    final Long seed) {
  this.transactionTracker = transactionTracker;
  this.transactionsMessageSender = transactionsMessageSender;
  this.newPooledTransactionHashesMessageSender = newPooledTransactionHashesMessageSender;
  this.ethContext = ethContext;
  if (seed == null) {
    this.random = new Random();
  } else {
    this.random = new Random(seed);
  }
}

public void relayTransactionPoolTo(
Expand All @@ -65,7 +84,13 @@ public void relayTransactionPoolTo(
if (peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) {
sendTransactionHashes(toTransactionList(pendingTransactions), List.of(peer));
} else {
sendFullTransactions(toTransactionList(pendingTransactions), List.of(peer));
// we need to exclude txs that support hash only broadcasting
final var fullBroadcastTxs =
pendingTransactions.stream()
.map(PendingTransaction::getTransaction)
.filter(tx -> !ANNOUNCE_HASH_ONLY_TX_TYPES.contains(tx.getType()))
.toList();
sendFullTransactions(fullBroadcastTxs, List.of(peer));
}
}
}
Expand All @@ -77,7 +102,7 @@ public void onTransactionsAdded(final Collection<Transaction> transactions) {
return;
}

final int numPeersToSendFullTransactions = (int) Math.ceil(Math.sqrt(currPeerCount));
final int numPeersToSendFullTransactions = (int) Math.round(Math.sqrt(currPeerCount));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows for rounding down as well as up, instead of ceil which is up only. It still works, but I'm unclear why it is necessary or preferred?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not necessary, just seems more appropriate to round instead of ceil, to avoid strange results when few peers are connected, for example with 2 peers, with round we have 1, otherwise is 2.


final Map<Boolean, List<Transaction>> transactionByBroadcastMode =
transactions.stream()
Expand Down Expand Up @@ -107,7 +132,7 @@ public void onTransactionsAdded(final Collection<Transaction> transactions) {
numPeersToSendFullTransactions - sendOnlyFullTransactionPeers.size(),
sendOnlyHashPeers.size());

Collections.shuffle(sendOnlyHashPeers);
Collections.shuffle(sendOnlyHashPeers, random);

// move peers from the mixed list to reach the required size for full transaction peers
movePeersBetweenLists(sendOnlyHashPeers, sendMixedPeers, delta);
Expand All @@ -121,7 +146,7 @@ public void onTransactionsAdded(final Collection<Transaction> transactions) {
.addArgument(sendOnlyHashPeers::size)
.addArgument(sendMixedPeers::size)
.addArgument(sendOnlyFullTransactionPeers)
.addArgument(() -> sendOnlyHashPeers.toString() + sendMixedPeers.toString())
.addArgument(() -> sendOnlyHashPeers.toString() + sendMixedPeers)
.log();

sendToFullTransactionsPeers(
Expand All @@ -141,7 +166,7 @@ private void sendToOnlyHashPeers(
final Map<Boolean, List<Transaction>> txsByHashOnlyBroadcast,
final List<EthPeer> hashOnlyPeers) {
final List<Transaction> allTransactions =
txsByHashOnlyBroadcast.values().stream().flatMap(List::stream).collect(Collectors.toList());
txsByHashOnlyBroadcast.values().stream().flatMap(List::stream).toList();

sendTransactionHashes(allTransactions, hashOnlyPeers);
}
Expand Down Expand Up @@ -175,7 +200,7 @@ private void sendTransactionHashes(
.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
transaction -> transactionTracker.addToPeerHashSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ public void setUp() {
@Test
public void shouldSendPendingTransactionsToEachPeer() throws Exception {

transactionTracker.addToPeerSendQueue(peer1, transaction1);
transactionTracker.addToPeerSendQueue(peer1, transaction2);
transactionTracker.addToPeerSendQueue(peer2, transaction3);
transactionTracker.addToPeerHashSendQueue(peer1, transaction1);
transactionTracker.addToPeerHashSendQueue(peer1, transaction2);
transactionTracker.addToPeerHashSendQueue(peer2, transaction3);

List.of(peer1, peer2).forEach(messageSender::sendTransactionHashesToPeer);

Expand All @@ -96,7 +96,8 @@ public void shouldSendTransactionsInBatchesWithLimit() throws Exception {
final Set<Transaction> transactions =
generator.transactions(6000).stream().collect(Collectors.toSet());

transactions.forEach(transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));
transactions.forEach(
transaction -> transactionTracker.addToPeerHashSendQueue(peer1, transaction));

messageSender.sendTransactionHashesToPeer(peer1);
final ArgumentCaptor<MessageData> messageDataArgumentCaptor =
Expand Down
Loading
Loading