Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for backward sync wrongly thinking it is done after a restart #5182

Merged
merged 15 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Increase default from 1000 to 5000 for `--rpc-max-logs-range` #5209

### Bug Fixes
- Persist backward sync status to support resuming across restarts [#5182](https://github.com/hyperledger/besu/pull/5182)

### Download Links

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public void setup() {
.thenReturn(
new KeyValueStoragePrefixedKeyBlockchainStorage(
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()));
when(storageProvider.getStorageBySegmentIdentifier(any()))
.thenReturn(new InMemoryKeyValueStorage());
when(synchronizerConfiguration.getDownloaderParallelism()).thenReturn(1);
when(synchronizerConfiguration.getTransactionsParallelism()).thenReturn(1);
when(synchronizerConfiguration.getComputationParallelism()).thenReturn(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier;

import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -35,20 +36,48 @@
public class BackwardChain {
private static final Logger LOG = getLogger(BackwardChain.class);

private static final String FIRST_STORED_ANCESTOR_KEY = "firstStoredAncestor";
private static final String LAST_STORED_PIVOT_KEY = "lastStoredPivot";

private final GenericKeyValueStorageFacade<Hash, BlockHeader> headers;
private final GenericKeyValueStorageFacade<Hash, Block> blocks;
private final GenericKeyValueStorageFacade<Hash, Hash> chainStorage;
private Optional<BlockHeader> firstStoredAncestor = Optional.empty();
private Optional<BlockHeader> lastStoredPivot = Optional.empty();
private final GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage;
private Optional<BlockHeader> firstStoredAncestor;
private Optional<BlockHeader> lastStoredPivot;
private final Queue<Hash> hashesToAppend = new ArrayDeque<>();

public BackwardChain(
final GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage,
final GenericKeyValueStorageFacade<Hash, Block> blocksStorage,
final GenericKeyValueStorageFacade<Hash, Hash> chainStorage) {
final GenericKeyValueStorageFacade<Hash, Hash> chainStorage,
final GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage) {
this.headers = headersStorage;
this.blocks = blocksStorage;
this.chainStorage = chainStorage;
this.sessionDataStorage = sessionDataStorage;
firstStoredAncestor =
sessionDataStorage
.get(FIRST_STORED_ANCESTOR_KEY)
.map(
header -> {
LOG.atDebug()
.setMessage(FIRST_STORED_ANCESTOR_KEY + " loaded from storage with value {}")
.addArgument(header::toLogString)
.log();
return header;
});
lastStoredPivot =
sessionDataStorage
.get(LAST_STORED_PIVOT_KEY)
.map(
header -> {
LOG.atDebug()
.setMessage(LAST_STORED_PIVOT_KEY + " loaded from storage with value {}")
.addArgument(header::toLogString)
.log();
return header;
});
}

public static BackwardChain from(
Expand All @@ -67,6 +96,14 @@ public static BackwardChain from(
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe,
new HashConvertor(),
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.BACKWARD_SYNC_CHAIN)),
// using BACKWARD_SYNC_CHAIN that contains the sequence of the work to do,
// to also store the session data that will be used to resume
// the backward sync from where it was left before the restart
new GenericKeyValueStorageFacade<>(
key -> key.getBytes(StandardCharsets.UTF_8),
BlocksHeadersConvertor.of(blockHeaderFunctions),
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.BACKWARD_SYNC_CHAIN)));
siladu marked this conversation as resolved.
Show resolved Hide resolved
}
Expand All @@ -86,22 +123,43 @@ public synchronized List<BlockHeader> getFirstNAncestorHeaders(final int size) {
}

public synchronized void prependAncestorsHeader(final BlockHeader blockHeader) {
if (firstStoredAncestor.isEmpty()) {
firstStoredAncestor = Optional.of(blockHeader);
lastStoredPivot = Optional.of(blockHeader);
prependAncestorsHeader(blockHeader, false);
}

public synchronized void prependAncestorsHeader(
siladu marked this conversation as resolved.
Show resolved Hide resolved
final BlockHeader blockHeader, final boolean alreadyStored) {
if (!alreadyStored) {
siladu marked this conversation as resolved.
Show resolved Hide resolved
headers.put(blockHeader.getHash(), blockHeader);
return;
}
final BlockHeader firstHeader = firstStoredAncestor.get();
headers.put(blockHeader.getHash(), blockHeader);
chainStorage.put(blockHeader.getHash(), firstHeader.getHash());
firstStoredAncestor = Optional.of(blockHeader);
LOG.atDebug()
.setMessage("Added header {} to backward chain led by pivot {} on height {}")
.addArgument(blockHeader::toLogString)
.addArgument(() -> lastStoredPivot.orElseThrow().toLogString())
.addArgument(firstHeader::getNumber)
.log();

if (firstStoredAncestor.isEmpty()) {
updateLastStoredPivot(Optional.of(blockHeader));
} else {
final BlockHeader firstHeader = firstStoredAncestor.get();
chainStorage.put(blockHeader.getHash(), firstHeader.getHash());
LOG.atDebug()
.setMessage("Added header {} to backward chain led by pivot {} on height {}")
.addArgument(blockHeader::toLogString)
.addArgument(() -> lastStoredPivot.orElseThrow().toLogString())
.addArgument(firstHeader::getNumber)
.log();
}

updateFirstStoredAncestor(Optional.of(blockHeader));
}

private void updateFirstStoredAncestor(final Optional<BlockHeader> maybeHeader) {
maybeHeader.ifPresentOrElse(
header -> sessionDataStorage.put(FIRST_STORED_ANCESTOR_KEY, header),
() -> sessionDataStorage.drop(FIRST_STORED_ANCESTOR_KEY));
firstStoredAncestor = maybeHeader;
}

private void updateLastStoredPivot(final Optional<BlockHeader> maybeHeader) {
maybeHeader.ifPresentOrElse(
header -> sessionDataStorage.put(LAST_STORED_PIVOT_KEY, header),
() -> sessionDataStorage.drop(LAST_STORED_PIVOT_KEY));
lastStoredPivot = maybeHeader;
}

public synchronized Optional<Block> getPivot() {
Expand All @@ -118,9 +176,9 @@ public synchronized void dropFirstHeader() {
headers.drop(firstStoredAncestor.get().getHash());
final Optional<Hash> hash = chainStorage.get(firstStoredAncestor.get().getHash());
chainStorage.drop(firstStoredAncestor.get().getHash());
firstStoredAncestor = hash.flatMap(headers::get);
updateFirstStoredAncestor(hash.flatMap(headers::get));
if (firstStoredAncestor.isEmpty()) {
lastStoredPivot = Optional.empty();
updateLastStoredPivot(Optional.empty());
}
}

Expand All @@ -129,7 +187,7 @@ public synchronized void appendTrustedBlock(final Block newPivot) {
headers.put(newPivot.getHash(), newPivot.getHeader());
blocks.put(newPivot.getHash(), newPivot);
if (lastStoredPivot.isEmpty()) {
firstStoredAncestor = Optional.of(newPivot.getHeader());
updateFirstStoredAncestor(Optional.of(newPivot.getHeader()));
} else {
if (newPivot.getHeader().getParentHash().equals(lastStoredPivot.get().getHash())) {
LOG.atDebug()
Expand All @@ -140,14 +198,14 @@ public synchronized void appendTrustedBlock(final Block newPivot) {
.log();
chainStorage.put(lastStoredPivot.get().getHash(), newPivot.getHash());
} else {
firstStoredAncestor = Optional.of(newPivot.getHeader());
updateFirstStoredAncestor(Optional.of(newPivot.getHeader()));
LOG.atDebug()
.setMessage("Re-pivoting to new target block {}")
.addArgument(newPivot::toLogString)
.log();
}
}
lastStoredPivot = Optional.of(newPivot.getHeader());
updateLastStoredPivot(Optional.of(newPivot.getHeader()));
}

public synchronized boolean isTrusted(final Hash hash) {
Expand All @@ -162,6 +220,7 @@ public synchronized void clear() {
blocks.clear();
headers.clear();
chainStorage.clear();
sessionDataStorage.clear();
firstStoredAncestor = Optional.empty();
lastStoredPivot = Optional.empty();
hashesToAppend.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;

public class BackwardsSyncAlgorithm implements BesuEvents.InitialSyncCompletionListener {
private static final Logger LOG = getLogger(BackwardsSyncAlgorithm.class);
public class BackwardSyncAlgorithm implements BesuEvents.InitialSyncCompletionListener {
private static final Logger LOG = getLogger(BackwardSyncAlgorithm.class);

private final BackwardSyncContext context;
private final FinalBlockConfirmation finalBlockConfirmation;
private final AtomicReference<CountDownLatch> latch =
new AtomicReference<>(new CountDownLatch(1));
private volatile boolean finished = false;

public BackwardsSyncAlgorithm(
public BackwardSyncAlgorithm(
final BackwardSyncContext context, final FinalBlockConfirmation finalBlockConfirmation) {
this.context = context;
this.finalBlockConfirmation = finalBlockConfirmation;
Expand All @@ -64,7 +64,10 @@ public CompletableFuture<Void> pickNextStep() {
return executeSyncStep(firstHash.get())
.thenAccept(
result -> {
LOG.info("Backward sync target block is {}", result.toLogString());
LOG.atDebug()
siladu marked this conversation as resolved.
Show resolved Hide resolved
.setMessage("Backward sync target block is {}")
.addArgument(result::toLogString)
.log();
context.getBackwardChain().removeFromHashToAppend(firstHash.get());
context.getStatus().updateTargetHeight(result.getHeader().getNumber());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private boolean isTrusted(final Hash hash) {
if (backwardChain.isTrusted(hash)) {
LOG.atDebug()
.setMessage(
"not fetching or appending hash {} to backwards sync since it is present in successors")
"Not fetching or appending hash {} to backward sync since it is present in successors")
.addArgument(hash::toHexString)
.log();
return true;
Expand Down Expand Up @@ -237,7 +237,7 @@ private Optional<BackwardSyncException> extractBackwardSyncException(final Throw
@VisibleForTesting
CompletableFuture<Void> prepareBackwardSyncFuture() {
final MutableBlockchain blockchain = getProtocolContext().getBlockchain();
return new BackwardsSyncAlgorithm(
return new BackwardSyncAlgorithm(
this,
FinalBlockConfirmation.confirmationChain(
FinalBlockConfirmation.genesisConfirmation(blockchain),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected Hash possibleRestoreOldNodes(final BlockHeader firstAncestor) {
Hash lastHash = firstAncestor.getParentHash();
Optional<BlockHeader> iterator = backwardChain.getHeader(lastHash);
while (iterator.isPresent()) {
backwardChain.prependAncestorsHeader(iterator.get());
backwardChain.prependAncestorsHeader(iterator.get(), true);
jframe marked this conversation as resolved.
Show resolved Hide resolved
lastHash = iterator.get().getParentHash();
iterator = backwardChain.getHeader(lastHash);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class BackwardSyncAlgSpec {
@Captor ArgumentCaptor<BesuEvents.TTDReachedListener> ttdCaptor;
@Captor ArgumentCaptor<BesuEvents.InitialSyncCompletionListener> completionCaptor;

@InjectMocks BackwardsSyncAlgorithm algorithm;
@InjectMocks BackwardSyncAlgorithm algorithm;
@Mock private Hash hash;

private static final BlockDataGenerator blockDataGenerator = new BlockDataGenerator();
Expand Down Expand Up @@ -95,7 +95,7 @@ public void setUp() throws Exception {

algorithm =
Mockito.spy(
new BackwardsSyncAlgorithm(
new BackwardSyncAlgorithm(
context,
FinalBlockConfirmation.confirmationChain(
FinalBlockConfirmation.genesisConfirmation(localBlockchain),
Expand Down Expand Up @@ -292,7 +292,7 @@ public void shouldFailIfADifferentGenesisIsReached() {
doReturn(backwardChain).when(context).getBackwardChain();
algorithm =
Mockito.spy(
new BackwardsSyncAlgorithm(
new BackwardSyncAlgorithm(
context, FinalBlockConfirmation.genesisConfirmation(otherLocalBlockchain)));
assertThatThrownBy(() -> algorithm.pickNextStep())
.isInstanceOf(BackwardSyncException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -193,7 +194,12 @@ public static BackwardChain inMemoryBackwardChain() {
final GenericKeyValueStorageFacade<Hash, Hash> chainStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage());
return new BackwardChain(headersStorage, blocksStorage, chainStorage);
final GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage =
new GenericKeyValueStorageFacade<>(
key -> key.getBytes(StandardCharsets.UTF_8),
BlocksHeadersConvertor.of(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());
return new BackwardChain(headersStorage, blocksStorage, chainStorage, sessionDataStorage);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class BackwardSyncStepTest {
GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage;
GenericKeyValueStorageFacade<Hash, Block> blocksStorage;
GenericKeyValueStorageFacade<Hash, Hash> chainStorage;
GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage;

@Before
public void setup() {
Expand All @@ -86,10 +88,14 @@ public void setup() {
Hash::toArrayUnsafe,
new BlocksConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());

chainStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage());
sessionDataStorage =
new GenericKeyValueStorageFacade<>(
key -> key.getBytes(StandardCharsets.UTF_8),
new BlocksHeadersConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());

Block genesisBlock = blockDataGenerator.genesisBlock();
remoteBlockchain = createInMemoryBlockchain(genesisBlock);
Expand Down Expand Up @@ -234,7 +240,7 @@ private BackwardChain createBackwardChain(final int from, final int until) {
@Nonnull
private BackwardChain createBackwardChain(final int number) {
final BackwardChain backwardChain =
new BackwardChain(headersStorage, blocksStorage, chainStorage);
new BackwardChain(headersStorage, blocksStorage, chainStorage, sessionDataStorage);
backwardChain.appendTrustedBlock(remoteBlockchain.getBlockByNumber(number).orElseThrow());
return backwardChain;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.hyperledger.besu.ethereum.referencetests.ReferenceTestWorldState;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -73,8 +74,8 @@ public class ForwardSyncStepTest {
private MutableBlockchain localBlockchain;
GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage;
GenericKeyValueStorageFacade<Hash, Block> blocksStorage;

GenericKeyValueStorageFacade<Hash, Hash> chainStorage;
GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage;

@Before
public void setup() {
Expand All @@ -91,6 +92,11 @@ public void setup() {
chainStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage());
sessionDataStorage =
new GenericKeyValueStorageFacade<>(
key -> key.getBytes(StandardCharsets.UTF_8),
new BlocksHeadersConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());

Block genesisBlock = blockDataGenerator.genesisBlock();
remoteBlockchain = createInMemoryBlockchain(genesisBlock);
Expand Down Expand Up @@ -197,7 +203,7 @@ private BackwardChain createBackwardChain(final int from, final int until) {
@Nonnull
private BackwardChain backwardChainFromBlock(final int number) {
final BackwardChain backwardChain =
new BackwardChain(headersStorage, blocksStorage, chainStorage);
new BackwardChain(headersStorage, blocksStorage, chainStorage, sessionDataStorage);
backwardChain.appendTrustedBlock(remoteBlockchain.getBlockByNumber(number).orElseThrow());
return backwardChain;
}
Expand Down
Loading