Skip to content

Commit

Permalink
Fix for backward sync wrongly thinking it is done after a restart (hy…
Browse files Browse the repository at this point in the history
…perledger#5182)

<!-- Thanks for sending a pull request! Please check out our
contribution guidelines: -->
<!-- https://github.com/hyperledger/besu/blob/main/CONTRIBUTING.md -->

## PR description

There is an issue when restarting Besu when a backward sync session is
running, since after the restart it is possible that the Consensus
client sends a FcU or a NewPayload for a block that is present in the
backward sync storage, but not yet imported, so not on the main chain,
but still the backward sync thinks it should not do anything with that
block, so it returns like it has completed the sync, but since the sync
is not done actually then the internal error that the finalize block is
not present.

The solution is to persist the backward sync status, so in case of a
restart, it can resume from where it was interrupted.

## Fixed Issue(s)
<!-- Please link to fixed issue(s) here using format: fixes #<issue
number> -->
<!-- Example: "fixes #2" -->

fixes hyperledger#5053 

## Documentation

- [x] I thought about documentation and added the `doc-change-required`
label to this PR if
[updates are
required](https://wiki.hyperledger.org/display/BESU/Documentation).

## Acceptance Tests (Non Mainnet)

- [x] I have considered running `./gradlew acceptanceTestNonMainnet`
locally if my PR affects non-mainnet modules.

## Changelog

- [x] I thought about the changelog and included a [changelog update if
required](https://wiki.hyperledger.org/display/BESU/Changelog).

---------

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
  • Loading branch information
2 people authored and elenduuche committed Aug 16, 2023
1 parent eb1378f commit 9a72145
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Increase default from 1000 to 5000 for `--rpc-max-logs-range` #5209

### Bug Fixes
- Persist backward sync status to support resuming across restarts [#5182](https://github.com/hyperledger/besu/pull/5182)

### Download Links

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public void setup() {
.thenReturn(
new KeyValueStoragePrefixedKeyBlockchainStorage(
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()));
when(storageProvider.getStorageBySegmentIdentifier(any()))
.thenReturn(new InMemoryKeyValueStorage());
when(synchronizerConfiguration.getDownloaderParallelism()).thenReturn(1);
when(synchronizerConfiguration.getTransactionsParallelism()).thenReturn(1);
when(synchronizerConfiguration.getComputationParallelism()).thenReturn(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier;

import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -35,20 +36,48 @@
public class BackwardChain {
private static final Logger LOG = getLogger(BackwardChain.class);

private static final String FIRST_STORED_ANCESTOR_KEY = "firstStoredAncestor";
private static final String LAST_STORED_PIVOT_KEY = "lastStoredPivot";

private final GenericKeyValueStorageFacade<Hash, BlockHeader> headers;
private final GenericKeyValueStorageFacade<Hash, Block> blocks;
private final GenericKeyValueStorageFacade<Hash, Hash> chainStorage;
private Optional<BlockHeader> firstStoredAncestor = Optional.empty();
private Optional<BlockHeader> lastStoredPivot = Optional.empty();
private final GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage;
private Optional<BlockHeader> firstStoredAncestor;
private Optional<BlockHeader> lastStoredPivot;
private final Queue<Hash> hashesToAppend = new ArrayDeque<>();

public BackwardChain(
final GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage,
final GenericKeyValueStorageFacade<Hash, Block> blocksStorage,
final GenericKeyValueStorageFacade<Hash, Hash> chainStorage) {
final GenericKeyValueStorageFacade<Hash, Hash> chainStorage,
final GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage) {
this.headers = headersStorage;
this.blocks = blocksStorage;
this.chainStorage = chainStorage;
this.sessionDataStorage = sessionDataStorage;
firstStoredAncestor =
sessionDataStorage
.get(FIRST_STORED_ANCESTOR_KEY)
.map(
header -> {
LOG.atDebug()
.setMessage(FIRST_STORED_ANCESTOR_KEY + " loaded from storage with value {}")
.addArgument(header::toLogString)
.log();
return header;
});
lastStoredPivot =
sessionDataStorage
.get(LAST_STORED_PIVOT_KEY)
.map(
header -> {
LOG.atDebug()
.setMessage(LAST_STORED_PIVOT_KEY + " loaded from storage with value {}")
.addArgument(header::toLogString)
.log();
return header;
});
}

public static BackwardChain from(
Expand All @@ -67,6 +96,14 @@ public static BackwardChain from(
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe,
new HashConvertor(),
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.BACKWARD_SYNC_CHAIN)),
// using BACKWARD_SYNC_CHAIN that contains the sequence of the work to do,
// to also store the session data that will be used to resume
// the backward sync from where it was left before the restart
new GenericKeyValueStorageFacade<>(
key -> key.getBytes(StandardCharsets.UTF_8),
BlocksHeadersConvertor.of(blockHeaderFunctions),
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.BACKWARD_SYNC_CHAIN)));
}
Expand All @@ -86,22 +123,43 @@ public synchronized List<BlockHeader> getFirstNAncestorHeaders(final int size) {
}

public synchronized void prependAncestorsHeader(final BlockHeader blockHeader) {
if (firstStoredAncestor.isEmpty()) {
firstStoredAncestor = Optional.of(blockHeader);
lastStoredPivot = Optional.of(blockHeader);
prependAncestorsHeader(blockHeader, false);
}

public synchronized void prependAncestorsHeader(
final BlockHeader blockHeader, final boolean alreadyStored) {
if (!alreadyStored) {
headers.put(blockHeader.getHash(), blockHeader);
return;
}
final BlockHeader firstHeader = firstStoredAncestor.get();
headers.put(blockHeader.getHash(), blockHeader);
chainStorage.put(blockHeader.getHash(), firstHeader.getHash());
firstStoredAncestor = Optional.of(blockHeader);
LOG.atDebug()
.setMessage("Added header {} to backward chain led by pivot {} on height {}")
.addArgument(blockHeader::toLogString)
.addArgument(() -> lastStoredPivot.orElseThrow().toLogString())
.addArgument(firstHeader::getNumber)
.log();

if (firstStoredAncestor.isEmpty()) {
updateLastStoredPivot(Optional.of(blockHeader));
} else {
final BlockHeader firstHeader = firstStoredAncestor.get();
chainStorage.put(blockHeader.getHash(), firstHeader.getHash());
LOG.atDebug()
.setMessage("Added header {} to backward chain led by pivot {} on height {}")
.addArgument(blockHeader::toLogString)
.addArgument(() -> lastStoredPivot.orElseThrow().toLogString())
.addArgument(firstHeader::getNumber)
.log();
}

updateFirstStoredAncestor(Optional.of(blockHeader));
}

private void updateFirstStoredAncestor(final Optional<BlockHeader> maybeHeader) {
maybeHeader.ifPresentOrElse(
header -> sessionDataStorage.put(FIRST_STORED_ANCESTOR_KEY, header),
() -> sessionDataStorage.drop(FIRST_STORED_ANCESTOR_KEY));
firstStoredAncestor = maybeHeader;
}

private void updateLastStoredPivot(final Optional<BlockHeader> maybeHeader) {
maybeHeader.ifPresentOrElse(
header -> sessionDataStorage.put(LAST_STORED_PIVOT_KEY, header),
() -> sessionDataStorage.drop(LAST_STORED_PIVOT_KEY));
lastStoredPivot = maybeHeader;
}

public synchronized Optional<Block> getPivot() {
Expand All @@ -118,9 +176,9 @@ public synchronized void dropFirstHeader() {
headers.drop(firstStoredAncestor.get().getHash());
final Optional<Hash> hash = chainStorage.get(firstStoredAncestor.get().getHash());
chainStorage.drop(firstStoredAncestor.get().getHash());
firstStoredAncestor = hash.flatMap(headers::get);
updateFirstStoredAncestor(hash.flatMap(headers::get));
if (firstStoredAncestor.isEmpty()) {
lastStoredPivot = Optional.empty();
updateLastStoredPivot(Optional.empty());
}
}

Expand All @@ -129,7 +187,7 @@ public synchronized void appendTrustedBlock(final Block newPivot) {
headers.put(newPivot.getHash(), newPivot.getHeader());
blocks.put(newPivot.getHash(), newPivot);
if (lastStoredPivot.isEmpty()) {
firstStoredAncestor = Optional.of(newPivot.getHeader());
updateFirstStoredAncestor(Optional.of(newPivot.getHeader()));
} else {
if (newPivot.getHeader().getParentHash().equals(lastStoredPivot.get().getHash())) {
LOG.atDebug()
Expand All @@ -140,14 +198,14 @@ public synchronized void appendTrustedBlock(final Block newPivot) {
.log();
chainStorage.put(lastStoredPivot.get().getHash(), newPivot.getHash());
} else {
firstStoredAncestor = Optional.of(newPivot.getHeader());
updateFirstStoredAncestor(Optional.of(newPivot.getHeader()));
LOG.atDebug()
.setMessage("Re-pivoting to new target block {}")
.addArgument(newPivot::toLogString)
.log();
}
}
lastStoredPivot = Optional.of(newPivot.getHeader());
updateLastStoredPivot(Optional.of(newPivot.getHeader()));
}

public synchronized boolean isTrusted(final Hash hash) {
Expand All @@ -162,6 +220,7 @@ public synchronized void clear() {
blocks.clear();
headers.clear();
chainStorage.clear();
sessionDataStorage.clear();
firstStoredAncestor = Optional.empty();
lastStoredPivot = Optional.empty();
hashesToAppend.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;

public class BackwardsSyncAlgorithm implements BesuEvents.InitialSyncCompletionListener {
private static final Logger LOG = getLogger(BackwardsSyncAlgorithm.class);
public class BackwardSyncAlgorithm implements BesuEvents.InitialSyncCompletionListener {
private static final Logger LOG = getLogger(BackwardSyncAlgorithm.class);

private final BackwardSyncContext context;
private final FinalBlockConfirmation finalBlockConfirmation;
private final AtomicReference<CountDownLatch> latch =
new AtomicReference<>(new CountDownLatch(1));
private volatile boolean finished = false;

public BackwardsSyncAlgorithm(
public BackwardSyncAlgorithm(
final BackwardSyncContext context, final FinalBlockConfirmation finalBlockConfirmation) {
this.context = context;
this.finalBlockConfirmation = finalBlockConfirmation;
Expand All @@ -64,7 +64,10 @@ public CompletableFuture<Void> pickNextStep() {
return executeSyncStep(firstHash.get())
.thenAccept(
result -> {
LOG.info("Backward sync target block is {}", result.toLogString());
LOG.atDebug()
.setMessage("Backward sync target block is {}")
.addArgument(result::toLogString)
.log();
context.getBackwardChain().removeFromHashToAppend(firstHash.get());
context.getStatus().updateTargetHeight(result.getHeader().getNumber());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private boolean isTrusted(final Hash hash) {
if (backwardChain.isTrusted(hash)) {
LOG.atDebug()
.setMessage(
"not fetching or appending hash {} to backwards sync since it is present in successors")
"Not fetching or appending hash {} to backward sync since it is present in successors")
.addArgument(hash::toHexString)
.log();
return true;
Expand Down Expand Up @@ -237,7 +237,7 @@ private Optional<BackwardSyncException> extractBackwardSyncException(final Throw
@VisibleForTesting
CompletableFuture<Void> prepareBackwardSyncFuture() {
final MutableBlockchain blockchain = getProtocolContext().getBlockchain();
return new BackwardsSyncAlgorithm(
return new BackwardSyncAlgorithm(
this,
FinalBlockConfirmation.confirmationChain(
FinalBlockConfirmation.genesisConfirmation(blockchain),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected Hash possibleRestoreOldNodes(final BlockHeader firstAncestor) {
Hash lastHash = firstAncestor.getParentHash();
Optional<BlockHeader> iterator = backwardChain.getHeader(lastHash);
while (iterator.isPresent()) {
backwardChain.prependAncestorsHeader(iterator.get());
backwardChain.prependAncestorsHeader(iterator.get(), true);
lastHash = iterator.get().getParentHash();
iterator = backwardChain.getHeader(lastHash);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class BackwardSyncAlgSpec {
@Captor ArgumentCaptor<BesuEvents.TTDReachedListener> ttdCaptor;
@Captor ArgumentCaptor<BesuEvents.InitialSyncCompletionListener> completionCaptor;

@InjectMocks BackwardsSyncAlgorithm algorithm;
@InjectMocks BackwardSyncAlgorithm algorithm;
@Mock private Hash hash;

private static final BlockDataGenerator blockDataGenerator = new BlockDataGenerator();
Expand Down Expand Up @@ -95,7 +95,7 @@ public void setUp() throws Exception {

algorithm =
Mockito.spy(
new BackwardsSyncAlgorithm(
new BackwardSyncAlgorithm(
context,
FinalBlockConfirmation.confirmationChain(
FinalBlockConfirmation.genesisConfirmation(localBlockchain),
Expand Down Expand Up @@ -292,7 +292,7 @@ public void shouldFailIfADifferentGenesisIsReached() {
doReturn(backwardChain).when(context).getBackwardChain();
algorithm =
Mockito.spy(
new BackwardsSyncAlgorithm(
new BackwardSyncAlgorithm(
context, FinalBlockConfirmation.genesisConfirmation(otherLocalBlockchain)));
assertThatThrownBy(() -> algorithm.pickNextStep())
.isInstanceOf(BackwardSyncException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -193,7 +194,12 @@ public static BackwardChain inMemoryBackwardChain() {
final GenericKeyValueStorageFacade<Hash, Hash> chainStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage());
return new BackwardChain(headersStorage, blocksStorage, chainStorage);
final GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage =
new GenericKeyValueStorageFacade<>(
key -> key.getBytes(StandardCharsets.UTF_8),
BlocksHeadersConvertor.of(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());
return new BackwardChain(headersStorage, blocksStorage, chainStorage, sessionDataStorage);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class BackwardSyncStepTest {
GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage;
GenericKeyValueStorageFacade<Hash, Block> blocksStorage;
GenericKeyValueStorageFacade<Hash, Hash> chainStorage;
GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage;

@Before
public void setup() {
Expand All @@ -86,10 +88,14 @@ public void setup() {
Hash::toArrayUnsafe,
new BlocksConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());

chainStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage());
sessionDataStorage =
new GenericKeyValueStorageFacade<>(
key -> key.getBytes(StandardCharsets.UTF_8),
new BlocksHeadersConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());

Block genesisBlock = blockDataGenerator.genesisBlock();
remoteBlockchain = createInMemoryBlockchain(genesisBlock);
Expand Down Expand Up @@ -234,7 +240,7 @@ private BackwardChain createBackwardChain(final int from, final int until) {
@Nonnull
private BackwardChain createBackwardChain(final int number) {
final BackwardChain backwardChain =
new BackwardChain(headersStorage, blocksStorage, chainStorage);
new BackwardChain(headersStorage, blocksStorage, chainStorage, sessionDataStorage);
backwardChain.appendTrustedBlock(remoteBlockchain.getBlockByNumber(number).orElseThrow());
return backwardChain;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.hyperledger.besu.ethereum.referencetests.ReferenceTestWorldState;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -73,8 +74,8 @@ public class ForwardSyncStepTest {
private MutableBlockchain localBlockchain;
GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage;
GenericKeyValueStorageFacade<Hash, Block> blocksStorage;

GenericKeyValueStorageFacade<Hash, Hash> chainStorage;
GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage;

@Before
public void setup() {
Expand All @@ -91,6 +92,11 @@ public void setup() {
chainStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage());
sessionDataStorage =
new GenericKeyValueStorageFacade<>(
key -> key.getBytes(StandardCharsets.UTF_8),
new BlocksHeadersConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());

Block genesisBlock = blockDataGenerator.genesisBlock();
remoteBlockchain = createInMemoryBlockchain(genesisBlock);
Expand Down Expand Up @@ -197,7 +203,7 @@ private BackwardChain createBackwardChain(final int from, final int until) {
@Nonnull
private BackwardChain backwardChainFromBlock(final int number) {
final BackwardChain backwardChain =
new BackwardChain(headersStorage, blocksStorage, chainStorage);
new BackwardChain(headersStorage, blocksStorage, chainStorage, sessionDataStorage);
backwardChain.appendTrustedBlock(remoteBlockchain.getBlockByNumber(number).orElseThrow());
return backwardChain;
}
Expand Down
Loading

0 comments on commit 9a72145

Please sign in to comment.