Skip to content

Commit

Permalink
Introduce score for pending transactions
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 committed Jul 23, 2024
1 parent 39fa494 commit 97e32da
Show file tree
Hide file tree
Showing 16 changed files with 310 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.blockcreation.txselection;

import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOCK_SELECTION_TIMEOUT;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.INVALID_TX_EVALUATION_TOO_LONG;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTED;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.TX_EVALUATION_TOO_LONG;

Expand Down Expand Up @@ -419,11 +420,14 @@ private TransactionSelectionResult handleTransactionNotSelected(

final var pendingTransaction = evaluationContext.getPendingTransaction();

// check if this tx took too much to evaluate, and in case remove it from the pool
// check if this tx took too much to evaluate, and in case it was invalid remove it from the
// pool, otherwise penalize it.
final TransactionSelectionResult actualResult =
isTimeout.get()
? transactionTookTooLong(evaluationContext)
? TX_EVALUATION_TOO_LONG
? selectionResult.discard()
? INVALID_TX_EVALUATION_TOO_LONG
: TX_EVALUATION_TOO_LONG
: BLOCK_SELECTION_TIMEOUT
: selectionResult;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
import static org.assertj.core.api.Assertions.entry;
import static org.awaitility.Awaitility.await;
import static org.hyperledger.besu.ethereum.core.MiningParameters.DEFAULT_NON_POA_BLOCK_TXS_SELECTION_MAX_TIME;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.NONCE_TOO_LOW;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOCK_SELECTION_TIMEOUT;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.INVALID_TX_EVALUATION_TOO_LONG;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.PRIORITY_FEE_PER_GAS_BELOW_CURRENT_MIN;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTED;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.TX_EVALUATION_TOO_LONG;
Expand Down Expand Up @@ -296,7 +297,7 @@ public void invalidTransactionsAreSkippedButBlockStillFills() {
final Transaction tx = createTransaction(i, Wei.of(7), 100_000);
transactionsToInject.add(tx);
if (i == 1) {
ensureTransactionIsInvalid(tx, TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE);
ensureTransactionIsInvalid(tx, TransactionInvalidReason.NONCE_TOO_LOW);
} else {
ensureTransactionIsValid(tx);
}
Expand All @@ -311,8 +312,7 @@ public void invalidTransactionsAreSkippedButBlockStillFills() {
.containsOnly(
entry(
invalidTx,
TransactionSelectionResult.invalid(
TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name())));
TransactionSelectionResult.invalid(TransactionInvalidReason.NONCE_TOO_LOW.name())));
assertThat(results.getSelectedTransactions().size()).isEqualTo(4);
assertThat(results.getSelectedTransactions().contains(invalidTx)).isFalse();
assertThat(results.getReceipts().size()).isEqualTo(4);
Expand Down Expand Up @@ -568,8 +568,7 @@ public void shouldDiscardTransactionsThatFailValidation() {

ensureTransactionIsValid(validTransaction, 21_000, 0);
final Transaction invalidTransaction = createTransaction(3, Wei.of(10), 21_000);
ensureTransactionIsInvalid(
invalidTransaction, TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE);
ensureTransactionIsInvalid(invalidTransaction, TransactionInvalidReason.NONCE_TOO_LOW);

transactionPool.addRemoteTransactions(List.of(validTransaction, invalidTransaction));

Expand All @@ -582,8 +581,7 @@ public void shouldDiscardTransactionsThatFailValidation() {
.containsOnly(
entry(
invalidTransaction,
TransactionSelectionResult.invalid(
TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name())));
TransactionSelectionResult.invalid(TransactionInvalidReason.NONCE_TOO_LOW.name())));
}

@Test
Expand Down Expand Up @@ -948,7 +946,7 @@ public void subsetOfPendingTransactionsIncludedWhenTxSelectionMaxTimeIsOver(

@ParameterizedTest
@MethodSource("subsetOfPendingTransactionsIncludedWhenTxSelectionMaxTimeIsOver")
public void pendingTransactionsThatTakesTooLongToEvaluateIsDroppedFromThePool(
public void pendingTransactionsThatTakesTooLongToEvaluateIsPenalized(
final boolean isPoa,
final boolean preProcessingTooLate,
final boolean processingTooLate,
Expand All @@ -961,7 +959,7 @@ public void pendingTransactionsThatTakesTooLongToEvaluateIsDroppedFromThePool(
postProcessingTooLate,
900,
TX_EVALUATION_TOO_LONG,
true);
false);
}

private void internalBlockSelectionTimeoutSimulation(
Expand Down Expand Up @@ -1085,7 +1083,7 @@ public void subsetOfInvalidPendingTransactionsIncludedWhenTxSelectionMaxTimeIsOv
500,
BLOCK_SELECTION_TIMEOUT,
false,
UPFRONT_COST_EXCEEDS_BALANCE);
NONCE_TOO_LOW);
}

