Skip to content

Commit

Permalink
Penalize invalid transient pending transactions in the layered transa…
Browse files Browse the repository at this point in the history
…ction pool (hyperledger#7359)

* Introduce score for pending transactions

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Introduce score for pending transactions

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Update package javadoc

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

---------

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
Signed-off-by: gconnect <agatevureglory@gmail.com>
  • Loading branch information
fab-10 authored and gconnect committed Aug 26, 2024
1 parent 9e875ec commit 6130d9c
Show file tree
Hide file tree
Showing 18 changed files with 480 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.blockcreation.txselection;

import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOCK_SELECTION_TIMEOUT;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.INVALID_TX_EVALUATION_TOO_LONG;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTED;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.TX_EVALUATION_TOO_LONG;

Expand Down Expand Up @@ -419,11 +420,14 @@ private TransactionSelectionResult handleTransactionNotSelected(

final var pendingTransaction = evaluationContext.getPendingTransaction();

// check if this tx took too much to evaluate, and in case remove it from the pool
// check if this tx took too much to evaluate, and in case it was invalid remove it from the
// pool, otherwise penalize it.
final TransactionSelectionResult actualResult =
isTimeout.get()
? transactionTookTooLong(evaluationContext)
? TX_EVALUATION_TOO_LONG
? transactionTookTooLong(evaluationContext, selectionResult)
? selectionResult.discard()
? INVALID_TX_EVALUATION_TOO_LONG
: TX_EVALUATION_TOO_LONG
: BLOCK_SELECTION_TIMEOUT
: selectionResult;

Expand All @@ -441,16 +445,21 @@ private TransactionSelectionResult handleTransactionNotSelected(
return actualResult;
}

private boolean transactionTookTooLong(final TransactionEvaluationContext evaluationContext) {
private boolean transactionTookTooLong(
final TransactionEvaluationContext evaluationContext,
final TransactionSelectionResult selectionResult) {
final var evaluationTimer = evaluationContext.getEvaluationTimer();
if (evaluationTimer.elapsed(TimeUnit.MILLISECONDS) > blockTxsSelectionMaxTime) {
LOG.atWarn()
.setMessage(
"Transaction {} is too late for inclusion, evaluated in {} that is over the max limit of {}ms"
+ ", removing it from the pool")
"Transaction {} is too late for inclusion, with result {}, evaluated in {} that is over the max limit of {}ms"
+ ", {}")
.addArgument(evaluationContext.getPendingTransaction()::getHash)
.addArgument(selectionResult)
.addArgument(evaluationTimer)
.addArgument(blockTxsSelectionMaxTime)
.addArgument(
selectionResult.discard() ? "removing it from the pool" : "penalizing it in the pool")
.log();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ private TransactionSelectionResult transactionSelectionResultForInvalidResult(
* @return True if the invalid reason is transient, false otherwise.
*/
private boolean isTransientValidationError(final TransactionInvalidReason invalidReason) {
return invalidReason.equals(TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE)
return invalidReason.equals(TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE)
|| invalidReason.equals(TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE)
|| invalidReason.equals(TransactionInvalidReason.NONCE_TOO_HIGH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
import static org.assertj.core.api.Assertions.entry;
import static org.awaitility.Awaitility.await;
import static org.hyperledger.besu.ethereum.core.MiningParameters.DEFAULT_NON_POA_BLOCK_TXS_SELECTION_MAX_TIME;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.NONCE_TOO_LOW;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOCK_SELECTION_TIMEOUT;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.INVALID_TX_EVALUATION_TOO_LONG;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.PRIORITY_FEE_PER_GAS_BELOW_CURRENT_MIN;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTED;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.TX_EVALUATION_TOO_LONG;
Expand Down Expand Up @@ -296,7 +297,7 @@ public void invalidTransactionsAreSkippedButBlockStillFills() {
final Transaction tx = createTransaction(i, Wei.of(7), 100_000);
transactionsToInject.add(tx);
if (i == 1) {
ensureTransactionIsInvalid(tx, TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE);
ensureTransactionIsInvalid(tx, TransactionInvalidReason.NONCE_TOO_LOW);
} else {
ensureTransactionIsValid(tx);
}
Expand All @@ -311,8 +312,7 @@ public void invalidTransactionsAreSkippedButBlockStillFills() {
.containsOnly(
entry(
invalidTx,
TransactionSelectionResult.invalid(
TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name())));
TransactionSelectionResult.invalid(TransactionInvalidReason.NONCE_TOO_LOW.name())));
assertThat(results.getSelectedTransactions().size()).isEqualTo(4);
assertThat(results.getSelectedTransactions().contains(invalidTx)).isFalse();
assertThat(results.getReceipts().size()).isEqualTo(4);
Expand Down Expand Up @@ -568,8 +568,7 @@ public void shouldDiscardTransactionsThatFailValidation() {

ensureTransactionIsValid(validTransaction, 21_000, 0);
final Transaction invalidTransaction = createTransaction(3, Wei.of(10), 21_000);
ensureTransactionIsInvalid(
invalidTransaction, TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE);
ensureTransactionIsInvalid(invalidTransaction, TransactionInvalidReason.NONCE_TOO_LOW);

transactionPool.addRemoteTransactions(List.of(validTransaction, invalidTransaction));

Expand All @@ -582,8 +581,7 @@ public void shouldDiscardTransactionsThatFailValidation() {
.containsOnly(
entry(
invalidTransaction,
TransactionSelectionResult.invalid(
TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name())));
TransactionSelectionResult.invalid(TransactionInvalidReason.NONCE_TOO_LOW.name())));
}

@Test
Expand Down Expand Up @@ -948,7 +946,7 @@ public void subsetOfPendingTransactionsIncludedWhenTxSelectionMaxTimeIsOver(

@ParameterizedTest
@MethodSource("subsetOfPendingTransactionsIncludedWhenTxSelectionMaxTimeIsOver")
public void pendingTransactionsThatTakesTooLongToEvaluateIsDroppedFromThePool(
public void pendingTransactionsThatTakesTooLongToEvaluateIsPenalized(
final boolean isPoa,
final boolean preProcessingTooLate,
final boolean processingTooLate,
Expand All @@ -961,7 +959,7 @@ public void pendingTransactionsThatTakesTooLongToEvaluateIsDroppedFromThePool(
postProcessingTooLate,
900,
TX_EVALUATION_TOO_LONG,
true);
false);
}

private void internalBlockSelectionTimeoutSimulation(
Expand Down Expand Up @@ -1085,7 +1083,7 @@ public void subsetOfInvalidPendingTransactionsIncludedWhenTxSelectionMaxTimeIsOv
500,
BLOCK_SELECTION_TIMEOUT,
false,
UPFRONT_COST_EXCEEDS_BALANCE);
NONCE_TOO_LOW);
}

@ParameterizedTest
Expand All @@ -1102,9 +1100,9 @@ public void invalidPendingTransactionsThatTakesTooLongToEvaluateIsDroppedFromThe
processingTooLate,
postProcessingTooLate,
900,
TX_EVALUATION_TOO_LONG,
INVALID_TX_EVALUATION_TOO_LONG,
true,
UPFRONT_COST_EXCEEDS_BALANCE);
NONCE_TOO_LOW);
}

private void internalBlockSelectionTimeoutSimulationInvalidTxs(
Expand Down Expand Up @@ -1423,15 +1421,17 @@ protected MiningParameters createMiningParameters(

private static class PluginTransactionSelectionResult extends TransactionSelectionResult {
private enum PluginStatus implements Status {
PLUGIN_INVALID(false, true),
PLUGIN_INVALID_TRANSIENT(false, false);
PLUGIN_INVALID(false, true, false),
PLUGIN_INVALID_TRANSIENT(false, false, true);

private final boolean stop;
private final boolean discard;
private final boolean penalize;

PluginStatus(final boolean stop, final boolean discard) {
PluginStatus(final boolean stop, final boolean discard, final boolean penalize) {
this.stop = stop;
this.discard = discard;
this.penalize = penalize;
}

@Override
Expand All @@ -1443,6 +1443,11 @@ public boolean stop() {
public boolean discard() {
return discard;
}

@Override
public boolean penalize() {
return penalize;
}
}

public static final TransactionSelectionResult GENERIC_PLUGIN_INVALID_TRANSIENT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,20 @@ public abstract class PendingTransaction
private final Transaction transaction;
private final long addedAt;
private final long sequence; // Allows prioritization based on order transactions are added
private volatile byte score;

private int memorySize = NOT_INITIALIZED;

private PendingTransaction(
final Transaction transaction, final long addedAt, final long sequence) {
final Transaction transaction, final long addedAt, final long sequence, final byte score) {
this.transaction = transaction;
this.addedAt = addedAt;
this.sequence = sequence;
this.score = score;
}

private PendingTransaction(final Transaction transaction, final long addedAt) {
this(transaction, addedAt, TRANSACTIONS_ADDED.getAndIncrement());
this(transaction, addedAt, TRANSACTIONS_ADDED.getAndIncrement(), Byte.MAX_VALUE);
}

public static PendingTransaction newPendingTransaction(
Expand Down Expand Up @@ -123,6 +125,20 @@ public int memorySize() {
return memorySize;
}

public byte getScore() {
return score;
}

public void decrementScore() {
// use temp var to avoid non-atomic update of volatile var
final byte newScore = (byte) (score - 1);

// check to avoid underflow
if (newScore < score) {
score = newScore;
}
}

public abstract PendingTransaction detachedCopy();

private int computeMemorySize() {
Expand Down Expand Up @@ -255,6 +271,8 @@ public String toString() {
+ isReceivedFromLocalSource()
+ ", hasPriority="
+ hasPriority()
+ ", score="
+ score
+ '}';
}

Expand All @@ -267,6 +285,8 @@ public String toTraceLog() {
+ isReceivedFromLocalSource()
+ ", hasPriority="
+ hasPriority()
+ ", score="
+ score
+ ", "
+ transaction.toTraceLog()
+ "}";
Expand All @@ -282,13 +302,13 @@ public Local(final Transaction transaction) {
this(transaction, System.currentTimeMillis());
}

private Local(final long sequence, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence);
private Local(final long sequence, final byte score, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence, score);
}

@Override
public PendingTransaction detachedCopy() {
return new Local(getSequence(), getTransaction().detachedCopy());
return new Local(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand All @@ -310,13 +330,13 @@ public Priority(final Transaction transaction, final long addedAt) {
super(transaction, addedAt);
}

public Priority(final long sequence, final Transaction transaction) {
super(sequence, transaction);
public Priority(final long sequence, final byte score, final Transaction transaction) {
super(sequence, score, transaction);
}

@Override
public PendingTransaction detachedCopy() {
return new Priority(getSequence(), getTransaction().detachedCopy());
return new Priority(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand All @@ -336,13 +356,13 @@ public Remote(final Transaction transaction) {
this(transaction, System.currentTimeMillis());
}

private Remote(final long sequence, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence);
private Remote(final long sequence, final byte score, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence, score);
}

@Override
public PendingTransaction detachedCopy() {
return new Remote(getSequence(), getTransaction().detachedCopy());
return new Remote(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand All @@ -364,13 +384,13 @@ public Priority(final Transaction transaction, final long addedAt) {
super(transaction, addedAt);
}

public Priority(final long sequence, final Transaction transaction) {
super(sequence, transaction);
public Priority(final long sequence, final byte score, final Transaction transaction) {
super(sequence, score, transaction);
}

@Override
public PendingTransaction detachedCopy() {
return new Priority(getSequence(), getTransaction().detachedCopy());
return new Priority(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ public class TransactionPoolMetrics {
public static final String ADDED_COUNTER_NAME = "added_total";
public static final String REMOVED_COUNTER_NAME = "removed_total";
public static final String REJECTED_COUNTER_NAME = "rejected_total";
public static final String PENALIZED_COUNTER_NAME = "penalized_total";
public static final String EXPIRED_MESSAGES_COUNTER_NAME = "messages_expired_total";
private static final int SKIPPED_MESSAGES_LOGGING_THRESHOLD = 1000;
private final MetricsSystem metricsSystem;
private final LabelledMetric<Counter> addedCounter;
private final LabelledMetric<Counter> removedCounter;
private final LabelledMetric<Counter> rejectedCounter;
private final LabelledMetric<Counter> penalizedCounter;
private final LabelledGauge spaceUsed;
private final LabelledGauge transactionCount;
private final LabelledGauge transactionCountByType;
Expand Down Expand Up @@ -88,6 +90,15 @@ public TransactionPoolMetrics(final MetricsSystem metricsSystem) {
"reason",
"layer");

penalizedCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.TRANSACTION_POOL,
PENALIZED_COUNTER_NAME,
"Count of penalized transactions in the transaction pool",
"source",
"priority",
"layer");

spaceUsed =
metricsSystem.createLabelledGauge(
BesuMetricCategory.TRANSACTION_POOL,
Expand Down Expand Up @@ -246,6 +257,15 @@ public void incrementRejected(
.inc();
}

public void incrementPenalized(final PendingTransaction pendingTransaction, final String layer) {
penalizedCounter
.labels(
location(pendingTransaction.isReceivedFromLocalSource()),
priority(pendingTransaction.hasPriority()),
layer)
.inc();
}

public void incrementExpiredMessages(final String message) {
expiredMessagesCounter.labels(message).inc();
}
Expand Down
Loading

0 comments on commit 6130d9c

Please sign in to comment.