Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Layered txpool: do not send notifications when moving tx between layers #7539

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
- Correctly drops messages that exceeds local message size limit [#5455](https://github.com/hyperledger/besu/pull/7507)
- **DebugMetrics**: Fixed a `ClassCastException` occurring in `DebugMetrics` when handling nested metric structures. Previously, `Double` values within these structures were incorrectly cast to `Map` objects, leading to errors. This update allows for proper handling of both direct values and nested structures at the same level. Issue# [#7383](https://github.com/hyperledger/besu/pull/7383)
- `evmtool` was not respecting the `--genesis` setting, resulting in unexpected trace results. [#7433](https://github.com/hyperledger/besu/pull/7433)
- Layered txpool: do not send notifications when moving tx between layers [#7539](https://github.com/hyperledger/besu/pull/7539)

## 24.8.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.transactions;

import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason;
import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.ReplaceableDoubleSupplier;
Expand Down Expand Up @@ -68,6 +69,7 @@ public TransactionPoolMetrics(final MetricsSystem metricsSystem) {
"Count of transactions added to the transaction pool",
"source",
"priority",
"reason",
"layer");

removedCounter =
Expand Down Expand Up @@ -215,11 +217,13 @@ public void initExpiredMessagesCounter(final String message) {
SKIPPED_MESSAGES_LOGGING_THRESHOLD));
}

public void incrementAdded(final PendingTransaction pendingTransaction, final String layer) {
public void incrementAdded(
final PendingTransaction pendingTransaction, final AddReason addReason, final String layer) {
addedCounter
.labels(
location(pendingTransaction.isReceivedFromLocalSource()),
priority(pendingTransaction.hasPriority()),
addReason.label(),
layer)
.inc();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.FOLLOW_INVALIDATED;

Expand Down Expand Up @@ -77,7 +78,7 @@ private void pushDown(
senderTxs.remove(txToRemove.getNonce());
processRemove(senderTxs, txToRemove.getTransaction(), FOLLOW_INVALIDATED);
})
.forEach(followingTx -> nextLayer.add(followingTx, gap));
.forEach(followingTx -> nextLayer.add(followingTx, gap, MOVE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.TRY_NEXT_LAYER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.REPLACED;

import org.hyperledger.besu.datatypes.Address;
Expand Down Expand Up @@ -169,7 +169,8 @@ protected abstract TransactionAddedResult canAdd(
final PendingTransaction pendingTransaction, final int gap);

@Override
public TransactionAddedResult add(final PendingTransaction pendingTransaction, final int gap) {
public TransactionAddedResult add(
final PendingTransaction pendingTransaction, final int gap, final AddReason addReason) {

// is replacing an existing one?
TransactionAddedResult addStatus = maybeReplaceTransaction(pendingTransaction);
Expand All @@ -178,21 +179,26 @@ public TransactionAddedResult add(final PendingTransaction pendingTransaction, f
}

if (addStatus.equals(TRY_NEXT_LAYER)) {
return addToNextLayer(pendingTransaction, gap);
return addToNextLayer(pendingTransaction, gap, addReason);
}

if (addStatus.isSuccess()) {
processAdded(pendingTransaction.detachedCopy());
final var addedPendingTransaction =
addReason.makeCopy() ? pendingTransaction.detachedCopy() : pendingTransaction;
processAdded(addedPendingTransaction, addReason);
addStatus.maybeReplacedTransaction().ifPresent(this::replaced);

nextLayer.notifyAdded(pendingTransaction);
nextLayer.notifyAdded(addedPendingTransaction);

if (!maybeFull()) {
// if there is space try to see if the added tx filled some gaps
tryFillGap(addStatus, pendingTransaction, getRemainingPromotionsPerType());
tryFillGap(addStatus, addedPendingTransaction, getRemainingPromotionsPerType());
}

if (addReason.sendNotification()) {
ethScheduler.scheduleTxWorkerTask(() -> notifyTransactionAdded(addedPendingTransaction));
}

ethScheduler.scheduleTxWorkerTask(() -> notifyTransactionAdded(pendingTransaction));
} else {
final var rejectReason = addStatus.maybeInvalidReason().orElseThrow();
metrics.incrementRejected(pendingTransaction, rejectReason, name());
Expand Down Expand Up @@ -238,7 +244,7 @@ private void tryFillGap(
pendingTransaction.getNonce(),
remainingPromotionsPerType);
if (promotedTx != null) {
processAdded(promotedTx);
processAdded(promotedTx, AddReason.PROMOTED);
if (!maybeFull()) {
tryFillGap(ADDED, promotedTx, remainingPromotionsPerType);
}
Expand Down Expand Up @@ -286,7 +292,7 @@ public PendingTransaction promoteFor(

if (remainingPromotionsPerType[txType.ordinal()] > 0) {
senderTxs.pollFirstEntry();
processRemove(senderTxs, candidateTx.getTransaction(), PROMOTED);
processRemove(senderTxs, candidateTx.getTransaction(), RemovalReason.PROMOTED);
metrics.incrementRemoved(candidateTx, "promoted", name());

if (senderTxs.isEmpty()) {
Expand All @@ -302,32 +308,34 @@ public PendingTransaction promoteFor(
}

private TransactionAddedResult addToNextLayer(
final PendingTransaction pendingTransaction, final int distance) {
final PendingTransaction pendingTransaction, final int distance, final AddReason addReason) {
return addToNextLayer(
txsBySender.getOrDefault(pendingTransaction.getSender(), EMPTY_SENDER_TXS),
pendingTransaction,
distance);
distance,
addReason);
}

protected TransactionAddedResult addToNextLayer(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction pendingTransaction,
final int distance) {
final int distance,
final AddReason addReason) {
final int nextLayerDistance;
if (senderTxs.isEmpty()) {
nextLayerDistance = distance;
} else {
nextLayerDistance = (int) (pendingTransaction.getNonce() - (senderTxs.lastKey() + 1));
}
return nextLayer.add(pendingTransaction, nextLayerDistance);
return nextLayer.add(pendingTransaction, nextLayerDistance, addReason);
}

private void processAdded(final PendingTransaction addedTx) {
private void processAdded(final PendingTransaction addedTx, final AddReason addReason) {
pendingTransactions.put(addedTx.getHash(), addedTx);
final var senderTxs = txsBySender.computeIfAbsent(addedTx.getSender(), s -> new TreeMap<>());
senderTxs.put(addedTx.getNonce(), addedTx);
increaseCounters(addedTx);
metrics.incrementAdded(addedTx, name());
metrics.incrementAdded(addedTx, addReason, name());
internalAdd(senderTxs, addedTx);
}

Expand All @@ -353,7 +361,7 @@ private void evict(final long spaceToFree, final int txsToEvict) {
++evictedCount;
evictedSize += lastTx.memorySize();
// evicted can always be added to the next layer
addToNextLayer(lessReadySenderTxs, lastTx, 0);
addToNextLayer(lessReadySenderTxs, lastTx, 0, MOVE);
}

if (lessReadySenderTxs.isEmpty()) {
Expand Down Expand Up @@ -459,7 +467,7 @@ final void promoteTransactions() {
nextLayer
.promote(
this::promotionFilter, cacheFreeSpace(), freeSlots, getRemainingPromotionsPerType())
.forEach(this::processAdded);
.forEach(addedTx -> processAdded(addedTx, AddReason.PROMOTED));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.BELOW_BASE_FEE;

import org.hyperledger.besu.datatypes.Wei;
Expand Down Expand Up @@ -133,7 +134,7 @@ protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket
.addArgument(newNextBlockBaseFee::toHumanReadableString)
.log();
processEvict(senderTxs, demoteTx, BELOW_BASE_FEE);
addToNextLayer(senderTxs, demoteTx, 0);
addToNextLayer(senderTxs, demoteTx, 0, MOVE);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public List<PendingTransaction> getAll() {
}

@Override
public TransactionAddedResult add(final PendingTransaction pendingTransaction, final int gap) {
public TransactionAddedResult add(
final PendingTransaction pendingTransaction, final int gap, final AddReason reason) {
notifyTransactionDropped(pendingTransaction);
metrics.incrementRemoved(pendingTransaction, DROPPED.label(), name());
++droppedCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.INTERNAL_ERROR;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.NEW;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.RECONCILED;

Expand Down Expand Up @@ -100,7 +101,7 @@ public synchronized TransactionAddedResult addTransaction(
}

try {
return prioritizedTransactions.add(pendingTransaction, (int) nonceDistance);
return prioritizedTransactions.add(pendingTransaction, (int) nonceDistance, NEW);
} catch (final Throwable throwable) {
return reconcileAndRetryAdd(
pendingTransaction, stateSenderNonce, (int) nonceDistance, throwable);
Expand All @@ -123,7 +124,7 @@ private TransactionAddedResult reconcileAndRetryAdd(
.log();
reconcileSender(pendingTransaction.getSender(), stateSenderNonce);
try {
return prioritizedTransactions.add(pendingTransaction, nonceDistance);
return prioritizedTransactions.add(pendingTransaction, nonceDistance, NEW);
} catch (final Throwable throwable2) {
// the error should have been solved by the reconcile, logging at higher level now
LOG.atWarn()
Expand Down Expand Up @@ -210,7 +211,7 @@ private void reconcileSender(final Address sender, final long stateSenderNonce)
final long lowestNonce = reAddTxs.getFirst().getNonce();
final int newNonceDistance = (int) Math.max(0, lowestNonce - stateSenderNonce);

reAddTxs.forEach(ptx -> prioritizedTransactions.add(ptx, newNonceDistance));
reAddTxs.forEach(ptx -> prioritizedTransactions.add(ptx, newNonceDistance, NEW));
}

LOG.atDebug()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,19 @@ public interface TransactionsLayer {

boolean contains(Transaction transaction);

TransactionAddedResult add(PendingTransaction pendingTransaction, int gap);
/**
* Try to add a pending transaction to this layer. The {@code addReason} is used to discriminate
* between a new tx that is added to the pool, or a tx that is already in the pool, but is moving
* internally between layers, for example, due to a promotion or demotion. The distinction is
* needed since we only need to send a notification for a new tx, and not when it is only an
* internal move.
*
* @param pendingTransaction the tx to try to add to this layer
* @param gap the nonce gap between the current sender nonce and the tx
* @param addReason define if it is a new tx or an internal move
* @return the result of the add operation
*/
TransactionAddedResult add(PendingTransaction pendingTransaction, int gap, AddReason addReason);

void remove(PendingTransaction pendingTransaction, RemovalReason reason);

Expand Down Expand Up @@ -108,6 +120,49 @@ List<PendingTransaction> promote(

String logSender(Address sender);

/** Describe why we are trying to add a tx to a layer. */
enum AddReason {
/** When adding a tx, that is not present in the pool. */
NEW(true, true),
/** When adding a tx as result of an internal move between layers. */
MOVE(false, false),
/** When adding a tx as result of a promotion from a lower layer. */
PROMOTED(false, false);

private final boolean sendNotification;
private final boolean makeCopy;
private final String label;

AddReason(final boolean sendNotification, final boolean makeCopy) {
this.sendNotification = sendNotification;
this.makeCopy = makeCopy;
this.label = name().toLowerCase(Locale.ROOT);
}

/**
* Should we send add notification for this reason?
*
* @return true if notification should be sent
*/
public boolean sendNotification() {
return sendNotification;
}

/**
* Should the layer make a copy of the pending tx before adding it, to avoid keeping reference
* to potentially large underlying byte buffers?
*
* @return true is a copy is necessary
*/
public boolean makeCopy() {
return makeCopy;
}

public String label() {
return label;
}
}

enum RemovalReason {
CONFIRMED,
CROSS_LAYER_REPLACED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.NEW;

import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.datatypes.Wei;
Expand Down Expand Up @@ -169,7 +170,7 @@ protected void shouldPrioritizeValueThenTimeAddedToPool(
.mapToObj(
i -> {
final var lowPriceTx = lowValueTxSupplier.next();
final var prioritizeResult = transactions.add(lowPriceTx, 0);
final var prioritizeResult = transactions.add(lowPriceTx, 0, NEW);

assertThat(prioritizeResult).isEqualTo(ADDED);
assertThat(evictCollector.getEvictedTransactions()).isEmpty();
Expand All @@ -180,7 +181,7 @@ protected void shouldPrioritizeValueThenTimeAddedToPool(
assertThat(transactions.count()).isEqualTo(MAX_TRANSACTIONS);

// This should kick the oldest tx with the low gas price out, namely the first one we added
final var highValuePrioRes = transactions.add(highValueTx, 0);
final var highValuePrioRes = transactions.add(highValueTx, 0, NEW);
assertThat(highValuePrioRes).isEqualTo(ADDED);
assertEvicted(expectedDroppedTx);

Expand All @@ -195,7 +196,7 @@ protected TransactionAddedResult prioritizeTransaction(final Transaction tx) {
}

protected TransactionAddedResult prioritizeTransaction(final PendingTransaction tx) {
return transactions.add(tx, 0);
return transactions.add(tx, 0, NEW);
}

protected void assertTransactionPrioritized(final PendingTransaction tx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactions;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason;
import org.hyperledger.besu.evm.account.Account;
import org.hyperledger.besu.metrics.StubMetricsSystem;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
Expand Down Expand Up @@ -258,9 +259,10 @@ protected void addLocalTransactions(
}
}

protected long getAddedCount(final String source, final String priority, final String layer) {
protected long getAddedCount(
final String source, final String priority, final AddReason addReason, final String layer) {
return metricsSystem.getCounterValue(
TransactionPoolMetrics.ADDED_COUNTER_NAME, source, priority, layer);
TransactionPoolMetrics.ADDED_COUNTER_NAME, source, priority, addReason.label(), layer);
}

protected long getRemovedCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ public String name() {
}

@Override
public TransactionAddedResult add(final PendingTransaction pendingTransaction, final int gap) {
final var res = super.add(pendingTransaction, gap);
public TransactionAddedResult add(
final PendingTransaction pendingTransaction, final int gap, final AddReason addReason) {
final var res = super.add(pendingTransaction, gap, addReason);
evictedTxs.add(pendingTransaction);
return res;
}
Expand Down
Loading
Loading