diff --git a/CHANGELOG.md b/CHANGELOG.md index 20833b7f2f2..c5332c57b9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ - Fix to avoid broadcasting full blob txs, instead of only the tx announcement, to a subset of nodes [#6835](https://github.com/hyperledger/besu/pull/6835) - Snap client fixes discovered during snap server testing [#6847](https://github.com/hyperledger/besu/pull/6847) - Correctly initialize the txpool as disabled on creation [#6890](https://github.com/hyperledger/besu/pull/6890) +- Fix worldstate download halt when using snap sync during initial sync [#6981](https://github.com/hyperledger/besu/pull/6981) ### Download Links diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStep.java index 202853ce28f..3a2ffa35065 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStep.java @@ -43,9 +43,11 @@ public CompleteTaskStep( public synchronized void markAsCompleteOrFailed( final SnapWorldDownloadState downloadState, final Task task) { - if (task.getData().isResponseReceived() - || (task.getData() instanceof TrieNodeHealingRequest - && task.getData().isExpired(snapSyncState))) { + final boolean isResponseReceived = task.getData().isResponseReceived(); + final boolean isExpiredRequest = + task.getData() instanceof TrieNodeHealingRequest && task.getData().isExpired(snapSyncState); + // if pivot block has changed, the request is expired and we mark this one completed + if (isResponseReceived || isExpiredRequest) { completedRequestsCounter.inc(); task.markCompleted(); downloadState.checkCompletion(snapSyncState.getPivotBlockHeader().orElseThrow()); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java index f866f4c41ba..f2df979baac 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java @@ -44,6 +44,7 @@ import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import com.google.common.collect.Lists; @@ -95,10 +96,11 @@ public CompletableFuture> requestAccount( downloadState.addOutstandingTask(getAccountTask); return getAccountTask .run() + .orTimeout(10, TimeUnit.SECONDS) .handle( (response, error) -> { + downloadState.removeOutstandingTask(getAccountTask); if (response != null) { - downloadState.removeOutstandingTask(getAccountTask); accountDataRequest.setRootHash(blockHeader.getStateRoot()); accountDataRequest.addResponse( worldStateProofProvider, response.accounts(), response.proofs()); @@ -130,13 +132,12 @@ public CompletableFuture>> requestStorage( downloadState.addOutstandingTask(getStorageRangeTask); return getStorageRangeTask .run() + .orTimeout(10, TimeUnit.SECONDS) .handle( (response, error) -> { + downloadState.removeOutstandingTask(getStorageRangeTask); if (response != null) { - downloadState.removeOutstandingTask(getStorageRangeTask); final ArrayDeque> slots = new ArrayDeque<>(); - // Check if we have an empty range - /* * Checks if the response represents an "empty range". * @@ -186,10 +187,11 @@ public CompletableFuture>> requestCode( downloadState.addOutstandingTask(getByteCodeTask); return getByteCodeTask .run() + .orTimeout(10, TimeUnit.SECONDS) .handle( (response, error) -> { + downloadState.removeOutstandingTask(getByteCodeTask); if (response != null) { - downloadState.removeOutstandingTask(getByteCodeTask); for (Task requestTask : requestTasks) { final BytecodeRequest request = (BytecodeRequest) requestTask.getData(); request.setRootHash(blockHeader.getStateRoot()); @@ -225,10 +227,11 @@ public CompletableFuture>> requestTrieNodeByPath( downloadState.addOutstandingTask(getTrieNodeFromPeerTask); return getTrieNodeFromPeerTask .run() + .orTimeout(10, TimeUnit.SECONDS) .handle( (response, error) -> { + downloadState.removeOutstandingTask(getTrieNodeFromPeerTask); if (response != null) { - downloadState.removeOutstandingTask(getTrieNodeFromPeerTask); for (final Task task : requestTasks) { final TrieNodeHealingRequest request = (TrieNodeHealingRequest) task.getData(); final Bytes matchingData = response.get(request.getPathId()); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java index 3d93b852297..1b6567588b2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java @@ -46,7 +46,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -86,7 +85,7 @@ public class SnapWorldDownloadState extends WorldDownloadState // blockchain private final Blockchain blockchain; - private OptionalLong blockObserverId; + private final Long blockObserverId; // metrics around the snapsync private final SnapSyncMetricsManager metricsManager; @@ -111,7 +110,8 @@ public SnapWorldDownloadState( this.blockchain = blockchain; this.snapSyncState = snapSyncState; this.metricsManager = metricsManager; - this.blockObserverId = OptionalLong.empty(); + this.blockObserverId = blockchain.observeBlockAdded(createBlockchainObserver()); + metricsManager .getMetricsSystem() .createLongGauge( @@ -174,11 +174,6 @@ public synchronized boolean checkCompletion(final BlockHeader header) { // if all snapsync tasks are completed and the healing process was not running if (!snapSyncState.isHealTrieInProgress()) { - // Register blockchain observer if not already registered - blockObserverId = - blockObserverId.isEmpty() - ? OptionalLong.of(blockchain.observeBlockAdded(createBlockchainObserver())) - : blockObserverId; // Start the healing process startTrieHeal(); } @@ -192,8 +187,6 @@ else if (pivotBlockSelector.isBlockchainBehind()) { // if all snapsync tasks are completed and the healing was running and the blockchain is not // behind the pivot block else { - // Remove the blockchain observer - blockObserverId.ifPresent(blockchain::removeObserver); // If the flat database healing process is not in progress and the flat database mode is // FULL if (!snapSyncState.isHealFlatDatabaseInProgress() @@ -213,6 +206,8 @@ else if (pivotBlockSelector.isBlockchainBehind()) { }); updater.commit(); + // Remove the blockchain observer + blockchain.removeObserver(blockObserverId); // Notify that the snap sync has completed metricsManager.notifySnapSyncCompleted(); // Clear the snap context @@ -441,9 +436,7 @@ public BlockAddedObserver createBlockchainObserver() { final boolean isBlockchainCaughtUp = snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind(); - if (isNewPivotBlockFound - || isBlockchainCaughtUp) { // restart heal if we found a new pivot block or if close to - // head again + if (snapSyncState.isHealTrieInProgress() && (isNewPivotBlockFound || isBlockchainCaughtUp)) { snapSyncState.setWaitingBlockchain(false); reloadTrieHeal(); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java index c19ae6facc7..ddab9688043 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java @@ -15,7 +15,6 @@ package org.hyperledger.besu.ethereum.eth.sync.snapsync; import static com.google.common.base.Preconditions.checkNotNull; -import static org.hyperledger.besu.ethereum.eth.sync.snapsync.DynamicPivotBlockSelector.doNothingOnPivotChange; import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; @@ -159,19 +158,12 @@ public static class Builder { private SnapSyncProcessState snapSyncState; private PersistDataStep persistDataStep; private CompleteTaskStep completeTaskStep; - private DynamicPivotBlockSelector pivotBlockManager; public Builder configuration(final SnapSyncConfiguration snapSyncConfiguration) { this.snapSyncConfiguration = snapSyncConfiguration; return this; } - public Builder dynamicPivotBlockSelector( - final DynamicPivotBlockSelector dynamicPivotBlockSelector) { - this.pivotBlockManager = dynamicPivotBlockSelector; - return this; - } - public Builder maxOutstandingRequests(final int maxOutstandingRequests) { this.maxOutstandingRequests = maxOutstandingRequests; return this; @@ -265,12 +257,6 @@ public SnapWorldStateDownloadProcess build() { outputCounter, true, "world_state_download") - .thenProcess( - "checkNewPivotBlock-Account", - tasks -> { - pivotBlockManager.check(doNothingOnPivotChange); - return tasks; - }) .thenProcessAsync( "batchDownloadAccountData", requestTask -> requestDataStep.requestAccount(requestTask), @@ -288,12 +274,6 @@ public SnapWorldStateDownloadProcess build() { true, "world_state_download") .inBatches(snapSyncConfiguration.getStorageCountPerRequest()) - .thenProcess( - "checkNewPivotBlock-Storage", - tasks -> { - pivotBlockManager.check(doNothingOnPivotChange); - return tasks; - }) .thenProcessAsyncOrdered( "batchDownloadStorageData", requestTask -> requestDataStep.requestStorage(requestTask), @@ -314,12 +294,6 @@ public SnapWorldStateDownloadProcess build() { outputCounter, true, "world_state_download") - .thenProcess( - "checkNewPivotBlock-LargeStorage", - tasks -> { - pivotBlockManager.check(doNothingOnPivotChange); - return tasks; - }) .thenProcessAsyncOrdered( "batchDownloadLargeStorageData", requestTask -> requestDataStep.requestStorage(List.of(requestTask)), @@ -354,14 +328,6 @@ public SnapWorldStateDownloadProcess build() { .map(BytecodeRequest::getCodeHash) .distinct() .count()) - .thenProcess( - "checkNewPivotBlock-Code", - tasks -> { - pivotBlockManager.check( - (blockHeader, newBlockFound) -> - reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound)); - return tasks; - }) .thenProcessAsyncOrdered( "batchDownloadCodeData", tasks -> requestDataStep.requestCode(tasks), @@ -390,14 +356,6 @@ public SnapWorldStateDownloadProcess build() { 3, bufferCapacity) .inBatches(snapSyncConfiguration.getTrienodeCountPerRequest()) - .thenProcess( - "checkNewPivotBlock-TrieNode", - tasks -> { - pivotBlockManager.check( - (blockHeader, newBlockFound) -> - reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound)); - return tasks; - }) .thenProcessAsync( "batchDownloadTrieNodeData", tasks -> requestDataStep.requestTrieNodeByPath(tasks), @@ -461,13 +419,4 @@ public SnapWorldStateDownloadProcess build() { requestsToComplete); } } - - private static void reloadHealWhenNeeded( - final SnapSyncProcessState snapSyncState, - final SnapWorldDownloadState downloadState, - final boolean newBlockFound) { - if (snapSyncState.isHealTrieInProgress() && newBlockFound) { - downloadState.reloadTrieHeal(); - } - } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java index 471c84a6321..0e0e1b1ac30 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java @@ -214,7 +214,6 @@ public CompletableFuture run( SnapWorldStateDownloadProcess.builder() .configuration(snapSyncConfiguration) .maxOutstandingRequests(maxOutstandingRequests) - .dynamicPivotBlockSelector(dynamicPivotBlockManager) .loadLocalDataStep( new LoadLocalDataStep( worldStateStorageCoordinator,