-
Notifications
You must be signed in to change notification settings - Fork 839
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix snapsync heal #5838
Fix snapsync heal #5838
Changes from all commits
7809ec5
26f35a6
9ef4497
de39eec
54b2e77
ed3c586
c19d464
4132242
2cd8215
e8ca734
3564ccb
c6609b2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,6 @@ | |
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.AccountFlatDatabaseHealingRangeRequest; | ||
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.StorageFlatDatabaseHealingRangeRequest; | ||
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; | ||
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; | ||
import org.hyperledger.besu.ethereum.worldstate.FlatDbMode; | ||
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; | ||
import org.hyperledger.besu.metrics.BesuMetricCategory; | ||
|
@@ -43,6 +42,7 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
import java.util.OptionalLong; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Consumer; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Stream; | ||
|
@@ -72,7 +72,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest> | |
|
||
protected final InMemoryTasksPriorityQueues<SnapDataRequest> | ||
pendingStorageFlatDatabaseHealingRequests = new InMemoryTasksPriorityQueues<>(); | ||
private HashSet<Bytes> accountsToBeRepaired = new HashSet<>(); | ||
private HashSet<Bytes> accountsHealingList = new HashSet<>(); | ||
private DynamicPivotBlockSelector pivotBlockSelector; | ||
|
||
private final SnapSyncStatePersistenceManager snapContext; | ||
|
@@ -156,6 +156,7 @@ protected synchronized void markAsStalled(final int maxNodeRequestRetries) { | |
@Override | ||
public synchronized boolean checkCompletion(final BlockHeader header) { | ||
|
||
// Check if all snapsync tasks are completed | ||
if (!internalFuture.isDone() | ||
&& pendingAccountRequests.allTasksCompleted() | ||
&& pendingCodeRequests.allTasksCompleted() | ||
|
@@ -164,29 +165,50 @@ public synchronized boolean checkCompletion(final BlockHeader header) { | |
&& pendingTrieNodeRequests.allTasksCompleted() | ||
&& pendingAccountFlatDatabaseHealingRequests.allTasksCompleted() | ||
&& pendingStorageFlatDatabaseHealingRequests.allTasksCompleted()) { | ||
|
||
// if all snapsync tasks are completed and the healing process was not running | ||
if (!snapSyncState.isHealTrieInProgress()) { | ||
// Register blockchain observer if not already registered | ||
blockObserverId = | ||
blockObserverId.isEmpty() | ||
? OptionalLong.of(blockchain.observeBlockAdded(createBlockchainObserver())) | ||
: blockObserverId; | ||
// Start the healing process | ||
startTrieHeal(); | ||
} else if (pivotBlockSelector.isBlockchainBehind()) { | ||
} | ||
// if all snapsync tasks are completed and the healing was running and blockchain is behind | ||
// the pivot block | ||
else if (pivotBlockSelector.isBlockchainBehind()) { | ||
LOG.info("Pausing world state download while waiting for sync to complete"); | ||
if (blockObserverId.isEmpty()) | ||
blockObserverId = OptionalLong.of(blockchain.observeBlockAdded(getBlockAddedListener())); | ||
// Set the snapsync to wait for the blockchain to catch up | ||
snapSyncState.setWaitingBlockchain(true); | ||
} else if (!snapSyncState.isHealFlatDatabaseInProgress() | ||
&& worldStateStorage.getFlatDbMode().equals(FlatDbMode.FULL)) { | ||
// only doing a flat db heal for bonsai | ||
startFlatDatabaseHeal(header); | ||
} else { | ||
final WorldStateStorage.Updater updater = worldStateStorage.updater(); | ||
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData); | ||
updater.commit(); | ||
metricsManager.notifySnapSyncCompleted(); | ||
snapContext.clear(); | ||
internalFuture.complete(null); | ||
|
||
return true; | ||
} | ||
// if all snapsync tasks are completed and the healing was running and the blockchain is not | ||
// behind the pivot block | ||
else { | ||
// Remove the blockchain observer | ||
blockObserverId.ifPresent(blockchain::removeObserver); | ||
// If the flat database healing process is not in progress and the flat database mode is | ||
// FULL | ||
if (!snapSyncState.isHealFlatDatabaseInProgress() | ||
&& worldStateStorage.getFlatDbMode().equals(FlatDbMode.FULL)) { | ||
// Start the flat database healing process | ||
startFlatDatabaseHeal(header); | ||
} | ||
// If the flat database healing process is in progress or the flat database mode is not FULL | ||
else { | ||
final WorldStateStorage.Updater updater = worldStateStorage.updater(); | ||
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData); | ||
updater.commit(); | ||
// Notify that the snap sync has completed | ||
metricsManager.notifySnapSyncCompleted(); | ||
// Clear the snap context | ||
snapContext.clear(); | ||
internalFuture.complete(null); | ||
return true; | ||
} | ||
} | ||
} | ||
|
||
return false; | ||
} | ||
|
||
|
@@ -200,10 +222,11 @@ protected synchronized void cleanupQueues() { | |
pendingTrieNodeRequests.clear(); | ||
} | ||
|
||
/** Method to start the healing process of the trie */ | ||
public synchronized void startTrieHeal() { | ||
snapContext.clearAccountRangeTasks(); | ||
snapSyncState.setHealTrieStatus(true); | ||
// try to find new pivot block before healing | ||
// Try to find a new pivot block before starting the healing process | ||
pivotBlockSelector.switchToNewPivotBlock( | ||
(blockHeader, newPivotBlockFound) -> { | ||
snapContext.clearAccountRangeTasks(); | ||
|
@@ -212,21 +235,25 @@ public synchronized void startTrieHeal() { | |
blockHeader.getNumber()); | ||
enqueueRequest( | ||
createAccountTrieNodeDataRequest( | ||
blockHeader.getStateRoot(), Bytes.EMPTY, accountsToBeRepaired)); | ||
blockHeader.getStateRoot(), Bytes.EMPTY, accountsHealingList)); | ||
}); | ||
} | ||
|
||
/** Method to reload the healing process of the trie */ | ||
public synchronized void reloadTrieHeal() { | ||
// Clear the flat database and trie log from the world state storage if needed | ||
worldStateStorage.clearFlatDatabase(); | ||
worldStateStorage.clearTrieLog(); | ||
// Clear pending trie node and code requests | ||
pendingTrieNodeRequests.clear(); | ||
pendingCodeRequests.clear(); | ||
|
||
snapSyncState.setHealTrieStatus(false); | ||
checkCompletion(snapSyncState.getPivotBlockHeader().orElseThrow()); | ||
} | ||
|
||
public synchronized void startFlatDatabaseHeal(final BlockHeader header) { | ||
LOG.info("Running flat database heal process"); | ||
LOG.info("Initiating the healing process for the flat database"); | ||
snapSyncState.setHealFlatDatabaseInProgress(true); | ||
final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16); | ||
ranges.forEach( | ||
|
@@ -235,10 +262,6 @@ public synchronized void startFlatDatabaseHeal(final BlockHeader header) { | |
createAccountFlatHealingRangeRequest(header.getStateRoot(), key, value))); | ||
} | ||
|
||
public boolean isBonsaiStorageFormat() { | ||
return worldStateStorage.getDataStorageFormat().equals(DataStorageFormat.BONSAI); | ||
} | ||
|
||
@Override | ||
public synchronized void enqueueRequest(final SnapDataRequest request) { | ||
if (!internalFuture.isDone()) { | ||
|
@@ -263,8 +286,8 @@ public synchronized void enqueueRequest(final SnapDataRequest request) { | |
} | ||
} | ||
|
||
public synchronized void setAccountsToBeRepaired(final HashSet<Bytes> accountsToBeRepaired) { | ||
this.accountsToBeRepaired = accountsToBeRepaired; | ||
public synchronized void setAccountsHealingList(final HashSet<Bytes> addAccountToHealingList) { | ||
this.accountsHealingList = addAccountToHealingList; | ||
} | ||
|
||
/** | ||
|
@@ -274,15 +297,15 @@ public synchronized void setAccountsToBeRepaired(final HashSet<Bytes> accountsTo | |
* | ||
* @param account The account to be added for repair. | ||
*/ | ||
public synchronized void addAccountsToBeRepaired(final Bytes account) { | ||
if (!accountsToBeRepaired.contains(account)) { | ||
snapContext.addAccountsToBeRepaired(account); | ||
accountsToBeRepaired.add(account); | ||
public synchronized void addAccountToHealingList(final Bytes account) { | ||
if (!accountsHealingList.contains(account)) { | ||
snapContext.addAccountToHealingList(account); | ||
accountsHealingList.add(account); | ||
} | ||
} | ||
|
||
public HashSet<Bytes> getAccountsToBeRepaired() { | ||
return accountsToBeRepaired; | ||
public HashSet<Bytes> getAccountsHealingList() { | ||
return accountsHealingList; | ||
} | ||
|
||
@Override | ||
|
@@ -385,25 +408,25 @@ public void setPivotBlockSelector(final DynamicPivotBlockSelector pivotBlockSele | |
this.pivotBlockSelector = pivotBlockSelector; | ||
} | ||
|
||
public BlockAddedObserver getBlockAddedListener() { | ||
public BlockAddedObserver createBlockchainObserver() { | ||
return addedBlockContext -> { | ||
if (snapSyncState.isWaitingBlockchain()) { | ||
// if we receive a new pivot block we can restart the heal | ||
pivotBlockSelector.check( | ||
(____, isNewPivotBlock) -> { | ||
if (isNewPivotBlock) { | ||
snapSyncState.setWaitingBlockchain(false); | ||
} | ||
}); | ||
// if we are close to the head we can also restart the heal and finish snapsync | ||
if (!pivotBlockSelector.isBlockchainBehind()) { | ||
snapSyncState.setWaitingBlockchain(false); | ||
} | ||
if (!snapSyncState.isWaitingBlockchain()) { | ||
blockObserverId.ifPresent(blockchain::removeObserver); | ||
blockObserverId = OptionalLong.empty(); | ||
reloadTrieHeal(); | ||
} | ||
final AtomicBoolean foundNewPivotBlock = new AtomicBoolean(false); | ||
pivotBlockSelector.check( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it not a problem that The async code in DynamicPivotBlockSelector makes this hard to reason about. This PR doesn't make this worse, but still hard to follow. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed, the code for the Pivot Block Selector is a bit complex to understand and could benefit from more detailed descriptions. This class fetches pivot blocks in advance to avoid having to search at the time of the call. I don't have the exact numbers, but for example: I call the check method and if the distance to the head is >60 and <126, I will cache a pivot block. I won't use it right now. On the next call, when the distance to the head is >126, I will take the one cached. This means that every time we call this method and we are in this range(>60 and <126), we are looking for a pivot block to put in a cache , This is to have fast access to the next pivot block In conclusion, there will be no problem as it will be synchronous (every time we will take the pivot from cache). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it would probably be beneficial to simplify this pivot block selection code in the future as it's more technical debt. Previously, obtaining a pivot block required querying 5 peers and reaching a consensus, which could be time-consuming. Now, it's simply a matter of taking the last safe block. Therefore, this logic may no longer be necessary. |
||
(____, isNewPivotBlock) -> { | ||
if (isNewPivotBlock) { | ||
foundNewPivotBlock.set(true); | ||
} | ||
}); | ||
|
||
final boolean isNewPivotBlockFound = foundNewPivotBlock.get(); | ||
final boolean isBlockchainCaughtUp = | ||
snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind(); | ||
|
||
if (isNewPivotBlockFound | ||
|| isBlockchainCaughtUp) { // restart heal if we found a new pivot block or if close to | ||
// head again | ||
snapSyncState.setWaitingBlockchain(false); | ||
reloadTrieHeal(); | ||
} | ||
}; | ||
} | ||
|
Check failure
Code scanning / CodeQL
Inconsistent synchronization of getter and setter Error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ignore this one because it's not in the scope of this PR. I'm just renaming the method in this PR . Don't want to modify the logic