Skip to content

Commit

Permalink
Enable and disable gossip based on optimistic sync state. (#4914)
Browse files Browse the repository at this point in the history
Moves away from the network periodically polling recentChainData to decide when to enable gossip to explicitly notifying it of sync state changes.
This allows gossip to be reduced to just blocks when optimistic sync is enabled and also allows unsubscribing from gossip when the node falls too far behind (such as when a laptop wakes from sleep).
  • Loading branch information
ajsutton authored Jan 31, 2022
1 parent b404678 commit 77521fb
Show file tree
Hide file tree
Showing 15 changed files with 230 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import tech.pegasys.teku.spec.datastructures.state.beaconstate.versions.altair.BeaconStateSchemaAltair;
import tech.pegasys.teku.spec.datastructures.type.SszPublicKey;
import tech.pegasys.teku.spec.datastructures.util.DepositGenerator;
import tech.pegasys.teku.spec.executionengine.ForkChoiceState;
import tech.pegasys.teku.spec.schemas.SchemaDefinitions;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionsAltair;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionsBellatrix;
Expand Down Expand Up @@ -981,6 +982,16 @@ public EnrForkId randomEnrForkId() {
return new EnrForkId(randomBytes4(), randomBytes4(), randomUInt64());
}

public ForkChoiceState randomForkChoiceState(final boolean optimisticHead) {
return new ForkChoiceState(
randomBytes32(),
randomUInt64(),
randomBytes32(),
randomBytes32(),
randomBytes32(),
optimisticHead);
}

public BeaconState randomBeaconState() {
return randomBeaconState(100, 100);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,23 +123,8 @@ public SafeFuture<?> start() {

private synchronized void startup() {
state.set(State.RUNNING);
queueGossipStart();
}

private void queueGossipStart() {
LOG.debug("Check if gossip should be started");
final UInt64 slotsBehind = recentChainData.getChainHeadSlotsBehind().orElseThrow();
if (slotsBehind.isLessThanOrEqualTo(500)) {
// Start gossip if we're "close enough" to the chain head
// Note: we don't want to be too strict here, otherwise we could end up with our sync logic
// inactive because our chain is almost caught up to the chainhead, but gossip inactive so
// that our node slowly falls behind because no gossip is propagating. However, if we're too
// aggressive, our node could be down-scored for subscribing to topics that it can't yet
// validate or propagate.
if (isCloseToInSync()) {
startGossip();
} else {
// Schedule a future check
asyncRunner.runAfterDelay(this::queueGossipStart, Duration.ofSeconds(10)).reportExceptions();
}
}

Expand All @@ -164,6 +149,29 @@ private synchronized void startGossip() {
setTopicScoringParams();
}

@Override
public void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic) {
if (state.get() != State.RUNNING) {
return;
}
if (isInSync || isCloseToInSync()) {
startGossip();
} else {
if (gossipStarted.compareAndSet(true, false)) {
LOG.warn("Stopping eth2 gossip while node is syncing");
gossipForkManager.stopGossip();
}
}
gossipForkManager.onOptimisticHeadChanged(isOptimistic);
}

private boolean isCloseToInSync() {
return recentChainData
.getChainHeadSlotsBehind()
.orElse(UInt64.MAX_VALUE)
.isLessThanOrEqualTo(500);
}

private void setTopicScoringParams() {
final GossipTopicsScoringConfig topicConfig =
gossipConfigurator.configureAllTopics(getEth2Context());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public interface Eth2P2PNetwork extends P2PNetwork<Eth2Peer> {

void onEpoch(UInt64 epoch);

void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic);

void subscribeToAttestationSubnetId(int subnetId);

void unsubscribeFromAttestationSubnetId(int subnetId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ public BlockGossipManager(
public void publishBlock(final SignedBeaconBlock message) {
publishMessage(message);
}

@Override
public boolean isEnabledDuringOptimisticSync() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@ public interface GossipManager {
void subscribe();

void unsubscribe();

default boolean isEnabledDuringOptimisticSync() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,20 @@ public synchronized void stopGossip() {
// Stop all active gossips
activeSubscriptions.forEach(GossipForkSubscriptions::stopGossip);
activeSubscriptions.clear();
// Ensure we will create new active subscriptions if we are started again in the same epoch
currentEpoch = Optional.empty();
}

public synchronized void onOptimisticHeadChanged(final boolean isHeadOptimistic) {
if (isHeadOptimistic) {
activeSubscriptions.forEach(GossipForkSubscriptions::stopGossipForOptimisticSync);
} else {
activeSubscriptions.forEach(
subscriptions ->
subscriptions.startGossip(
recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot(),
false));
}
}

public synchronized void publishAttestation(final ValidateableAttestation attestation) {
Expand Down Expand Up @@ -237,7 +251,8 @@ private boolean isActive(final GossipForkSubscriptions subscriptions) {
private void startSubscriptions(final GossipForkSubscriptions subscription) {
if (activeSubscriptions.add(subscription)) {
subscription.startGossip(
recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot());
recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot(),
recentChainData.getOptimisticHead().isPresent());
currentAttestationSubnets.forEach(subscription::subscribeToAttestationSubnetId);
currentSyncCommitteeSubnets.forEach(subscription::subscribeToSyncCommitteeSubnet);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ public interface GossipForkSubscriptions {

UInt64 getActivationEpoch();

void startGossip(Bytes32 genesisValidatorsRoot);
void startGossip(Bytes32 genesisValidatorsRoot, boolean isOptimisticHead);

void stopGossip();

void stopGossipForOptimisticSync();

void publishAttestation(ValidateableAttestation attestation);

void publishBlock(SignedBeaconBlock block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,15 @@ public UInt64 getActivationEpoch() {
}

@Override
public final void startGossip(final Bytes32 genesisValidatorsRoot) {
final ForkInfo forkInfo = new ForkInfo(fork, genesisValidatorsRoot);
addGossipManagers(forkInfo);
gossipManagers.forEach(GossipManager::subscribe);
public final void startGossip(
final Bytes32 genesisValidatorsRoot, final boolean isOptimisticHead) {
if (gossipManagers.isEmpty()) {
final ForkInfo forkInfo = new ForkInfo(fork, genesisValidatorsRoot);
addGossipManagers(forkInfo);
}
gossipManagers.stream()
.filter(manager -> manager.isEnabledDuringOptimisticSync() || !isOptimisticHead)
.forEach(GossipManager::subscribe);
}

protected void addGossipManagers(final ForkInfo forkInfo) {
Expand Down Expand Up @@ -194,6 +199,13 @@ public void stopGossip() {
gossipManagers.forEach(GossipManager::unsubscribe);
}

@Override
public void stopGossipForOptimisticSync() {
gossipManagers.stream()
.filter(manager -> !manager.isEnabledDuringOptimisticSync())
.forEach(GossipManager::unsubscribe);
}

@Override
public void publishAttestation(final ValidateableAttestation attestation) {
attestationGossipManager.onNewAttestation(attestation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public NoOpEth2P2PNetwork(final Spec spec) {
@Override
public void onEpoch(final UInt64 epoch) {}

@Override
public void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic) {}

@Override
public void subscribeToAttestationSubnetId(final int subnetId) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,52 @@ public void unsubscribeFromSyncCommitteeSubnetId_shouldUpdateDiscoveryENR() {
assertThat(capturedValues.get(3)).containsExactlyInAnyOrder(1, 3);
}

@Test
void onSyncStateChanged_shouldEnableGossipWhenInSync() {
// Current slot is a long way beyond the chain head
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000));

assertThat(network.start()).isCompleted();
// Won't start gossip as chain head is too old
verify(gossipForkManager, never()).configureGossipForEpoch(any());

network.onSyncStateChanged(true, false);

// Even though we're a long way behind, start gossip because we believe we're in sync
verify(gossipForkManager).configureGossipForEpoch(any());
}

@Test
void onSyncStateChanged_shouldStopGossipWhenTooFarBehindAndNotInSync() {
// Current slot is a long way beyond the chain head
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000));

assertThat(network.start()).isCompleted();
network.onSyncStateChanged(true, false);
// Even though we're a long way behind, start gossip because we believe we're in sync
verify(gossipForkManager).configureGossipForEpoch(any());

network.onSyncStateChanged(false, false);
verify(gossipForkManager).stopGossip();
}

@Test
void onSyncStateChanged_shouldNotifyForkManagerOfOptimisticSyncState() {
assertThat(network.start()).isCompleted();

network.onSyncStateChanged(false, true);
verify(gossipForkManager).onOptimisticHeadChanged(true);

network.onSyncStateChanged(false, false);
verify(gossipForkManager).onOptimisticHeadChanged(false);

network.onSyncStateChanged(true, true);
verify(gossipForkManager, times(2)).onOptimisticHeadChanged(true);

network.onSyncStateChanged(true, false);
verify(gossipForkManager, times(2)).onOptimisticHeadChanged(false);
}

@SuppressWarnings("unchecked")
private ArgumentCaptor<Iterable<Integer>> subnetIdCaptor() {
return ArgumentCaptor.forClass(Iterable.class);
Expand Down
Loading

0 comments on commit 77521fb

Please sign in to comment.