From a3f3afdc6054bf7c20121bcc9ca910b31f6a7594 Mon Sep 17 00:00:00 2001 From: Jason Frame Date: Mon, 29 Apr 2024 13:55:38 +1000 Subject: [PATCH 01/10] Check for pivot block before each step in case chain halts Signed-off-by: Jason Frame --- .../SnapWorldStateDownloadProcess.java | 51 +++++++++++++++++++ .../snapsync/SnapWorldStateDownloader.java | 1 + 2 files changed, 52 insertions(+) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java index ddab9688043..c19ae6facc7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.sync.snapsync; import static com.google.common.base.Preconditions.checkNotNull; +import static org.hyperledger.besu.ethereum.eth.sync.snapsync.DynamicPivotBlockSelector.doNothingOnPivotChange; import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; @@ -158,12 +159,19 @@ public static class Builder { private SnapSyncProcessState snapSyncState; private PersistDataStep persistDataStep; private CompleteTaskStep completeTaskStep; + private DynamicPivotBlockSelector pivotBlockManager; public Builder configuration(final SnapSyncConfiguration snapSyncConfiguration) { this.snapSyncConfiguration = snapSyncConfiguration; return this; } + public Builder dynamicPivotBlockSelector( + final DynamicPivotBlockSelector dynamicPivotBlockSelector) { + this.pivotBlockManager = dynamicPivotBlockSelector; + return this; + } + public Builder maxOutstandingRequests(final int maxOutstandingRequests) { this.maxOutstandingRequests = maxOutstandingRequests; return 
this; @@ -257,6 +265,12 @@ public SnapWorldStateDownloadProcess build() { outputCounter, true, "world_state_download") + .thenProcess( + "checkNewPivotBlock-Account", + tasks -> { + pivotBlockManager.check(doNothingOnPivotChange); + return tasks; + }) .thenProcessAsync( "batchDownloadAccountData", requestTask -> requestDataStep.requestAccount(requestTask), @@ -274,6 +288,12 @@ public SnapWorldStateDownloadProcess build() { true, "world_state_download") .inBatches(snapSyncConfiguration.getStorageCountPerRequest()) + .thenProcess( + "checkNewPivotBlock-Storage", + tasks -> { + pivotBlockManager.check(doNothingOnPivotChange); + return tasks; + }) .thenProcessAsyncOrdered( "batchDownloadStorageData", requestTask -> requestDataStep.requestStorage(requestTask), @@ -294,6 +314,12 @@ public SnapWorldStateDownloadProcess build() { outputCounter, true, "world_state_download") + .thenProcess( + "checkNewPivotBlock-LargeStorage", + tasks -> { + pivotBlockManager.check(doNothingOnPivotChange); + return tasks; + }) .thenProcessAsyncOrdered( "batchDownloadLargeStorageData", requestTask -> requestDataStep.requestStorage(List.of(requestTask)), @@ -328,6 +354,14 @@ public SnapWorldStateDownloadProcess build() { .map(BytecodeRequest::getCodeHash) .distinct() .count()) + .thenProcess( + "checkNewPivotBlock-Code", + tasks -> { + pivotBlockManager.check( + (blockHeader, newBlockFound) -> + reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound)); + return tasks; + }) .thenProcessAsyncOrdered( "batchDownloadCodeData", tasks -> requestDataStep.requestCode(tasks), @@ -356,6 +390,14 @@ public SnapWorldStateDownloadProcess build() { 3, bufferCapacity) .inBatches(snapSyncConfiguration.getTrienodeCountPerRequest()) + .thenProcess( + "checkNewPivotBlock-TrieNode", + tasks -> { + pivotBlockManager.check( + (blockHeader, newBlockFound) -> + reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound)); + return tasks; + }) .thenProcessAsync( "batchDownloadTrieNodeData", tasks -> 
requestDataStep.requestTrieNodeByPath(tasks), @@ -419,4 +461,13 @@ public SnapWorldStateDownloadProcess build() { requestsToComplete); } } + + private static void reloadHealWhenNeeded( + final SnapSyncProcessState snapSyncState, + final SnapWorldDownloadState downloadState, + final boolean newBlockFound) { + if (snapSyncState.isHealTrieInProgress() && newBlockFound) { + downloadState.reloadTrieHeal(); + } + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java index 0e0e1b1ac30..471c84a6321 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java @@ -214,6 +214,7 @@ public CompletableFuture run( SnapWorldStateDownloadProcess.builder() .configuration(snapSyncConfiguration) .maxOutstandingRequests(maxOutstandingRequests) + .dynamicPivotBlockSelector(dynamicPivotBlockManager) .loadLocalDataStep( new LoadLocalDataStep( worldStateStorageCoordinator, From 8cea73bc95ebfc2f433b2cb816c6f53dbb94e489 Mon Sep 17 00:00:00 2001 From: Jason Frame Date: Tue, 30 Apr 2024 20:13:05 +1000 Subject: [PATCH 02/10] Update timeout of pivot block to 5s Signed-off-by: Jason Frame --- .../ethereum/eth/sync/snapsync/DynamicPivotBlockSelector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockSelector.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockSelector.java index 1e3d6bb2a5e..32dd014ba15 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockSelector.java +++ 
b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockSelector.java @@ -161,7 +161,7 @@ private CompletableFuture downloadNewPivotBlock(final FastSyncState fss) { .addArgument(this::logLastPivotBlockFound) .log(); }) - .orTimeout(5, TimeUnit.MINUTES); + .orTimeout(5, TimeUnit.SECONDS); } private boolean isSamePivotBlock(final FastSyncState fss) { From aaf8973b00e9c3d0df707b8f7e92bbb6e3714274 Mon Sep 17 00:00:00 2001 From: Jason Frame Date: Wed, 1 May 2024 08:20:40 +1000 Subject: [PATCH 03/10] Async pivot block check from block added events so that import block thread is not blocked Signed-off-by: Jason Frame --- .../sync/snapsync/SnapWorldDownloadState.java | 48 +++++++++++-------- .../snapsync/SnapWorldStateDownloader.java | 3 +- .../snapsync/SnapWorldDownloadStateTest.java | 17 ++++++- 3 files changed, 47 insertions(+), 21 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java index 1b6567588b2..df5ac18f452 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java @@ -21,6 +21,7 @@ import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.sync.snapsync.context.SnapSyncStatePersistenceManager; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest; @@ -86,6 +87,7 @@ public class SnapWorldDownloadState extends WorldDownloadState // blockchain private final 
Blockchain blockchain; private final Long blockObserverId; + private final EthContext ethContext; // metrics around the snapsync private final SnapSyncMetricsManager metricsManager; @@ -99,7 +101,8 @@ public SnapWorldDownloadState( final int maxRequestsWithoutProgress, final long minMillisBeforeStalling, final SnapSyncMetricsManager metricsManager, - final Clock clock) { + final Clock clock, + final EthContext ethContext) { super( worldStateStorageCoordinator, pendingRequests, @@ -111,6 +114,7 @@ public SnapWorldDownloadState( this.snapSyncState = snapSyncState; this.metricsManager = metricsManager; this.blockObserverId = blockchain.observeBlockAdded(createBlockchainObserver()); + this.ethContext = ethContext; metricsManager .getMetricsSystem() @@ -423,23 +427,29 @@ public void setPivotBlockSelector(final DynamicPivotBlockSelector pivotBlockSele } public BlockAddedObserver createBlockchainObserver() { - return addedBlockContext -> { - final AtomicBoolean foundNewPivotBlock = new AtomicBoolean(false); - pivotBlockSelector.check( - (____, isNewPivotBlock) -> { - if (isNewPivotBlock) { - foundNewPivotBlock.set(true); - } - }); - - final boolean isNewPivotBlockFound = foundNewPivotBlock.get(); - final boolean isBlockchainCaughtUp = - snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind(); - - if (snapSyncState.isHealTrieInProgress() && (isNewPivotBlockFound || isBlockchainCaughtUp)) { - snapSyncState.setWaitingBlockchain(false); - reloadTrieHeal(); - } - }; + return addedBlockContext -> + ethContext + .getScheduler() + .executeServiceTask( + () -> { + final AtomicBoolean foundNewPivotBlock = new AtomicBoolean(false); + pivotBlockSelector.check( + (____, isNewPivotBlock) -> { + if (isNewPivotBlock) { + foundNewPivotBlock.set(true); + } + }); + + final boolean isNewPivotBlockFound = foundNewPivotBlock.get(); + final boolean isBlockchainCaughtUp = + snapSyncState.isWaitingBlockchain() + && !pivotBlockSelector.isBlockchainBehind(); + + if 
(snapSyncState.isHealTrieInProgress() + && (isNewPivotBlockFound || isBlockchainCaughtUp)) { + snapSyncState.setWaitingBlockchain(false); + reloadTrieHeal(); + } + }); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java index 471c84a6321..cf2f1a4bf6d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java @@ -147,7 +147,8 @@ public CompletableFuture run( maxNodeRequestsWithoutProgress, minMillisBeforeStalling, snapsyncMetricsManager, - clock); + clock, + ethContext); final Map ranges = RangeManager.generateAllRanges(16); snapsyncMetricsManager.initRange(ranges); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java index 35c5de5daca..011027813db 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java @@ -33,6 +33,8 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.ethereum.eth.sync.snapsync.context.SnapSyncStatePersistenceManager; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest; @@ -90,6 
+92,7 @@ public class SnapWorldDownloadStateTest { private final Blockchain blockchain = mock(Blockchain.class); private final DynamicPivotBlockSelector dynamicPivotBlockManager = mock(DynamicPivotBlockSelector.class); + private final EthContext ethContext = mock(EthContext.class); private final TestClock clock = new TestClock(); private SnapWorldDownloadState downloadState; @@ -132,7 +135,8 @@ public void setUp(final DataStorageFormat storageFormat) { MAX_REQUESTS_WITHOUT_PROGRESS, MIN_MILLIS_BEFORE_STALLING, metricsManager, - clock); + clock, + ethContext); final DynamicPivotBlockSelector dynamicPivotBlockManager = mock(DynamicPivotBlockSelector.class); doAnswer( @@ -147,6 +151,17 @@ public void setUp(final DataStorageFormat storageFormat) { downloadState.setRootNodeData(ROOT_NODE_DATA); future = downloadState.getDownloadFuture(); assertThat(downloadState.isDownloading()).isTrue(); + + final EthScheduler ethScheduler = mock(EthScheduler.class); + when(ethContext.getScheduler()).thenReturn(ethScheduler); + doAnswer( + invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }) + .when(ethScheduler) + .executeServiceTask(any(Runnable.class)); } @ParameterizedTest From af7426f4eea3179314558af65ae770eeedcd0b75 Mon Sep 17 00:00:00 2001 From: "stefan.pingel@consensys.net" Date: Fri, 19 Apr 2024 12:47:36 +1000 Subject: [PATCH 04/10] fix chain download halt Signed-off-by: stefan.pingel@consensys.net --- .../eth/sync/range/RangeHeadersFetcher.java | 9 ++++++++ .../eth/sync/range/SyncTargetRangeSource.java | 21 ++++++++++++++----- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/RangeHeadersFetcher.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/RangeHeadersFetcher.java index 8ee1dc28a43..096ce58a7f5 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/RangeHeadersFetcher.java +++ 
b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/RangeHeadersFetcher.java @@ -119,7 +119,16 @@ private CompletableFuture> requestHeaders( private List stripExistingRangeHeaders( final BlockHeader lastHeader, final List headers) { + LOG.atDebug() + .setMessage("Retrieved range headers for block number {} to {}") + .addArgument(headers.get(0).getNumber()) + .addArgument(headers.get(headers.size() - 1).getNumber()) + .log(); if (!headers.isEmpty() && headers.get(0).equals(lastHeader)) { + LOG.atDebug() + .setMessage("Stripping block {} from range headers") + .addArgument(lastHeader.getNumber()) + .log(); return headers.subList(1, headers.size()); } return headers; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/SyncTargetRangeSource.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/SyncTargetRangeSource.java index fbeac02f406..2ff1e57b7a2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/SyncTargetRangeSource.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/SyncTargetRangeSource.java @@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; import java.time.Duration; import java.util.ArrayDeque; @@ -53,6 +54,7 @@ public class SyncTargetRangeSource implements Iterator { private boolean reachedEndOfRanges = false; private Optional>> pendingRequests = Optional.empty(); private int requestFailureCount = 0; + private int emptyHeaderFailureCount = 0; public SyncTargetRangeSource( final RangeHeadersFetcher fetcher, @@ -148,13 +150,22 @@ private SyncTargetRange getRangeFromPendingRequest() { pendingRequest.get(newHeaderWaitDuration.toMillis(), MILLISECONDS); this.pendingRequests = 
Optional.empty(); if (newHeaders.isEmpty()) { - requestFailureCount++; + emptyHeaderFailureCount++; + if (emptyHeaderFailureCount > 5) { + LOG.atDebug() + .setMessage( + "Disconnecting target peer for providing useless or empty range header: {}.") + .addArgument(peer) + .log(); + peer.disconnect(DisconnectMessage.DisconnectReason.USELESS_PEER_USELESS_RESPONSES); + } } else { requestFailureCount = 0; - } - for (final BlockHeader header : newHeaders) { - retrievedRanges.add(new SyncTargetRange(peer, lastRangeEnd, header)); - lastRangeEnd = header; + emptyHeaderFailureCount = 0; + for (final BlockHeader header : newHeaders) { + retrievedRanges.add(new SyncTargetRange(peer, lastRangeEnd, header)); + lastRangeEnd = header; + } } return retrievedRanges.poll(); } catch (final InterruptedException e) { From 077040a4544a91f369c244ea5cfc61fec99d9000 Mon Sep 17 00:00:00 2001 From: "stefan.pingel@consensys.net" Date: Sun, 21 Apr 2024 14:00:47 +1000 Subject: [PATCH 05/10] rename to retries Signed-off-by: stefan.pingel@consensys.net --- .../options/unstable/SynchronizerOptions.java | 13 +++++----- .../cli/options/SynchronizerOptionsTest.java | 2 +- .../eth/sync/SynchronizerConfiguration.java | 20 +++++++--------- .../FastSyncDownloadPipelineFactory.java | 2 +- .../FullSyncDownloadPipelineFactory.java | 2 +- .../eth/sync/range/SyncTargetRangeSource.java | 24 +++++++++---------- 6 files changed, 30 insertions(+), 33 deletions(-) diff --git a/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/SynchronizerOptions.java b/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/SynchronizerOptions.java index da5199f85dc..1adf7080582 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/SynchronizerOptions.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/SynchronizerOptions.java @@ -39,6 +39,8 @@ public class SynchronizerOptions implements CLIOptions