Skip to content

Commit

Permalink
Block prop on first final (hyperledger#4265)
Browse files Browse the repository at this point in the history
* start filtering peers after 1 finalized instead of 2
* stops counting finalized, and starts filtering on first finalized
* DefaultSynchronizer now listens to Forkhoice messages so it can stop block propagation at finalization, as opposed to TTD (previous behavior)

Signed-off-by: Justin Florentine <justin+github@florentine.us>
  • Loading branch information
jflo authored and freemanzMrojo committed Aug 19, 2022
1 parent 2e2ea1f commit d0283f4
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,19 +411,14 @@ public BesuController build() {
final PivotBlockSelector pivotBlockSelector = createPivotSelector(protocolContext);

final Synchronizer synchronizer =
new DefaultSynchronizer(
syncConfig,
createSynchronizer(
protocolSchedule,
protocolContext,
worldStateStorage,
ethProtocolManager.getBlockBroadcaster(),
protocolContext,
maybePruner,
ethContext,
syncState,
dataDirectory,
clock,
metricsSystem,
getFullSyncTerminationCondition(protocolContext.getBlockchain()),
ethProtocolManager,
pivotBlockSelector);

final MiningCoordinator miningCoordinator =
Expand Down Expand Up @@ -469,6 +464,44 @@ public BesuController build() {
additionalPluginServices);
}

private Synchronizer createSynchronizer(
final ProtocolSchedule protocolSchedule,
final WorldStateStorage worldStateStorage,
final ProtocolContext protocolContext,
final Optional<Pruner> maybePruner,
final EthContext ethContext,
final SyncState syncState,
final EthProtocolManager ethProtocolManager,
final PivotBlockSelector pivotBlockSelector) {

final GenesisConfigOptions maybeForTTD = configOptionsSupplier.get();

DefaultSynchronizer toUse =
new DefaultSynchronizer(
syncConfig,
protocolSchedule,
protocolContext,
worldStateStorage,
ethProtocolManager.getBlockBroadcaster(),
maybePruner,
ethContext,
syncState,
dataDirectory,
clock,
metricsSystem,
getFullSyncTerminationCondition(protocolContext.getBlockchain()),
pivotBlockSelector);
if (maybeForTTD.getTerminalTotalDifficulty().isPresent()) {
LOG.info(
"TTD present, creating DefaultSynchronizer that stops propagating after finalization");
protocolContext
.getConsensusContext(MergeContext.class)
.addNewForkchoiceMessageListener(toUse);
}

return toUse;
}

private PivotBlockSelector createPivotSelector(final ProtocolContext protocolContext) {

final PivotSelectorFromPeers pivotSelectorFromPeers = new PivotSelectorFromPeers(syncConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.StampedLock;

import org.slf4j.Logger;
Expand All @@ -36,8 +36,7 @@ public class MergePeerFilter implements MergeStateHandler, ForkchoiceMessageList

private Optional<Difficulty> powTerminalDifficulty = Optional.of(Difficulty.MAX_VALUE);
private final StampedLock powTerminalDifficultyLock = new StampedLock();
private Hash lastFinalized = Hash.ZERO;
private final AtomicLong numFinalizedSeen = new AtomicLong(0);
private final AtomicBoolean finalized = new AtomicBoolean(false);
private static final Logger LOG = LoggerFactory.getLogger(MergePeerFilter.class);

public boolean disconnectIfPoW(final StatusMessage status, final EthPeer peer) {
Expand Down Expand Up @@ -70,7 +69,7 @@ public boolean disconnectIfGossipingBlocks(final Message message, final EthPeer
}

private boolean isFinalized() {
return this.numFinalizedSeen.get() > 1;
return this.finalized.get();
}

@Override
Expand All @@ -79,10 +78,12 @@ public void onNewForkchoiceMessage(
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash) {
if (maybeFinalizedBlockHash.isPresent()
&& !maybeFinalizedBlockHash.get().equals(this.lastFinalized)) {
this.lastFinalized = maybeFinalizedBlockHash.get();
this.numFinalizedSeen.getAndIncrement();
LOG.debug("have seen {} finalized blocks", this.numFinalizedSeen);
&& !maybeFinalizedBlockHash
.get()
.equals(
Hash.ZERO)) { // forkchoices send finalized as 0 after ttd, but before an epoch is
// finalized
this.finalized.set(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;

import org.hyperledger.besu.consensus.merge.ForkchoiceMessageListener;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
Expand Down Expand Up @@ -64,7 +65,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlockPropagationManager {
public class BlockPropagationManager implements ForkchoiceMessageListener {
private static final Logger LOG = LoggerFactory.getLogger(BlockPropagationManager.class);
private final SynchronizerConfiguration config;
private final ProtocolSchedule protocolSchedule;
Expand Down Expand Up @@ -582,8 +583,7 @@ private String toLogString(final Collection<NewBlockHash> newBlockHashs) {

private void reactToTTDReachedEvent(final boolean ttdReached) {
if (started.get() && ttdReached) {
LOG.info("Block propagation was running, then ttd reached, stopping");
stop();
LOG.info("Block propagation was running, then ttd reached");
} else if (!started.get()) {
start();
}
Expand All @@ -602,4 +602,14 @@ public String toString() {
+ pendingBlocksManager
+ '}';
}

@Override
public void onNewForkchoiceMessage(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash) {
if (maybeFinalizedBlockHash.isPresent() && !maybeFinalizedBlockHash.get().equals(Hash.ZERO)) {
stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import static com.google.common.base.Preconditions.checkNotNull;

import org.hyperledger.besu.consensus.merge.ForkchoiceMessageListener;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
Expand Down Expand Up @@ -45,7 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultSynchronizer implements Synchronizer {
public class DefaultSynchronizer implements Synchronizer, ForkchoiceMessageListener {

private static final Logger LOG = LoggerFactory.getLogger(DefaultSynchronizer.class);

Expand Down Expand Up @@ -302,4 +304,16 @@ private Void finalizeSync(final Void unused) {
running.set(false);
return null;
}

@Override
public void onNewForkchoiceMessage(
final Hash headBlockHash,
final Optional<Hash> maybeFinalizedBlockHash,
final Hash safeBlockHash) {
if (this.blockPropagationManager.isPresent()) {
this.blockPropagationManager
.get()
.onNewForkchoiceMessage(headBlockHash, maybeFinalizedBlockHash, safeBlockHash);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ConsensusContext;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
Expand Down Expand Up @@ -61,6 +62,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

Expand All @@ -87,6 +89,7 @@ public abstract class AbstractBlockPropagationManagerTest {
SynchronizerConfiguration.builder().blockPropagationRange(-10, 30).build()));
protected SyncState syncState;
protected final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final Hash finalizedHash = Hash.fromHexStringLenient("0x1337");

protected void setup(final DataStorageFormat dataStorageFormat) {
blockchainUtil = BlockchainSetupUtil.forTesting(dataStorageFormat);
Expand Down Expand Up @@ -842,25 +845,27 @@ public void shouldThrowErrorWhenNoValidPeerAvailable() {
}

@Test
public void shouldStopWhenTTDReached() {
public void shouldStopWhenFinalized() {
blockPropagationManager.start();
syncState.setReachedTerminalDifficulty(true);
// syncState.setReachedTerminalDifficulty(true);
blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null);
assertThat(blockPropagationManager.isRunning()).isFalse();
assertThat(ethProtocolManager.ethContext().getEthMessages().messageCodesHandled())
.doesNotContain(EthPV62.NEW_BLOCK_HASHES, EthPV62.NEW_BLOCK);
}

@Test
public void shouldRestartWhenTTDReachedReturnsFalse() {
public void shouldRestartWhenTTDReachedReturnsFalseAfterFinalizing() {
blockPropagationManager.start();
syncState.setReachedTerminalDifficulty(true);
blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null);
assertThat(blockPropagationManager.isRunning()).isFalse();
syncState.setReachedTerminalDifficulty(false);
assertThat(blockPropagationManager.isRunning()).isTrue();
}

@Test
public void shouldNotListenToNewBlockHashesAnnouncementsWhenTTDReached() {
public void shouldNotListenToNewBlockHashesAnnouncementsWhenTTDReachedAndFinal() {
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);

Expand All @@ -878,7 +883,7 @@ public void shouldNotListenToNewBlockHashesAnnouncementsWhenTTDReached() {
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

syncState.setReachedTerminalDifficulty(true);

blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null);
// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement);
peer.respondWhile(responder, peer::hasOutstandingRequests);
Expand All @@ -888,7 +893,7 @@ public void shouldNotListenToNewBlockHashesAnnouncementsWhenTTDReached() {
}

@Test
public void shouldNotListenToNewBlockAnnouncementsWhenTTDReached() {
public void shouldNotListenToNewBlockAnnouncementsWhenTTDReachedAndFinal() {
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);

Expand All @@ -904,7 +909,7 @@ public void shouldNotListenToNewBlockAnnouncementsWhenTTDReached() {
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

syncState.setReachedTerminalDifficulty(true);

blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null);
// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement);
peer.respondWhile(responder, peer::hasOutstandingRequests);
Expand All @@ -914,13 +919,13 @@ public void shouldNotListenToNewBlockAnnouncementsWhenTTDReached() {
}

@Test
public void shouldNotListenToBlockAddedEventsWhenTTDReached() {
public void shouldNotListenToBlockAddedEventsWhenTTDReachedAndFinal() {
blockchainUtil.importFirstBlocks(2);

blockPropagationManager.start();

syncState.setReachedTerminalDifficulty(true);

blockPropagationManager.onNewForkchoiceMessage(null, Optional.of(this.finalizedHash), null);
blockchainUtil.importBlockAtIndex(2);

assertThat(blockPropagationManager.isRunning()).isFalse();
Expand Down

0 comments on commit d0283f4

Please sign in to comment.