Skip to content

Commit

Permalink
Fix snapsync heal (hyperledger#5838)
Browse files Browse the repository at this point in the history
Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
Signed-off-by: garyschulte <garyschulte@gmail.com>
  • Loading branch information
matkt authored and garyschulte committed Sep 20, 2023
1 parent 8f31b77 commit cc2904d
Show file tree
Hide file tree
Showing 15 changed files with 415 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static Optional<FastSyncDownloader<?>> createCheckpointDownloader(
.getAccountToRepair()
.ifPresent(
address ->
snapContext.addAccountsToBeRepaired(
snapContext.addAccountToHealingList(
CompactEncoding.bytesToPath(address.addressHash())));
} else if (fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static Optional<FastSyncDownloader<?>> createSnapDownloader(
.getAccountToRepair()
.ifPresent(
address ->
snapContext.addAccountsToBeRepaired(
snapContext.addAccountToHealingList(
CompactEncoding.bytesToPath(address.addressHash())));
} else if (fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.AccountFlatDatabaseHealingRangeRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.StorageFlatDatabaseHealingRangeRequest;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.FlatDbMode;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
Expand All @@ -43,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
Expand Down Expand Up @@ -72,7 +72,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>

protected final InMemoryTasksPriorityQueues<SnapDataRequest>
pendingStorageFlatDatabaseHealingRequests = new InMemoryTasksPriorityQueues<>();
private HashSet<Bytes> accountsToBeRepaired = new HashSet<>();
private HashSet<Bytes> accountsHealingList = new HashSet<>();
private DynamicPivotBlockSelector pivotBlockSelector;

private final SnapSyncStatePersistenceManager snapContext;
Expand Down Expand Up @@ -156,6 +156,7 @@ protected synchronized void markAsStalled(final int maxNodeRequestRetries) {
@Override
public synchronized boolean checkCompletion(final BlockHeader header) {

// Check if all snapsync tasks are completed
if (!internalFuture.isDone()
&& pendingAccountRequests.allTasksCompleted()
&& pendingCodeRequests.allTasksCompleted()
Expand All @@ -164,29 +165,50 @@ public synchronized boolean checkCompletion(final BlockHeader header) {
&& pendingTrieNodeRequests.allTasksCompleted()
&& pendingAccountFlatDatabaseHealingRequests.allTasksCompleted()
&& pendingStorageFlatDatabaseHealingRequests.allTasksCompleted()) {

// if all snapsync tasks are completed and the healing process was not running
if (!snapSyncState.isHealTrieInProgress()) {
// Register blockchain observer if not already registered
blockObserverId =
blockObserverId.isEmpty()
? OptionalLong.of(blockchain.observeBlockAdded(createBlockchainObserver()))
: blockObserverId;
// Start the healing process
startTrieHeal();
} else if (pivotBlockSelector.isBlockchainBehind()) {
}
// if all snapsync tasks are completed and the healing was running and blockchain is behind
// the pivot block
else if (pivotBlockSelector.isBlockchainBehind()) {
LOG.info("Pausing world state download while waiting for sync to complete");
if (blockObserverId.isEmpty())
blockObserverId = OptionalLong.of(blockchain.observeBlockAdded(getBlockAddedListener()));
// Set the snapsync to wait for the blockchain to catch up
snapSyncState.setWaitingBlockchain(true);
} else if (!snapSyncState.isHealFlatDatabaseInProgress()
&& worldStateStorage.getFlatDbMode().equals(FlatDbMode.FULL)) {
// only doing a flat db heal for bonsai
startFlatDatabaseHeal(header);
} else {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
updater.commit();
metricsManager.notifySnapSyncCompleted();
snapContext.clear();
internalFuture.complete(null);

return true;
}
// if all snapsync tasks are completed and the healing was running and the blockchain is not
// behind the pivot block
else {
// Remove the blockchain observer
blockObserverId.ifPresent(blockchain::removeObserver);
// If the flat database healing process is not in progress and the flat database mode is
// FULL
if (!snapSyncState.isHealFlatDatabaseInProgress()
&& worldStateStorage.getFlatDbMode().equals(FlatDbMode.FULL)) {
// Start the flat database healing process
startFlatDatabaseHeal(header);
}
// If the flat database healing process is in progress or the flat database mode is not FULL
else {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
updater.commit();
// Notify that the snap sync has completed
metricsManager.notifySnapSyncCompleted();
// Clear the snap context
snapContext.clear();
internalFuture.complete(null);
return true;
}
}
}

return false;
}

Expand All @@ -200,10 +222,11 @@ protected synchronized void cleanupQueues() {
pendingTrieNodeRequests.clear();
}

/** Method to start the healing process of the trie */
public synchronized void startTrieHeal() {
snapContext.clearAccountRangeTasks();
snapSyncState.setHealTrieStatus(true);
// try to find new pivot block before healing
// Try to find a new pivot block before starting the healing process
pivotBlockSelector.switchToNewPivotBlock(
(blockHeader, newPivotBlockFound) -> {
snapContext.clearAccountRangeTasks();
Expand All @@ -212,21 +235,25 @@ public synchronized void startTrieHeal() {
blockHeader.getNumber());
enqueueRequest(
createAccountTrieNodeDataRequest(
blockHeader.getStateRoot(), Bytes.EMPTY, accountsToBeRepaired));
blockHeader.getStateRoot(), Bytes.EMPTY, accountsHealingList));
});
}

/** Method to reload the healing process of the trie */
public synchronized void reloadTrieHeal() {
// Clear the flat database and trie log from the world state storage if needed
worldStateStorage.clearFlatDatabase();
worldStateStorage.clearTrieLog();
// Clear pending trie node and code requests
pendingTrieNodeRequests.clear();
pendingCodeRequests.clear();

snapSyncState.setHealTrieStatus(false);
checkCompletion(snapSyncState.getPivotBlockHeader().orElseThrow());
}

public synchronized void startFlatDatabaseHeal(final BlockHeader header) {
LOG.info("Running flat database heal process");
LOG.info("Initiating the healing process for the flat database");
snapSyncState.setHealFlatDatabaseInProgress(true);
final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16);
ranges.forEach(
Expand All @@ -235,10 +262,6 @@ public synchronized void startFlatDatabaseHeal(final BlockHeader header) {
createAccountFlatHealingRangeRequest(header.getStateRoot(), key, value)));
}

public boolean isBonsaiStorageFormat() {
return worldStateStorage.getDataStorageFormat().equals(DataStorageFormat.BONSAI);
}

@Override
public synchronized void enqueueRequest(final SnapDataRequest request) {
if (!internalFuture.isDone()) {
Expand All @@ -263,8 +286,8 @@ public synchronized void enqueueRequest(final SnapDataRequest request) {
}
}

public synchronized void setAccountsToBeRepaired(final HashSet<Bytes> accountsToBeRepaired) {
this.accountsToBeRepaired = accountsToBeRepaired;
public synchronized void setAccountsHealingList(final HashSet<Bytes> addAccountToHealingList) {
this.accountsHealingList = addAccountToHealingList;
}

/**
Expand All @@ -274,15 +297,15 @@ public synchronized void setAccountsToBeRepaired(final HashSet<Bytes> accountsTo
*
* @param account The account to be added for repair.
*/
public synchronized void addAccountsToBeRepaired(final Bytes account) {
if (!accountsToBeRepaired.contains(account)) {
snapContext.addAccountsToBeRepaired(account);
accountsToBeRepaired.add(account);
public synchronized void addAccountToHealingList(final Bytes account) {
if (!accountsHealingList.contains(account)) {
snapContext.addAccountToHealingList(account);
accountsHealingList.add(account);
}
}

public HashSet<Bytes> getAccountsToBeRepaired() {
return accountsToBeRepaired;
public HashSet<Bytes> getAccountsHealingList() {
return accountsHealingList;
}

@Override
Expand Down Expand Up @@ -385,25 +408,25 @@ public void setPivotBlockSelector(final DynamicPivotBlockSelector pivotBlockSele
this.pivotBlockSelector = pivotBlockSelector;
}

public BlockAddedObserver getBlockAddedListener() {
public BlockAddedObserver createBlockchainObserver() {
return addedBlockContext -> {
if (snapSyncState.isWaitingBlockchain()) {
// if we receive a new pivot block we can restart the heal
pivotBlockSelector.check(
(____, isNewPivotBlock) -> {
if (isNewPivotBlock) {
snapSyncState.setWaitingBlockchain(false);
}
});
// if we are close to the head we can also restart the heal and finish snapsync
if (!pivotBlockSelector.isBlockchainBehind()) {
snapSyncState.setWaitingBlockchain(false);
}
if (!snapSyncState.isWaitingBlockchain()) {
blockObserverId.ifPresent(blockchain::removeObserver);
blockObserverId = OptionalLong.empty();
reloadTrieHeal();
}
final AtomicBoolean foundNewPivotBlock = new AtomicBoolean(false);
pivotBlockSelector.check(
(____, isNewPivotBlock) -> {
if (isNewPivotBlock) {
foundNewPivotBlock.set(true);
}
});

final boolean isNewPivotBlockFound = foundNewPivotBlock.get();
final boolean isBlockchainCaughtUp =
snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind();

if (isNewPivotBlockFound
|| isBlockchainCaughtUp) { // restart heal if we found a new pivot block or if close to
// head again
snapSyncState.setWaitingBlockchain(false);
reloadTrieHeal();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ public CompletableFuture<Void> run(

final List<AccountRangeDataRequest> currentAccountRange =
snapContext.getCurrentAccountRange();
final HashSet<Bytes> inconsistentAccounts = snapContext.getAccountsToBeRepaired();
final HashSet<Bytes> inconsistentAccounts = snapContext.getAccountsHealingList();

if (!currentAccountRange.isEmpty()) { // continue to download worldstate ranges
newDownloadState.setAccountsToBeRepaired(inconsistentAccounts);
newDownloadState.setAccountsHealingList(inconsistentAccounts);
snapContext
.getCurrentAccountRange()
.forEach(
Expand All @@ -165,14 +165,14 @@ public CompletableFuture<Void> run(
DOWNLOAD, snapDataRequest.getStartKeyHash(), snapDataRequest.getEndKeyHash());
newDownloadState.enqueueRequest(snapDataRequest);
});
} else if (!snapContext.getAccountsToBeRepaired().isEmpty()) { // restart only the heal step
} else if (!snapContext.getAccountsHealingList().isEmpty()) { // restart only the heal step
snapSyncState.setHealTrieStatus(true);
worldStateStorage.clearFlatDatabase();
worldStateStorage.clearTrieLog();
newDownloadState.setAccountsToBeRepaired(inconsistentAccounts);
newDownloadState.setAccountsHealingList(inconsistentAccounts);
newDownloadState.enqueueRequest(
SnapDataRequest.createAccountTrieNodeDataRequest(
stateRoot, Bytes.EMPTY, snapContext.getAccountsToBeRepaired()));
stateRoot, Bytes.EMPTY, snapContext.getAccountsHealingList()));
} else {
// start from scratch
worldStateStorage.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*/
public class SnapSyncStatePersistenceManager {

private final byte[] SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX =
private final byte[] SNAP_ACCOUNT_HEALING_LIST_INDEX =
"snapInconsistentAccountsStorageIndex".getBytes(StandardCharsets.UTF_8);

private final GenericKeyValueStorageFacade<BigInteger, AccountRangeDataRequest>
Expand Down Expand Up @@ -104,20 +104,20 @@ public void updatePersistedTasks(final List<? extends SnapDataRequest> accountRa
}

/**
* Persists the current accounts to be repaired in the database.
* Persists the current accounts to heal in the database.
*
* @param accountsToBeRepaired The current list of accounts to persist.
* @param accountsHealingList The current list of accounts to heal.
*/
public void addAccountsToBeRepaired(final Bytes accountsToBeRepaired) {
public void addAccountToHealingList(final Bytes accountsHealingList) {
final BigInteger index =
healContext
.get(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX)
.get(SNAP_ACCOUNT_HEALING_LIST_INDEX)
.map(bytes -> new BigInteger(bytes.toArrayUnsafe()).add(BigInteger.ONE))
.orElse(BigInteger.ZERO);
healContext.putAll(
keyValueStorageTransaction -> {
keyValueStorageTransaction.put(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX, index.toByteArray());
keyValueStorageTransaction.put(index.toByteArray(), accountsToBeRepaired.toArrayUnsafe());
keyValueStorageTransaction.put(SNAP_ACCOUNT_HEALING_LIST_INDEX, index.toByteArray());
keyValueStorageTransaction.put(index.toByteArray(), accountsHealingList.toArrayUnsafe());
});
}

Expand All @@ -127,9 +127,9 @@ public List<AccountRangeDataRequest> getCurrentAccountRange() {
.collect(Collectors.toList());
}

public HashSet<Bytes> getAccountsToBeRepaired() {
public HashSet<Bytes> getAccountsHealingList() {
return healContext
.streamValuesFromKeysThat(notEqualsTo(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX))
.streamValuesFromKeysThat(notEqualsTo(SNAP_ACCOUNT_HEALING_LIST_INDEX))
.collect(Collectors.toCollection(HashSet::new));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public void addResponse(
if (!slots.isEmpty() || !proofs.isEmpty()) {
if (!worldStateProofProvider.isValidRangeProof(
startKeyHash, endKeyHash, storageRoot, proofs, slots)) {
// If the proof is invalid, it means that the storage will be a mix of several blocks.
// Therefore, it will be necessary to heal the account's storage subsequently
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
// We will request the new storage root of the account because it is apparently no longer
// valid with the new pivot block.
downloadState.enqueueRequest(
createAccountDataRequest(
getRootHash(), Hash.wrap(accountHash), startKeyHash, endKeyHash));
Expand Down Expand Up @@ -173,7 +178,7 @@ public Stream<SnapDataRequest> getChildRequests(
});
if (startKeyHash.equals(MIN_RANGE) && endKeyHash.equals(MAX_RANGE)) {
// need to heal this account storage
downloadState.addAccountsToBeRepaired(CompactEncoding.bytesToPath(accountHash));
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
}
});

Expand Down
Loading

0 comments on commit cc2904d

Please sign in to comment.