Skip to content

Commit

Permalink
Fix worldstate halt with snap sync during initial sync (hyperledger#6981
Browse files Browse the repository at this point in the history
)

Signed-off-by: Karim Taam <karim.t2am@gmail.com>
Signed-off-by: Jason Frame <jason.frame@consensys.net>
Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
Signed-off-by: Justin Florentine <justin+github@florentine.us>
  • Loading branch information
2 people authored and jflo committed Jun 10, 2024
1 parent 9b7030f commit 5204df2
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
- Fix to avoid broadcasting full blob txs, instead of only the tx announcement, to a subset of nodes [#6835](https://github.com/hyperledger/besu/pull/6835)
- Snap client fixes discovered during snap server testing [#6847](https://github.com/hyperledger/besu/pull/6847)
- Correctly initialize the txpool as disabled on creation [#6890](https://github.com/hyperledger/besu/pull/6890)
- Fix worldstate download halt when using snap sync during initial sync [#6981](https://github.com/hyperledger/besu/pull/6981)

### Download Links

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ public CompleteTaskStep(

public synchronized void markAsCompleteOrFailed(
final SnapWorldDownloadState downloadState, final Task<SnapDataRequest> task) {
if (task.getData().isResponseReceived()
|| (task.getData() instanceof TrieNodeHealingRequest
&& task.getData().isExpired(snapSyncState))) {
final boolean isResponseReceived = task.getData().isResponseReceived();
final boolean isExpiredRequest =
task.getData() instanceof TrieNodeHealingRequest && task.getData().isExpired(snapSyncState);
// if pivot block has changed, the request is expired and we mark this one completed
if (isResponseReceived || isExpiredRequest) {
completedRequestsCounter.inc();
task.markCompleted();
downloadState.checkCompletion(snapSyncState.getPivotBlockHeader().orElseThrow());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -95,10 +96,11 @@ public CompletableFuture<Task<SnapDataRequest>> requestAccount(
downloadState.addOutstandingTask(getAccountTask);
return getAccountTask
.run()
.orTimeout(10, TimeUnit.SECONDS)
.handle(
(response, error) -> {
downloadState.removeOutstandingTask(getAccountTask);
if (response != null) {
downloadState.removeOutstandingTask(getAccountTask);
accountDataRequest.setRootHash(blockHeader.getStateRoot());
accountDataRequest.addResponse(
worldStateProofProvider, response.accounts(), response.proofs());
Expand Down Expand Up @@ -130,13 +132,12 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestStorage(
downloadState.addOutstandingTask(getStorageRangeTask);
return getStorageRangeTask
.run()
.orTimeout(10, TimeUnit.SECONDS)
.handle(
(response, error) -> {
downloadState.removeOutstandingTask(getStorageRangeTask);
if (response != null) {
downloadState.removeOutstandingTask(getStorageRangeTask);
final ArrayDeque<NavigableMap<Bytes32, Bytes>> slots = new ArrayDeque<>();
// Check if we have an empty range

/*
* Checks if the response represents an "empty range".
*
Expand Down Expand Up @@ -186,10 +187,11 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestCode(
downloadState.addOutstandingTask(getByteCodeTask);
return getByteCodeTask
.run()
.orTimeout(10, TimeUnit.SECONDS)
.handle(
(response, error) -> {
downloadState.removeOutstandingTask(getByteCodeTask);
if (response != null) {
downloadState.removeOutstandingTask(getByteCodeTask);
for (Task<SnapDataRequest> requestTask : requestTasks) {
final BytecodeRequest request = (BytecodeRequest) requestTask.getData();
request.setRootHash(blockHeader.getStateRoot());
Expand Down Expand Up @@ -225,10 +227,11 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestTrieNodeByPath(
downloadState.addOutstandingTask(getTrieNodeFromPeerTask);
return getTrieNodeFromPeerTask
.run()
.orTimeout(10, TimeUnit.SECONDS)
.handle(
(response, error) -> {
downloadState.removeOutstandingTask(getTrieNodeFromPeerTask);
if (response != null) {
downloadState.removeOutstandingTask(getTrieNodeFromPeerTask);
for (final Task<SnapDataRequest> task : requestTasks) {
final TrieNodeHealingRequest request = (TrieNodeHealingRequest) task.getData();
final Bytes matchingData = response.get(request.getPathId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
Expand Down Expand Up @@ -86,7 +85,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>

// blockchain
private final Blockchain blockchain;
private OptionalLong blockObserverId;
private final Long blockObserverId;

// metrics around the snapsync
private final SnapSyncMetricsManager metricsManager;
Expand All @@ -111,7 +110,8 @@ public SnapWorldDownloadState(
this.blockchain = blockchain;
this.snapSyncState = snapSyncState;
this.metricsManager = metricsManager;
this.blockObserverId = OptionalLong.empty();
this.blockObserverId = blockchain.observeBlockAdded(createBlockchainObserver());

metricsManager
.getMetricsSystem()
.createLongGauge(
Expand Down Expand Up @@ -174,11 +174,6 @@ public synchronized boolean checkCompletion(final BlockHeader header) {

// if all snapsync tasks are completed and the healing process was not running
if (!snapSyncState.isHealTrieInProgress()) {
// Register blockchain observer if not already registered
blockObserverId =
blockObserverId.isEmpty()
? OptionalLong.of(blockchain.observeBlockAdded(createBlockchainObserver()))
: blockObserverId;
// Start the healing process
startTrieHeal();
}
Expand All @@ -192,8 +187,6 @@ else if (pivotBlockSelector.isBlockchainBehind()) {
// if all snapsync tasks are completed and the healing was running and the blockchain is not
// behind the pivot block
else {
// Remove the blockchain observer
blockObserverId.ifPresent(blockchain::removeObserver);
// If the flat database healing process is not in progress and the flat database mode is
// FULL
if (!snapSyncState.isHealFlatDatabaseInProgress()
Expand All @@ -213,6 +206,8 @@ else if (pivotBlockSelector.isBlockchainBehind()) {
});
updater.commit();

// Remove the blockchain observer
blockchain.removeObserver(blockObserverId);
// Notify that the snap sync has completed
metricsManager.notifySnapSyncCompleted();
// Clear the snap context
Expand Down Expand Up @@ -441,9 +436,7 @@ public BlockAddedObserver createBlockchainObserver() {
final boolean isBlockchainCaughtUp =
snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind();

if (isNewPivotBlockFound
|| isBlockchainCaughtUp) { // restart heal if we found a new pivot block or if close to
// head again
if (snapSyncState.isHealTrieInProgress() && (isNewPivotBlockFound || isBlockchainCaughtUp)) {
snapSyncState.setWaitingBlockchain(false);
reloadTrieHeal();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package org.hyperledger.besu.ethereum.eth.sync.snapsync;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.DynamicPivotBlockSelector.doNothingOnPivotChange;
import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom;

import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
Expand Down Expand Up @@ -159,19 +158,12 @@ public static class Builder {
private SnapSyncProcessState snapSyncState;
private PersistDataStep persistDataStep;
private CompleteTaskStep completeTaskStep;
private DynamicPivotBlockSelector pivotBlockManager;

public Builder configuration(final SnapSyncConfiguration snapSyncConfiguration) {
this.snapSyncConfiguration = snapSyncConfiguration;
return this;
}

public Builder dynamicPivotBlockSelector(
final DynamicPivotBlockSelector dynamicPivotBlockSelector) {
this.pivotBlockManager = dynamicPivotBlockSelector;
return this;
}

public Builder maxOutstandingRequests(final int maxOutstandingRequests) {
this.maxOutstandingRequests = maxOutstandingRequests;
return this;
Expand Down Expand Up @@ -265,12 +257,6 @@ public SnapWorldStateDownloadProcess build() {
outputCounter,
true,
"world_state_download")
.thenProcess(
"checkNewPivotBlock-Account",
tasks -> {
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsync(
"batchDownloadAccountData",
requestTask -> requestDataStep.requestAccount(requestTask),
Expand All @@ -288,12 +274,6 @@ public SnapWorldStateDownloadProcess build() {
true,
"world_state_download")
.inBatches(snapSyncConfiguration.getStorageCountPerRequest())
.thenProcess(
"checkNewPivotBlock-Storage",
tasks -> {
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsyncOrdered(
"batchDownloadStorageData",
requestTask -> requestDataStep.requestStorage(requestTask),
Expand All @@ -314,12 +294,6 @@ public SnapWorldStateDownloadProcess build() {
outputCounter,
true,
"world_state_download")
.thenProcess(
"checkNewPivotBlock-LargeStorage",
tasks -> {
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsyncOrdered(
"batchDownloadLargeStorageData",
requestTask -> requestDataStep.requestStorage(List.of(requestTask)),
Expand Down Expand Up @@ -354,14 +328,6 @@ public SnapWorldStateDownloadProcess build() {
.map(BytecodeRequest::getCodeHash)
.distinct()
.count())
.thenProcess(
"checkNewPivotBlock-Code",
tasks -> {
pivotBlockManager.check(
(blockHeader, newBlockFound) ->
reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound));
return tasks;
})
.thenProcessAsyncOrdered(
"batchDownloadCodeData",
tasks -> requestDataStep.requestCode(tasks),
Expand Down Expand Up @@ -390,14 +356,6 @@ public SnapWorldStateDownloadProcess build() {
3,
bufferCapacity)
.inBatches(snapSyncConfiguration.getTrienodeCountPerRequest())
.thenProcess(
"checkNewPivotBlock-TrieNode",
tasks -> {
pivotBlockManager.check(
(blockHeader, newBlockFound) ->
reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound));
return tasks;
})
.thenProcessAsync(
"batchDownloadTrieNodeData",
tasks -> requestDataStep.requestTrieNodeByPath(tasks),
Expand Down Expand Up @@ -461,13 +419,4 @@ public SnapWorldStateDownloadProcess build() {
requestsToComplete);
}
}

private static void reloadHealWhenNeeded(
final SnapSyncProcessState snapSyncState,
final SnapWorldDownloadState downloadState,
final boolean newBlockFound) {
if (snapSyncState.isHealTrieInProgress() && newBlockFound) {
downloadState.reloadTrieHeal();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ public CompletableFuture<Void> run(
SnapWorldStateDownloadProcess.builder()
.configuration(snapSyncConfiguration)
.maxOutstandingRequests(maxOutstandingRequests)
.dynamicPivotBlockSelector(dynamicPivotBlockManager)
.loadLocalDataStep(
new LoadLocalDataStep(
worldStateStorageCoordinator,
Expand Down

0 comments on commit 5204df2

Please sign in to comment.