Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process onBlockAdded event asyncronously #5909

Merged
merged 5 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Add access to an immutable world view to start/end transaction hooks in the tracing API[#5836](https://github.com/hyperledger/besu/pull/5836)
- Layered transaction pool implementation is now stable and enabled by default. If you want still to use the legacy implementation, use `--tx-pool=legacy` [#5772](https://github.com/hyperledger/besu)
- Tune G1GC to reduce Besu memory footprint, and new `besu-untuned` start scripts to run without any specific G1GC flags [#5879](https://github.com/hyperledger/besu/pull/5879)
- Reduce `engine_forkchoiceUpdatedV?` response time by asynchronously process block added events in the transaction pool [#5909](https://github.com/hyperledger/besu/pull/5909)

### Bug Fixes
- do not create ignorable storage on revert storage-variables subcommand [#5830](https://github.com/hyperledger/besu/pull/5830)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public EthScheduler(
metricsSystem),
MonitoredExecutors.newCachedThreadPool(
EthScheduler.class.getSimpleName() + "-Services", metricsSystem),
MonitoredExecutors.newBoundedThreadPool(
MonitoredExecutors.newFixedThreadPool(
EthScheduler.class.getSimpleName() + "-Computation",
1,
computationWorkerCount,
Expand Down Expand Up @@ -133,6 +133,10 @@ public void scheduleTxWorkerTask(final Runnable command) {
txWorkerExecutor.execute(command);
}

public <T> CompletableFuture<T> scheduleServiceTask(final Supplier<T> task) {
return CompletableFuture.supplyAsync(task, servicesExecutor);
}

public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) {
final CompletableFuture<T> serviceFuture = task.runAsync(servicesExecutor);
pendingFutures.add(serviceFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -104,6 +106,8 @@ public class TransactionPool implements BlockAddedObserver {
new PendingTransactionsListenersProxy();
private volatile OptionalLong subscribeConnectId = OptionalLong.empty();
private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager();
private final Lock blockAddedLock = new ReentrantLock();
private final Queue<BlockAddedEvent> blockAddedQueue = new ConcurrentLinkedQueue<>();

public TransactionPool(
final Supplier<PendingTransactions> pendingTransactionsSupplier,
Expand Down Expand Up @@ -321,16 +325,47 @@ public void unsubscribeDroppedTransactions(final long id) {
@Override
public void onBlockAdded(final BlockAddedEvent event) {
if (isPoolEnabled.get()) {
LOG.trace("Block added event {}", event);
final long started = System.currentTimeMillis();
if (event.getEventType().equals(BlockAddedEvent.EventType.HEAD_ADVANCED)
|| event.getEventType().equals(BlockAddedEvent.EventType.CHAIN_REORG)) {

pendingTransactions.manageBlockAdded(
event.getBlock().getHeader(),
event.getAddedTransactions(),
event.getRemovedTransactions(),
protocolSchedule.getByBlockHeader(event.getBlock().getHeader()).getFeeMarket());
reAddTransactions(event.getRemovedTransactions());
// add the event to the processing queue
blockAddedQueue.add(event);

// we want to process the added block asynchronously,
// but at the same time we must ensure that blocks are processed in order one at time
ethContext
.getScheduler()
.scheduleServiceTask(
() -> {
while (!blockAddedQueue.isEmpty()) {
if (blockAddedLock.tryLock()) {
// no other thread is processing the queue, so start processing it
try {
BlockAddedEvent e = blockAddedQueue.poll();
// check again since another thread could have stolen our task
if (e != null) {
pendingTransactions.manageBlockAdded(
e.getBlock().getHeader(),
e.getAddedTransactions(),
e.getRemovedTransactions(),
protocolSchedule
.getByBlockHeader(e.getBlock().getHeader())
.getFeeMarket());
reAddTransactions(e.getRemovedTransactions());
LOG.atDebug()
.setMessage("Block added event {} processed in {}ms")
.addArgument(e)
.addArgument(() -> System.currentTimeMillis() - started)
.log();
}
} finally {
blockAddedLock.unlock();
}
}
}
return null;
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -98,6 +99,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -229,6 +231,9 @@ public void setUp() {
final EthScheduler ethScheduler = mock(EthScheduler.class);
syncTaskCapture = ArgumentCaptor.forClass(Runnable.class);
doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture());
doAnswer(invocation -> ((Supplier<Void>) invocation.getArguments()[0]).get())
.when(ethScheduler)
.scheduleServiceTask(any(Supplier.class));
doReturn(ethScheduler).when(ethContext).getScheduler();

peerTransactionTracker = new PeerTransactionTracker();
Expand Down
Loading