@ParameterizedTest
Expand All @@ -1102,9 +1100,9 @@ public void invalidPendingTransactionsThatTakesTooLongToEvaluateIsDroppedFromThe
processingTooLate,
postProcessingTooLate,
900,
TX_EVALUATION_TOO_LONG,
INVALID_TX_EVALUATION_TOO_LONG,
true,
UPFRONT_COST_EXCEEDS_BALANCE);
NONCE_TOO_LOW);
}

private void internalBlockSelectionTimeoutSimulationInvalidTxs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,20 @@ public abstract class PendingTransaction
private final Transaction transaction;
private final long addedAt;
private final long sequence; // Allows prioritization based on order transactions are added
private volatile byte score = Byte.MAX_VALUE;
private volatile byte score;

private int memorySize = NOT_INITIALIZED;

private PendingTransaction(
final Transaction transaction, final long addedAt, final long sequence) {
final Transaction transaction, final long addedAt, final long sequence, final byte score) {
this.transaction = transaction;
this.addedAt = addedAt;
this.sequence = sequence;
this.score = score;
}

private PendingTransaction(final Transaction transaction, final long addedAt) {
this(transaction, addedAt, TRANSACTIONS_ADDED.getAndIncrement());
this(transaction, addedAt, TRANSACTIONS_ADDED.getAndIncrement(), Byte.MAX_VALUE);
}

public static PendingTransaction newPendingTransaction(
Expand Down Expand Up @@ -301,13 +302,13 @@ public Local(final Transaction transaction) {
this(transaction, System.currentTimeMillis());
}

private Local(final long sequence, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence);
private Local(final long sequence, final byte score, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence, score);
}

