Skip to content

Commit

Permalink
fix healig
Browse files Browse the repository at this point in the history
Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
  • Loading branch information
matkt committed Sep 7, 2023
1 parent ced8b4b commit 66c3e76
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
Expand Down Expand Up @@ -155,6 +156,7 @@ protected synchronized void markAsStalled(final int maxNodeRequestRetries) {
@Override
public synchronized boolean checkCompletion(final BlockHeader header) {

// Check if all snapsync tasks are completed
if (!internalFuture.isDone()
&& pendingAccountRequests.allTasksCompleted()
&& pendingCodeRequests.allTasksCompleted()
Expand All @@ -163,30 +165,50 @@ public synchronized boolean checkCompletion(final BlockHeader header) {
&& pendingTrieNodeRequests.allTasksCompleted()
&& pendingAccountFlatDatabaseHealingRequests.allTasksCompleted()
&& pendingStorageFlatDatabaseHealingRequests.allTasksCompleted()) {

// if all snapsync tasks are completed and the healing process was not running
if (!snapSyncState.isHealTrieInProgress()) {
// Register blockchain observer if not already registered
blockObserverId =
blockObserverId.isEmpty()
? OptionalLong.of(blockchain.observeBlockAdded(createBlockchainObserver()))
: blockObserverId;
// Start the healing process
startTrieHeal();
} else if (pivotBlockSelector.isBlockchainBehind()) {
}
// if all snapsync tasks are completed and the healing was running and blockchain is behind
// the pivot block
else if (pivotBlockSelector.isBlockchainBehind()) {
LOG.info("Pausing world state download while waiting for sync to complete");
// Set the snapsync to wait for the blockchain to catch up
snapSyncState.setWaitingBlockchain(true);
} else {
}
// if all snapsync tasks are completed and the healing was running and the blockchain is not
// behind the pivot block
else {
// Remove the blockchain observer
blockObserverId.ifPresent(blockchain::removeObserver);
// If the flat database healing process is not in progress and the flat database mode is
// FULL
if (!snapSyncState.isHealFlatDatabaseInProgress()
&& worldStateStorage.getFlatDbMode().equals(FlatDbMode.FULL)) {
// only doing a flat db heal for bonsai
// Start the flat database healing process
startFlatDatabaseHeal(header);
} else {
}
// If the flat database healing process is in progress or the flat database mode is not FULL
else {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
updater.commit();
// Notify that the snap sync has completed
metricsManager.notifySnapSyncCompleted();
// Clear the snap context
snapContext.clear();
internalFuture.complete(null);

return true;
}
}
}

return false;
}

Expand All @@ -200,12 +222,11 @@ protected synchronized void cleanupQueues() {
pendingTrieNodeRequests.clear();
}

/** Method to start the healing process of the trie */
public synchronized void startTrieHeal() {
snapContext.clearAccountRangeTasks();
snapSyncState.setHealTrieStatus(true);
if (blockObserverId.isEmpty())
blockObserverId = OptionalLong.of(blockchain.observeBlockAdded(getBlockAddedListener()));
// try to find new pivot block before healing
// Try to find a new pivot block before starting the healing process
pivotBlockSelector.switchToNewPivotBlock(
(blockHeader, newPivotBlockFound) -> {
snapContext.clearAccountRangeTasks();
Expand All @@ -218,17 +239,21 @@ public synchronized void startTrieHeal() {
});
}

/** Method to reload the healing process of the trie */
public synchronized void reloadTrieHeal() {
// Clear the flat database and trie log from the world state storage if needed
worldStateStorage.clearFlatDatabase();
worldStateStorage.clearTrieLog();
// Clear pending trie node and code requests
pendingTrieNodeRequests.clear();
pendingCodeRequests.clear();

snapSyncState.setHealTrieStatus(false);
checkCompletion(snapSyncState.getPivotBlockHeader().orElseThrow());
}

public synchronized void startFlatDatabaseHeal(final BlockHeader header) {
LOG.info("Running flat database heal process");
LOG.info("Initiating the healing process for the flat database");
snapSyncState.setHealFlatDatabaseInProgress(true);
final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16);
ranges.forEach(
Expand Down Expand Up @@ -383,23 +408,22 @@ public void setPivotBlockSelector(final DynamicPivotBlockSelector pivotBlockSele
this.pivotBlockSelector = pivotBlockSelector;
}

public BlockAddedObserver getBlockAddedListener() {
public BlockAddedObserver createBlockchainObserver() {
return addedBlockContext -> {
if (snapSyncState.isHealTrieInProgress()) {
// if we receive a new pivot block we can restart the heal
pivotBlockSelector.check(
(____, isNewPivotBlock) -> {
if (isNewPivotBlock) {
snapSyncState.setWaitingBlockchain(false);
}
});
// if we are close to the head we can also restart the heal and finish snapsync
if (!pivotBlockSelector.isBlockchainBehind()) {
snapSyncState.setWaitingBlockchain(false);
}
if (!snapSyncState.isWaitingBlockchain()) {
reloadTrieHeal();
}
final AtomicBoolean foundNewPivotBlock = new AtomicBoolean(false);
pivotBlockSelector.check(
(____, isNewPivotBlock) -> {
if (isNewPivotBlock) {
foundNewPivotBlock.set(true);
}
});
// restart heal if close to head again
if (snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind()) {
snapSyncState.setWaitingBlockchain(false);
reloadTrieHeal();
// restart heal if we found a new pivot block
} else if (foundNewPivotBlock.get()) {
reloadTrieHeal();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ public void shouldStopWaitingBlockchainWhenNewPivotBlockAvailable(
.when(dynamicPivotBlockManager)
.check(any());

final BlockAddedObserver blockAddedListener = downloadState.getBlockAddedListener();
final BlockAddedObserver blockAddedListener = downloadState.createBlockchainObserver();
blockAddedListener.onBlockAdded(
BlockAddedEvent.createForHeadAdvancement(
new Block(
Expand Down Expand Up @@ -403,7 +403,7 @@ public void shouldStopWaitingBlockchainWhenCloseToTheHead(
when(snapSyncState.isWaitingBlockchain()).thenReturn(true);

when(dynamicPivotBlockManager.isBlockchainBehind()).thenReturn(false);
final BlockAddedObserver blockAddedListener = downloadState.getBlockAddedListener();
final BlockAddedObserver blockAddedListener = downloadState.createBlockchainObserver();
blockAddedListener.onBlockAdded(
BlockAddedEvent.createForHeadAdvancement(
new Block(
Expand Down

0 comments on commit 66c3e76

Please sign in to comment.