Skip to content

Commit

Permalink
7311: Fix up everything broken after merge
Browse files Browse the repository at this point in the history
Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
  • Loading branch information
Matilda-Clerke committed Oct 1, 2024
1 parent 64adedc commit 2c1446e
Show file tree
Hide file tree
Showing 17 changed files with 55 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors;
import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRequestSender;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
Expand Down Expand Up @@ -656,11 +656,10 @@ public BesuController build() {
}

final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler);
final PeerManager peerManager = new DefaultPeerManager();
ethPeers.streamAllPeers().forEach(peerManager::addPeer);
final PeerSelector peerSelector = new DefaultPeerSelector(currentProtocolSpecSupplier);
ethPeers.streamAllPeers().forEach(peerSelector::addPeer);
final PeerTaskExecutor peerTaskExecutor =
new PeerTaskExecutor(
peerManager, new PeerTaskRequestSender(), currentProtocolSpecSupplier, metricsSystem);
new PeerTaskExecutor(peerSelector, new PeerTaskRequestSender(), metricsSystem);
final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint);

Expand Down Expand Up @@ -701,7 +700,7 @@ public BesuController build() {
peerValidators,
Optional.empty(),
forkIdManager,
peerManager);
peerSelector);

final PivotBlockSelector pivotBlockSelector =
createPivotSelector(
Expand Down Expand Up @@ -1036,7 +1035,7 @@ protected String getSupportedProtocol() {
* @param peerValidators the peer validators
* @param mergePeerFilter the merge peer filter
* @param forkIdManager the fork id manager
* @param peerManager the PeerManager
* @param peerSelector the PeerSelector
* @return the eth protocol manager
*/
protected EthProtocolManager createEthProtocolManager(
Expand All @@ -1051,7 +1050,7 @@ protected EthProtocolManager createEthProtocolManager(
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager,
final PeerManager peerManager) {
final PeerSelector peerSelector) {
return new EthProtocolManager(
protocolContext.getBlockchain(),
networkId,
Expand All @@ -1066,7 +1065,7 @@ protected EthProtocolManager createEthProtocolManager(
synchronizerConfiguration,
scheduler,
forkIdManager,
peerManager);
peerSelector);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
Expand Down Expand Up @@ -245,7 +245,7 @@ protected EthProtocolManager createEthProtocolManager(
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager,
final PeerManager peerManager) {
final PeerSelector peerSelector) {
return besuControllerBuilderSchedule
.get(0L)
.createEthProtocolManager(
Expand All @@ -260,7 +260,7 @@ protected EthProtocolManager createEthProtocolManager(
peerValidators,
mergePeerFilter,
forkIdManager,
peerManager);
peerSelector);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
Expand Down Expand Up @@ -101,7 +101,7 @@ protected EthProtocolManager createEthProtocolManager(
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager,
final PeerManager peerManager) {
final PeerSelector peerSelector) {

var mergeContext = protocolContext.getConsensusContext(MergeContext.class);

Expand Down Expand Up @@ -132,7 +132,7 @@ protected EthProtocolManager createEthProtocolManager(
peerValidators,
filterToUse,
forkIdManager,
peerManager);
peerSelector);

return ethProtocolManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
Expand Down Expand Up @@ -166,7 +166,7 @@ protected EthProtocolManager createEthProtocolManager(
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager,
final PeerManager peerManager) {
final PeerSelector peerSelector) {
return mergeBesuControllerBuilder.createEthProtocolManager(
protocolContext,
synchronizerConfiguration,
Expand All @@ -179,7 +179,7 @@ protected EthProtocolManager createEthProtocolManager(
peerValidators,
mergePeerFilter,
forkIdManager,
peerManager);
peerSelector);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.StatusMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
Expand Down Expand Up @@ -70,7 +70,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {

private final Hash genesisHash;
private final ForkIdManager forkIdManager;
private final PeerManager peerManager;
private final PeerSelector peerSelector;
private final BigInteger networkId;
private final EthPeers ethPeers;
private final EthMessages ethMessages;
Expand All @@ -95,7 +95,7 @@ public EthProtocolManager(
final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler,
final ForkIdManager forkIdManager,
final PeerManager peerManager) {
final PeerSelector peerSelector) {
this.networkId = networkId;
this.peerValidators = peerValidators;
this.scheduler = scheduler;
Expand All @@ -105,7 +105,7 @@ public EthProtocolManager(
this.genesisHash = blockchain.getBlockHashByNumber(0L).orElse(Hash.ZERO);

this.forkIdManager = forkIdManager;
this.peerManager = peerManager;
this.peerSelector = peerSelector;

this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
Expand Down Expand Up @@ -145,7 +145,7 @@ public EthProtocolManager(
final Optional<MergePeerFilter> mergePeerFilter,
final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler,
final PeerManager peerManager) {
final PeerSelector peerSelector) {
this(
blockchain,
networkId,
Expand All @@ -164,7 +164,7 @@ public EthProtocolManager(
Collections.emptyList(),
Collections.emptyList(),
ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()),
peerManager);
peerSelector);
}

public EthContext ethContext() {
Expand Down Expand Up @@ -343,7 +343,7 @@ public void processMessage(final Capability cap, final Message message) {
public void handleNewConnection(final PeerConnection connection) {
ethPeers.registerNewConnection(connection, peerValidators);
final EthPeer peer = ethPeers.peer(connection);
peerManager.addPeer(peer);
peerSelector.addPeer(peer);
final Capability cap = connection.capability(getSupportedProtocol());
final ForkId latestForkId =
cap.getVersion() >= 64 ? forkIdManager.getForkIdForChainHead() : null;
Expand Down Expand Up @@ -375,7 +375,7 @@ public void handleDisconnect(
final DisconnectReason reason,
final boolean initiatedByPeer) {
final boolean wasActiveConnection = ethPeers.registerDisconnect(connection);
peerManager.removePeer(connection.getPeer());
peerSelector.removePeer(connection.getPeer());
LOG.atDebug()
.setMessage("Disconnect - active Connection? {} - {} - {} - {} {} - {} peers left")
.addArgument(wasActiveConnection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskBehavior;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRetryBehavior;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -52,8 +53,8 @@ public GetReceiptsFromPeerTask(
}

@Override
public String getSubProtocol() {
return EthProtocol.NAME;
public SubProtocol getSubProtocol() {
return EthProtocol.get();
}

@Override
Expand Down Expand Up @@ -101,7 +102,8 @@ public Map<BlockHeader, List<TransactionReceipt>> parseResponse(final MessageDat
}

@Override
public Collection<PeerTaskBehavior> getPeerTaskBehaviors() {
return List.of(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS, PeerTaskBehavior.RETRY_WITH_SAME_PEER);
public Collection<PeerTaskRetryBehavior> getPeerTaskBehaviors() {
return List.of(
PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS, PeerTaskRetryBehavior.RETRY_WITH_SAME_PEER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ private CompletableFuture<Optional<BlockWithReceipts>> downloadReceipts(
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> executorResult =
peerTaskExecutor.execute(task);

if (executorResult.getResponseCode() == PeerTaskExecutorResponseCode.SUCCESS) {
if (executorResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS) {
List<TransactionReceipt> transactionReceipts =
executorResult
.getResult()
.result()
.map((map) -> map.get(block.getHeader()))
.orElseThrow(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ public CompletableFuture<List<BlockWithReceipts>> apply(final List<Block> blocks
new GetReceiptsFromPeerTask(headers, new BodyValidator());
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> getReceiptsResult =
peerTaskExecutor.execute(getReceiptsFromPeerTask);
if (getReceiptsResult.getResponseCode() == PeerTaskExecutorResponseCode.SUCCESS
&& getReceiptsResult.getResult().isPresent()) {
if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS
&& getReceiptsResult.result().isPresent()) {
Map<BlockHeader, List<TransactionReceipt>> receiptsResult =
getReceiptsResult.getResult().get();
getReceiptsResult.result().get();
receiptsResult
.keySet()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.EthProtocolVersion;
import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection.PeerSendHandler;
import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
Expand Down Expand Up @@ -1245,7 +1245,7 @@ private EthProtocolManager createEthManager(
syncConfig,
mock(EthScheduler.class),
mock(ForkIdManager.class),
new DefaultPeerManager())) {
new DefaultPeerSelector(() -> null))) {

return ethManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
Expand Down Expand Up @@ -119,7 +119,7 @@ public static EthProtocolManager create(
mock(SynchronizerConfiguration.class),
ethScheduler,
new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false),
new DefaultPeerManager());
new DefaultPeerSelector(() -> null));
}

public static EthProtocolManager create(
Expand Down Expand Up @@ -171,7 +171,7 @@ public static EthProtocolManager create(
mock(SynchronizerConfiguration.class),
ethScheduler,
forkIdManager,
new DefaultPeerManager());
new DefaultPeerSelector(() -> null));
}

public static EthProtocolManager create(final Blockchain blockchain) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class GetReceiptsFromPeerTaskTest {
@Test
public void testGetSubProtocol() {
GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(Collections.emptyList(), null);
Assertions.assertEquals(EthProtocol.NAME, task.getSubProtocol());
Assertions.assertEquals(EthProtocol.get(), task.getSubProtocol());
}

@Test
Expand Down Expand Up @@ -83,7 +83,7 @@ public void testParseResponseWithNullResponseMessage() {
}

@Test
public void testParseResponseForInvalidResponse() throws InvalidPeerTaskResponseException {
public void testParseResponseForInvalidResponse() {
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(
List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> proce
bh, otherBlockchain.getTxReceipts(bh.getHash()).get()));

return new PeerTaskExecutorResult<>(
getReceiptsFromPeerTaskResult, PeerTaskExecutorResponseCode.SUCCESS);
Optional.of(getReceiptsFromPeerTaskResult), PeerTaskExecutorResponseCode.SUCCESS);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -121,7 +122,8 @@ public void shouldDownloadReceiptsForBlocksUsingPeerTaskSystem()
blocks.forEach(
(b) -> receiptsMap.put(b.getHeader(), List.of(Mockito.mock(TransactionReceipt.class))));
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> peerTaskResult =
new PeerTaskExecutorResult<>(receiptsMap, PeerTaskExecutorResponseCode.SUCCESS);
new PeerTaskExecutorResult<>(
Optional.of(receiptsMap), PeerTaskExecutorResponseCode.SUCCESS);
Mockito.when(peerTaskExecutor.execute(Mockito.any(GetReceiptsFromPeerTask.class)))
.thenReturn(peerTaskResult);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ private FastSyncActions createFastSyncActions(
protocolSchedule,
protocolContext,
ethContext,
new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()),
new PeerTaskExecutor(null, null, new NoOpMetricsSystem()),
new SyncState(blockchain, ethContext.getEthPeers(), true, Optional.empty()),
pivotBlockSelector,
new NoOpMetricsSystem());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private ChainDownloader downloader(
protocolSchedule,
protocolContext,
ethContext,
new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()),
new PeerTaskExecutor(null, null, new NoOpMetricsSystem()),
syncState,
new NoOpMetricsSystem(),
new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()),
Expand Down
Loading

0 comments on commit 2c1446e

Please sign in to comment.