@Override
public PendingTransaction detachedCopy() {
return new Local(getSequence(), getTransaction().detachedCopy());
return new Local(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand All @@ -329,13 +330,13 @@ public Priority(final Transaction transaction, final long addedAt) {
super(transaction, addedAt);
}

public Priority(final long sequence, final Transaction transaction) {
super(sequence, transaction);
public Priority(final long sequence, final byte score, final Transaction transaction) {
super(sequence, score, transaction);
}

@Override
public PendingTransaction detachedCopy() {
return new Priority(getSequence(), getTransaction().detachedCopy());
return new Priority(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand All @@ -355,13 +356,13 @@ public Remote(final Transaction transaction) {
this(transaction, System.currentTimeMillis());
}

private Remote(final long sequence, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence);
private Remote(final long sequence, final byte score, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence, score);
}

@Override
public PendingTransaction detachedCopy() {
return new Remote(getSequence(), getTransaction().detachedCopy());
return new Remote(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand All @@ -383,13 +384,13 @@ public Priority(final Transaction transaction, final long addedAt) {
super(transaction, addedAt);
}

public Priority(final long sequence, final Transaction transaction) {
super(sequence, transaction);
public Priority(final long sequence, final byte score, final Transaction transaction) {
super(sequence, score, transaction);
}

@Override
public PendingTransaction detachedCopy() {
return new Priority(getSequence(), getTransaction().detachedCopy());
return new Priority(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ public class TransactionPoolMetrics {
public static final String ADDED_COUNTER_NAME = "added_total";
public static final String REMOVED_COUNTER_NAME = "removed_total";
public static final String REJECTED_COUNTER_NAME = "rejected_total";
public static final String PENALIZED_COUNTER_NAME = "penalized_total";
public static final String EXPIRED_MESSAGES_COUNTER_NAME = "messages_expired_total";
private static final int SKIPPED_MESSAGES_LOGGING_THRESHOLD = 1000;
private final MetricsSystem metricsSystem;
private final LabelledMetric<Counter> addedCounter;
private final LabelledMetric<Counter> removedCounter;
private final LabelledMetric<Counter> rejectedCounter;
private final LabelledMetric<Counter> penalizedCounter;
private final LabelledGauge spaceUsed;
private final LabelledGauge transactionCount;
private final LabelledGauge transactionCountByType;
Expand Down Expand Up @@ -88,6 +90,15 @@ public TransactionPoolMetrics(final MetricsSystem metricsSystem) {
"reason",
"layer");

penalizedCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.TRANSACTION_POOL,
PENALIZED_COUNTER_NAME,
"Count of penalized transactions in the transaction pool",
"source",
"priority",
"layer");

spaceUsed =
metricsSystem.createLabelledGauge(
BesuMetricCategory.TRANSACTION_POOL,
Expand Down Expand Up @@ -246,6 +257,15 @@ public void incrementRejected(
.inc();
}

public void incrementPenalized(final PendingTransaction pendingTransaction, final String layer) {
penalizedCounter
.labels(
location(pendingTransaction.isReceivedFromLocalSource()),
priority(pendingTransaction.hasPriority()),
layer)
.inc();
}

public void incrementExpiredMessages(final String message) {
expiredMessagesCounter.labels(message).inc();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ protected void internalRemove(
orderByFee.remove(removedTx);
}

@Override
protected void internalPenalize(final PendingTransaction penalizedTx) {
orderByFee.remove(penalizedTx);
penalizedTx.decrementScore();
orderByFee.add(penalizedTx);
}

@Override
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
Expand Down Expand Up @@ -192,7 +199,21 @@ public List<SenderPendingTransactions> getBySender() {
.toList();
}

public Map<Byte, List<SenderPendingTransactions>> getByScore() {
/**
* Returns pending txs by sender and ordered by score desc. In case a sender has pending txs with
* different scores, then in nonce sequence, every time there is a score decrease, his pending txs
* will be put in a new entry with that score. For example if a sender has 3 pending txs (where
* the first number is the nonce and the score is between parenthesis): 0(127), 1(126), 2(127),
* then for he there will be 2 entries:
*
* <ul>
* <li>0(127)
* <li>1(126), 2(127)
* </ul>
*
* @return pending txs by sender and ordered by score desc
*/
public NavigableMap<Byte, List<SenderPendingTransactions>> getByScore() {
final var sendersToAdd = new HashSet<>(txsBySender.keySet());
return orderByFee.descendingSet().stream()
.map(PendingTransaction::getSender)
Expand All @@ -206,7 +227,8 @@ public Map<Byte, List<SenderPendingTransactions>> getByScore() {
a.addAll(b);
return a;
},
TreeMap::new));
TreeMap::new))
.descendingMap();
}

private Map<Byte, List<SenderPendingTransactions>> splitByScore(
Expand All @@ -216,15 +238,14 @@ private Map<Byte, List<SenderPendingTransactions>> splitByScore(
var currSplit = new ArrayList<PendingTransaction>();
for (final var entry : txsBySender.entrySet()) {
if (entry.getValue().getScore() < currScore) {
// score decreased, we need to start a new split
// score decreased, we need to save current split and start a new one
splitByScore
.computeIfAbsent(currScore, k -> new ArrayList<>())
.add(new SenderPendingTransactions(sender, currSplit));
currSplit = new ArrayList<>();
currScore = entry.getValue().getScore();
} else {
currSplit.add(entry.getValue());
}
currSplit.add(entry.getValue());
}
splitByScore
.computeIfAbsent(currScore, k -> new ArrayList<>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,18 @@ final void promoteTransactions() {
}
}

@Override
public void penalize(final PendingTransaction penalizedTransaction) {
if (pendingTransactions.containsKey(penalizedTransaction.getHash())) {
internalPenalize(penalizedTransaction);
metrics.incrementPenalized(penalizedTransaction, name());
} else {
nextLayer.penalize(penalizedTransaction);
}
}

protected abstract void internalPenalize(final PendingTransaction pendingTransaction);

/**
* How many txs of a specified type can be promoted? This make sense when a max number of txs of a
* type can be included in a single block (ex. blob txs), to avoid filling the layer with more txs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
Expand Down Expand Up @@ -196,8 +195,8 @@ protected String internalLogStats() {
return "Basefee Prioritized: Empty";
}

final Transaction highest = orderByFee.last().getTransaction();
final Transaction lowest = orderByFee.first().getTransaction();
final PendingTransaction highest = orderByFee.last();
final PendingTransaction lowest = orderByFee.first();

return "Basefee Prioritized: "
+ "count: "
Expand All @@ -206,16 +205,26 @@ protected String internalLogStats() {
+ spaceUsed
+ ", unique senders: "
+ txsBySender.size()
+ ", highest priority tx: [max fee: "
+ highest.getMaxGasPrice().toHumanReadableString()
+ ", highest priority tx: [score: "
+ highest.getScore()
+ ", max fee: "
+ highest.getTransaction().getMaxGasPrice().toHumanReadableString()
+ ", curr prio fee: "
+ highest.getEffectivePriorityFeePerGas(nextBlockBaseFee).toHumanReadableString()
+ highest
.getTransaction()
.getEffectivePriorityFeePerGas(nextBlockBaseFee)
.toHumanReadableString()
+ ", hash: "
+ highest.getHash()
+ "], lowest priority tx: [max fee: "
+ lowest.getMaxGasPrice().toHumanReadableString()
+ "], lowest priority tx: [score: "
+ lowest.getScore()
+ ", max fee: "
+ lowest.getTransaction().getMaxGasPrice().toHumanReadableString()
+ ", curr prio fee: "
+ lowest.getEffectivePriorityFeePerGas(nextBlockBaseFee).toHumanReadableString()
+ lowest
.getTransaction()
.getEffectivePriorityFeePerGas(nextBlockBaseFee)
.toHumanReadableString()
+ ", hash: "
+ lowest.getHash()
+ "], next block base fee: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public TransactionAddedResult add(final PendingTransaction pendingTransaction, f
@Override
public void remove(final PendingTransaction pendingTransaction, final RemovalReason reason) {}

@Override
public void penalize(final PendingTransaction penalizedTx) {}

@Override
public void blockAdded(
final FeeMarket feeMarket,
Expand Down
Loading

0 comments on commit 97e32da

Please sign in to comment.