diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index a26bcb9811a..6a4cc203a3b 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -55,6 +55,8 @@ import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRequestSender; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; import org.hyperledger.besu.ethereum.eth.peervalidation.CheckpointBlocksPeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.ClassicForkPeerValidator; @@ -653,6 +655,8 @@ public BesuController build() { } final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler); + final PeerTaskExecutor peerTaskExecutor = + new PeerTaskExecutor(ethPeers, new PeerTaskRequestSender(), metricsSystem); final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode()); final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint); @@ -704,6 +708,7 @@ public BesuController build() { worldStateStorageCoordinator, protocolContext, ethContext, + peerTaskExecutor, syncState, ethProtocolManager, pivotBlockSelector); @@ -830,6 +835,7 @@ private TrieLogPruner createTrieLogPruner( * @param worldStateStorageCoordinator the world state storage * @param protocolContext the protocol context * @param ethContext the eth context + * @param peerTaskExecutor the PeerTaskExecutor * @param syncState the sync state * @param ethProtocolManager the eth protocol manager * @param pivotBlockSelector the pivot block selector @@ -840,6 +846,7 @@ protected DefaultSynchronizer createSynchronizer( final WorldStateStorageCoordinator worldStateStorageCoordinator, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final EthProtocolManager ethProtocolManager, final PivotBlockSelector pivotBlockSelector) { @@ -851,6 +858,7 @@ protected DefaultSynchronizer createSynchronizer( worldStateStorageCoordinator, ethProtocolManager.getBlockBroadcaster(), ethContext, + peerTaskExecutor, syncState, dataDirectory, storageProvider, diff --git a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java index ee2611d9c87..703592f90a9 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java @@ -40,6 +40,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; @@ -225,6 +226,7 @@ protected DefaultSynchronizer createSynchronizer( final WorldStateStorageCoordinator worldStateStorageCoordinator, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final EthProtocolManager ethProtocolManager, final PivotBlockSelector pivotBlockSelector) { @@ -235,6 +237,7 @@ protected DefaultSynchronizer createSynchronizer( worldStateStorageCoordinator, protocolContext, ethContext, + peerTaskExecutor, syncState, ethProtocolManager, pivotBlockSelector); diff --git a/besu/src/test/java/org/hyperledger/besu/RunnerTest.java b/besu/src/test/java/org/hyperledger/besu/RunnerTest.java index cbbf9804084..6324fabb743 100644 --- a/besu/src/test/java/org/hyperledger/besu/RunnerTest.java +++ b/besu/src/test/java/org/hyperledger/besu/RunnerTest.java @@ -153,7 +153,15 @@ public void fullSyncFromGenesis() throws Exception { // set merge flag to false, otherwise this test can fail if a merge test runs first MergeConfiguration.setMergeEnabled(false); - syncFromGenesis(SyncMode.FULL, getFastSyncGenesis()); + syncFromGenesis(SyncMode.FULL, getFastSyncGenesis(), false); + } + + @Test + public void fullSyncFromGenesisUsingPeerTaskSystem() throws Exception { + // set merge flag to false, otherwise this test can fail if a merge test runs first + MergeConfiguration.setMergeEnabled(false); + + syncFromGenesis(SyncMode.FULL, getFastSyncGenesis(), true); } @Test @@ -161,10 +169,21 @@ public void fastSyncFromGenesis() throws Exception { // set merge flag to false, otherwise this test can fail if a merge test runs first MergeConfiguration.setMergeEnabled(false); - syncFromGenesis(SyncMode.FAST, getFastSyncGenesis()); + syncFromGenesis(SyncMode.FAST, getFastSyncGenesis(), false); + } + + @Test + public void fastSyncFromGenesisUsingPeerTaskSystem() throws Exception { + // set merge flag to false, otherwise this test can fail if a merge test runs first + MergeConfiguration.setMergeEnabled(false); + + syncFromGenesis(SyncMode.FAST, getFastSyncGenesis(), true); } - private void syncFromGenesis(final SyncMode mode, final GenesisConfigFile genesisConfig) + private void syncFromGenesis( + final SyncMode mode, + final GenesisConfigFile genesisConfig, + final boolean isPeerTaskSystemEnabled) throws Exception { final Path dataDirAhead = Files.createTempDirectory(temp, "db-ahead"); final Path dbAhead = dataDirAhead.resolve("database"); @@ -172,7 +191,10 @@ private void syncFromGenesis(final SyncMode mode, final GenesisConfigFile genesi final NodeKey aheadDbNodeKey = NodeKeyUtils.createFrom(KeyPairUtil.loadKeyPair(dataDirAhead)); final NodeKey behindDbNodeKey = NodeKeyUtils.generate(); final SynchronizerConfiguration syncConfigAhead = - SynchronizerConfiguration.builder().syncMode(SyncMode.FULL).build(); + SynchronizerConfiguration.builder() + .syncMode(SyncMode.FULL) + .isPeerTaskSystemEnabled(isPeerTaskSystemEnabled) + .build(); final ObservableMetricsSystem noOpMetricsSystem = new NoOpMetricsSystem(); final var miningParameters = MiningParameters.newDefault(); final var dataStorageConfiguration = DataStorageConfiguration.DEFAULT_FOREST_CONFIG; diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/execution/JsonRpcExecutor.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/execution/JsonRpcExecutor.java index 6fedcc70ca4..b8786eb7a5d 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/execution/JsonRpcExecutor.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/execution/JsonRpcExecutor.java @@ -104,9 +104,9 @@ public JsonRpcResponse execute( private Optional validateMethodAvailability(final JsonRpcRequest request) { final String name = request.getMethod(); - if (LOG.isDebugEnabled()) { + if (LOG.isTraceEnabled()) { final JsonArray params = JsonObject.mapFrom(request).getJsonArray("params"); - LOG.debug("JSON-RPC request -> {} {}", name, params); + LOG.trace("JSON-RPC request -> {} {}", name, params); } final JsonRpcMethod method = rpcMethods.get(name); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index 58318f9611e..30cd03c15c3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -337,7 +337,6 @@ public void processMessage(final Capability cap, final Message message) { public void handleNewConnection(final PeerConnection connection) { ethPeers.registerNewConnection(connection, peerValidators); final EthPeer peer = ethPeers.peer(connection); - final Capability cap = connection.capability(getSupportedProtocol()); final ForkId latestForkId = cap.getVersion() >= 64 ? forkIdManager.getForkIdForChainHead() : null; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java index 1e2f3eb6abb..8c90993c689 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java @@ -145,7 +145,7 @@ public void executeServiceTask(final Runnable command) { servicesExecutor.execute(command); } - public CompletableFuture scheduleServiceTask(final Runnable task) { + public CompletableFuture scheduleServiceTask(final Runnable task) { return CompletableFuture.runAsync(task, servicesExecutor); } @@ -156,6 +156,19 @@ public CompletableFuture scheduleServiceTask(final EthTask task) { return serviceFuture; } + public CompletableFuture scheduleServiceTask(final Supplier> future) { + final CompletableFuture promise = new CompletableFuture<>(); + final Future workerFuture = servicesExecutor.submit(() -> propagateResult(future, promise)); + // If returned promise is cancelled, cancel the worker future + promise.whenComplete( + (r, t) -> { + if (t instanceof CancellationException) { + workerFuture.cancel(false); + } + }); + return promise; + } + public CompletableFuture startPipeline(final Pipeline pipeline) { final CompletableFuture pipelineFuture = pipeline.start(servicesExecutor); pendingFutures.add(pipelineFuture); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java index 1243846ac3d..fed671d38d2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java @@ -41,13 +41,13 @@ public interface PeerTask { MessageData getRequestMessage(); /** - * Parses the MessageData response from the EthPeer + * Parses and processes the MessageData response from the EthPeer * * @param messageData the response MessageData to be parsed * @return a T built from the response MessageData * @throws InvalidPeerTaskResponseException if the response messageData is invalid */ - T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException; + T processResponse(MessageData messageData) throws InvalidPeerTaskResponseException; /** * Gets the number of times this task may be attempted against other peers diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java index 984cedccecb..a2ae0455263 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -133,7 +133,7 @@ public PeerTaskExecutorResult executeAgainstPeer( MessageData responseMessageData = requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer); - result = peerTask.parseResponse(responseMessageData); + result = peerTask.processResponse(responseMessageData); } finally { inflightRequestCountForThisTaskClass.decrementAndGet(); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java new file mode 100644 index 00000000000..7d4b5d585e5 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java @@ -0,0 +1,135 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask.task; + +import static java.util.Collections.emptyList; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.TransactionReceipt; +import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask; +import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage; +import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage; +import org.hyperledger.besu.ethereum.mainnet.BodyValidation; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +public class GetReceiptsFromPeerTask + implements PeerTask>> { + + private final Collection blockHeaders; + private final ProtocolSchedule protocolSchedule; + private final Map> receiptsByBlockHeader = new HashMap<>(); + private final Map> headersByReceiptsRoot = new HashMap<>(); + private final long requiredBlockchainHeight; + + public GetReceiptsFromPeerTask( + final Collection blockHeaders, final ProtocolSchedule protocolSchedule) { + this.blockHeaders = new ArrayList<>(blockHeaders); + this.protocolSchedule = protocolSchedule; + + // pre-fill any headers with an empty receipts root into the result map + this.blockHeaders.stream() + .filter(header -> header.getReceiptsRoot().equals(Hash.EMPTY_TRIE_HASH)) + .forEach(header -> receiptsByBlockHeader.put(header, emptyList())); + this.blockHeaders.removeAll(receiptsByBlockHeader.keySet()); + + // group headers by their receipts root hash to reduce total number of receipts hashes requested + // for + this.blockHeaders.forEach( + header -> + headersByReceiptsRoot + .computeIfAbsent(header.getReceiptsRoot(), key -> new ArrayList<>()) + .add(header)); + + // calculate the minimum required blockchain height a peer will need to be able to fulfil this + // request + requiredBlockchainHeight = + this.blockHeaders.stream() + .mapToLong(BlockHeader::getNumber) + .max() + .orElse(BlockHeader.GENESIS_BLOCK_NUMBER); + } + + @Override + public SubProtocol getSubProtocol() { + return EthProtocol.get(); + } + + @Override + public MessageData getRequestMessage() { + // Since we have to match up the data by receipt root, we only need to request receipts + // for one of the headers with each unique receipt root. + final List blockHashes = + headersByReceiptsRoot.values().stream() + .map(headers -> headers.getFirst().getHash()) + .toList(); + return GetReceiptsMessage.create(blockHashes); + } + + @Override + public Map> processResponse(final MessageData messageData) + throws InvalidPeerTaskResponseException { + if (messageData == null) { + throw new InvalidPeerTaskResponseException(); + } + final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(messageData); + final List> receiptsByBlock = receiptsMessage.receipts(); + // take a copy of the pre-filled receiptsByBlockHeader, to ensure idempotency of subsequent + // calls to processResponse + final Map> receiptsByHeader = + new HashMap<>(receiptsByBlockHeader); + if (!blockHeaders.isEmpty()) { + if (receiptsByBlock.isEmpty() || receiptsByBlock.size() > blockHeaders.size()) { + throw new InvalidPeerTaskResponseException(); + } + + for (final List receiptsInBlock : receiptsByBlock) { + final List blockHeaders = + headersByReceiptsRoot.get(BodyValidation.receiptsRoot(receiptsInBlock)); + if (blockHeaders == null) { + // Contains receipts that we didn't request, so mustn't be the response we're looking for. + throw new InvalidPeerTaskResponseException(); + } + blockHeaders.forEach(header -> receiptsByHeader.put(header, receiptsInBlock)); + } + } + return receiptsByHeader; + } + + @Override + public Predicate getPeerRequirementFilter() { + return (ethPeer) -> + ethPeer.getProtocolName().equals(getSubProtocol().getName()) + && (protocolSchedule.anyMatch((ps) -> ps.spec().isPoS()) + || ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight); + } + + @Override + public boolean isSuccess(final Map> result) { + return !result.isEmpty(); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 23844372999..66684ab7873 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.checkpointsync.CheckpointDownloaderFactory; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; @@ -82,6 +83,7 @@ public DefaultSynchronizer( final WorldStateStorageCoordinator worldStateStorageCoordinator, final BlockBroadcaster blockBroadcaster, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final Path dataDirectory, final StorageProvider storageProvider, @@ -147,6 +149,7 @@ public DefaultSynchronizer( protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -163,6 +166,7 @@ public DefaultSynchronizer( protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -179,6 +183,7 @@ public DefaultSynchronizer( protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java index b4bdf585410..72f1fae764f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java @@ -16,17 +16,23 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import org.hyperledger.besu.ethereum.eth.manager.task.GetBlockFromPeerTask; -import org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask; +import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -34,17 +40,23 @@ public class CheckpointDownloadBlockStep { private final ProtocolSchedule protocolSchedule; private final EthContext ethContext; + private final PeerTaskExecutor peerTaskExecutor; private final Checkpoint checkpoint; + private final SynchronizerConfiguration synchronizerConfiguration; private final MetricsSystem metricsSystem; public CheckpointDownloadBlockStep( final ProtocolSchedule protocolSchedule, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final Checkpoint checkpoint, + final SynchronizerConfiguration synchronizerConfiguration, final MetricsSystem metricsSystem) { this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; + this.peerTaskExecutor = peerTaskExecutor; this.checkpoint = checkpoint; + this.synchronizerConfiguration = synchronizerConfiguration; this.metricsSystem = metricsSystem; } @@ -65,17 +77,52 @@ public CompletableFuture> downloadBlock(final Hash h private CompletableFuture> downloadReceipts( final PeerTaskResult peerTaskResult) { final Block block = peerTaskResult.getResult(); - final GetReceiptsFromPeerTask getReceiptsFromPeerTask = - GetReceiptsFromPeerTask.forHeaders(ethContext, List.of(block.getHeader()), metricsSystem); - return getReceiptsFromPeerTask - .run() - .thenCompose( - receiptTaskResult -> { - final Optional> transactionReceipts = - Optional.ofNullable(receiptTaskResult.getResult().get(block.getHeader())); - return CompletableFuture.completedFuture( - transactionReceipts.map(receipts -> new BlockWithReceipts(block, receipts))); - }) - .exceptionally(throwable -> Optional.empty()); + if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { + return ethContext + .getScheduler() + .scheduleServiceTask( + () -> { + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask(List.of(block.getHeader()), protocolSchedule); + PeerTaskExecutorResult>> executorResult = + peerTaskExecutor.execute(task); + + if (executorResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS) { + List transactionReceipts = + executorResult + .result() + .map((map) -> map.get(block.getHeader())) + .orElseThrow( + () -> + new IllegalStateException( + "PeerTask response code was success, but empty")); + if (block.getBody().getTransactions().size() != transactionReceipts.size()) { + throw new IllegalStateException( + "PeerTask response code was success, but incorrect number of receipts returned"); + } + BlockWithReceipts blockWithReceipts = + new BlockWithReceipts(block, transactionReceipts); + return CompletableFuture.completedFuture(Optional.of(blockWithReceipts)); + } else { + return CompletableFuture.completedFuture(Optional.empty()); + } + }); + + } else { + final org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask + getReceiptsFromPeerTask = + org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask.forHeaders( + ethContext, List.of(block.getHeader()), metricsSystem); + return getReceiptsFromPeerTask + .run() + .thenCompose( + receiptTaskResult -> { + final Optional> transactionReceipts = + Optional.ofNullable(receiptTaskResult.getResult().get(block.getHeader())); + return CompletableFuture.completedFuture( + transactionReceipts.map(receipts -> new BlockWithReceipts(block, receipts))); + }) + .exceptionally(throwable -> Optional.empty()); + } } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java index 03df47e4407..30134d9f6c5 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java @@ -17,6 +17,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -61,6 +62,7 @@ public static Optional> createCheckpointDownloader( final ProtocolContext protocolContext, final MetricsSystem metricsSystem, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final WorldStateStorageCoordinator worldStateStorageCoordinator, final SyncState syncState, final Clock clock, @@ -110,6 +112,7 @@ public static Optional> createCheckpointDownloader( protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, pivotBlockSelector, metricsSystem); @@ -127,6 +130,7 @@ public static Optional> createCheckpointDownloader( protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, pivotBlockSelector, metricsSystem); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java index 5096b74e24f..61b997e6c53 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java @@ -16,6 +16,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -34,6 +35,7 @@ public CheckpointSyncActions( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final PivotBlockSelector pivotBlockSelector, final MetricsSystem metricsSystem) { @@ -43,6 +45,7 @@ public CheckpointSyncActions( protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, pivotBlockSelector, metricsSystem); @@ -57,6 +60,7 @@ public ChainDownloader createChainDownloader( protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, metricsSystem, currentState, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncChainDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncChainDownloader.java index 5450b9e5a49..2590e4736ae 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncChainDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncChainDownloader.java @@ -16,6 +16,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -36,6 +37,7 @@ public static ChainDownloader create( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final MetricsSystem metricsSystem, final FastSyncState fastSyncState, @@ -55,7 +57,13 @@ public static ChainDownloader create( syncState, syncTargetManager, new CheckpointSyncDownloadPipelineFactory( - config, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem), + config, + protocolSchedule, + protocolContext, + ethContext, + peerTaskExecutor, + fastSyncState, + metricsSystem), ethContext.getScheduler(), metricsSystem, syncDurationMetrics); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncDownloadPipelineFactory.java index 45f3f243d8c..0be10869861 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncDownloadPipelineFactory.java @@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloadPipelineFactory; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; @@ -40,9 +41,17 @@ public CheckpointSyncDownloadPipelineFactory( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final FastSyncState fastSyncState, final MetricsSystem metricsSystem) { - super(syncConfig, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem); + super( + syncConfig, + protocolSchedule, + protocolContext, + ethContext, + peerTaskExecutor, + fastSyncState, + metricsSystem); } @Override @@ -76,7 +85,8 @@ protected Pipeline createDownloadCheckPointPipeline( checkPointSource, checkpoint, protocolContext.getBlockchain()); final CheckpointDownloadBlockStep checkPointDownloadBlockStep = - new CheckpointDownloadBlockStep(protocolSchedule, ethContext, checkpoint, metricsSystem); + new CheckpointDownloadBlockStep( + protocolSchedule, ethContext, peerTaskExecutor, checkpoint, syncConfig, metricsSystem); return PipelineBuilder.createPipelineFrom( "fetchCheckpoints", diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java index cd57de371dd..876e96d1072 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java @@ -22,10 +22,16 @@ import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask; +import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; -import org.hyperledger.besu.util.FutureUtils; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -33,24 +39,69 @@ public class DownloadReceiptsStep implements Function, CompletableFuture>> { + + private final ProtocolSchedule protocolSchedule; private final EthContext ethContext; + private final PeerTaskExecutor peerTaskExecutor; + private final SynchronizerConfiguration synchronizerConfiguration; private final MetricsSystem metricsSystem; - public DownloadReceiptsStep(final EthContext ethContext, final MetricsSystem metricsSystem) { + public DownloadReceiptsStep( + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, + final SynchronizerConfiguration synchronizerConfiguration, + final MetricsSystem metricsSystem) { + this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; + this.peerTaskExecutor = peerTaskExecutor; + this.synchronizerConfiguration = synchronizerConfiguration; this.metricsSystem = metricsSystem; } @Override public CompletableFuture> apply(final List blocks) { final List headers = blocks.stream().map(Block::getHeader).collect(toList()); - final CompletableFuture>> getReceipts = - GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem).run(); - final CompletableFuture> combineWithBlocks = - getReceipts.thenApply( - receiptsByHeader -> combineBlocksAndReceipts(blocks, receiptsByHeader)); - FutureUtils.propagateCancellation(combineWithBlocks, getReceipts); - return combineWithBlocks; + if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { + return ethContext + .getScheduler() + .scheduleServiceTask(() -> getReceiptsWithPeerTaskSystem(headers)) + .thenApply((receipts) -> combineBlocksAndReceipts(blocks, receipts)); + + } else { + return GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem) + .run() + .thenApply((receipts) -> combineBlocksAndReceipts(blocks, receipts)); + } + } + + private CompletableFuture>> + getReceiptsWithPeerTaskSystem(final List headers) { + Map> getReceipts = new HashMap<>(); + do { + GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(headers, protocolSchedule); + PeerTaskExecutorResult>> getReceiptsResult = + peerTaskExecutor.execute(task); + if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS + && getReceiptsResult.result().isPresent()) { + Map> taskResult = getReceiptsResult.result().get(); + taskResult + .keySet() + .forEach( + (blockHeader) -> + getReceipts.merge( + blockHeader, + taskResult.get(blockHeader), + (initialReceipts, newReceipts) -> { + throw new IllegalStateException( + "Unexpectedly got receipts for block header already populated!"); + })); + // remove all the headers we found receipts for + headers.removeAll(getReceipts.keySet()); + } + // repeat until all headers have receipts + } while (!headers.isEmpty()); + return CompletableFuture.completedFuture(getReceipts); } private List combineBlocksAndReceipts( @@ -60,8 +111,17 @@ private List combineBlocksAndReceipts( block -> { final List receipts = receiptsByHeader.getOrDefault(block.getHeader(), emptyList()); + if (block.getBody().getTransactions().size() != receipts.size()) { + throw new IllegalStateException( + "PeerTask response code was success, but incorrect number of receipts returned. Header hash: " + + block.getHeader().getHash() + + ", Transactions: " + + block.getBody().getTransactions().size() + + ", receipts: " + + receipts.size()); + } return new BlockWithReceipts(block, receipts); }) - .collect(toList()); + .toList(); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java index 7f6bbae3f31..58a64bd562a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java @@ -19,6 +19,7 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; @@ -48,6 +49,7 @@ public class FastSyncActions { protected final ProtocolSchedule protocolSchedule; protected final ProtocolContext protocolContext; protected final EthContext ethContext; + protected final PeerTaskExecutor peerTaskExecutor; protected final SyncState syncState; protected final PivotBlockSelector pivotBlockSelector; protected final MetricsSystem metricsSystem; @@ -60,6 +62,7 @@ public FastSyncActions( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final PivotBlockSelector pivotBlockSelector, final MetricsSystem metricsSystem) { @@ -68,6 +71,7 @@ public FastSyncActions( this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; + this.peerTaskExecutor = peerTaskExecutor; this.syncState = syncState; this.pivotBlockSelector = pivotBlockSelector; this.metricsSystem = metricsSystem; @@ -164,6 +168,7 @@ public ChainDownloader createChainDownloader( protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, metricsSystem, currentState, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java index c36ff7cb482..1bf55a3811a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java @@ -16,6 +16,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -35,6 +36,7 @@ public static ChainDownloader create( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final MetricsSystem metricsSystem, final FastSyncState fastSyncState, @@ -53,7 +55,13 @@ public static ChainDownloader create( syncState, syncTargetManager, new FastSyncDownloadPipelineFactory( - config, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem), + config, + protocolSchedule, + protocolContext, + ethContext, + peerTaskExecutor, + fastSyncState, + metricsSystem), ethContext.getScheduler(), metricsSystem, syncDurationMetrics); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java index 87032b76e57..ac562608873 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java @@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.DownloadBodiesStep; import org.hyperledger.besu.ethereum.eth.sync.DownloadHeadersStep; import org.hyperledger.besu.ethereum.eth.sync.DownloadPipelineFactory; @@ -58,6 +59,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory protected final ProtocolSchedule protocolSchedule; protected final ProtocolContext protocolContext; protected final EthContext ethContext; + protected final PeerTaskExecutor peerTaskExecutor; protected final FastSyncState fastSyncState; protected final MetricsSystem metricsSystem; protected final FastSyncValidationPolicy attachedValidationPolicy; @@ -69,12 +71,14 @@ public FastSyncDownloadPipelineFactory( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final FastSyncState fastSyncState, final MetricsSystem metricsSystem) { this.syncConfig = syncConfig; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; + this.peerTaskExecutor = peerTaskExecutor; this.fastSyncState = fastSyncState; this.metricsSystem = metricsSystem; final LabelledMetric fastSyncValidationCounter = @@ -145,7 +149,8 @@ public Pipeline createDownloadPipelineForSyncTarget(final SyncT final DownloadBodiesStep downloadBodiesStep = new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem); final DownloadReceiptsStep downloadReceiptsStep = - new DownloadReceiptsStep(ethContext, metricsSystem); + new DownloadReceiptsStep( + protocolSchedule, ethContext, peerTaskExecutor, syncConfig, metricsSystem); final ImportBlocksStep importBlockStep = new ImportBlocksStep( protocolSchedule, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java index 8b71a57885d..1d775cc80fd 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java @@ -17,6 +17,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -59,6 +60,7 @@ public static Optional> create( final ProtocolContext protocolContext, final MetricsSystem metricsSystem, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final WorldStateStorageCoordinator worldStateStorageCoordinator, final SyncState syncState, final Clock clock, @@ -126,6 +128,7 @@ public static Optional> create( protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, pivotBlockSelector, metricsSystem), diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java index 5de8ceb9843..6c5ce0b04e9 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java @@ -17,6 +17,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -57,6 +58,7 @@ public static Optional> createSnapDownloader( final ProtocolContext protocolContext, final MetricsSystem metricsSystem, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final WorldStateStorageCoordinator worldStateStorageCoordinator, final SyncState syncState, final Clock clock, @@ -121,6 +123,7 @@ public static Optional> createSnapDownloader( protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, pivotBlockSelector, metricsSystem), diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java index 0262e276da2..9639de154d7 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java @@ -72,7 +72,7 @@ public void testExecuteAgainstPeerWithNoRetriesAndSuccessfulFlow() Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenReturn(responseMessageData); - Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.processResponse(responseMessageData)).thenReturn(responseObject); Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true); PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); @@ -101,7 +101,7 @@ public void testExecuteAgainstPeerWithNoRetriesAndPartialSuccessfulFlow() Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenReturn(responseMessageData); - Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.processResponse(responseMessageData)).thenReturn(responseObject); Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(false); PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); @@ -130,7 +130,7 @@ public void testExecuteAgainstPeerWithRetriesAndSuccessfulFlowAfterFirstFailure( .thenThrow(new TimeoutException()) .thenReturn(responseMessageData); Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); - Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.processResponse(responseMessageData)).thenReturn(responseObject); Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true); PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); @@ -204,7 +204,7 @@ public void testExecuteAgainstPeerWithNoRetriesAndInvalidResponseMessage() Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenReturn(responseMessageData); - Mockito.when(peerTask.parseResponse(responseMessageData)) + Mockito.when(peerTask.processResponse(responseMessageData)) .thenThrow(new InvalidPeerTaskResponseException()); PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); @@ -236,7 +236,7 @@ public void testExecuteWithNoRetriesAndSuccessFlow() Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenReturn(responseMessageData); - Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.processResponse(responseMessageData)).thenReturn(responseObject); Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true); PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); @@ -274,7 +274,7 @@ public void testExecuteWithPeerSwitchingAndSuccessFlow() Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, peer2)) .thenReturn(responseMessageData); - Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.processResponse(responseMessageData)).thenReturn(responseObject); Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true); PeerTaskExecutorResult result = peerTaskExecutor.execute(peerTask); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java new file mode 100644 index 00000000000..90e6f738fcd --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java @@ -0,0 +1,264 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask.task; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.TransactionReceipt; +import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.manager.ChainState; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException; +import org.hyperledger.besu.ethereum.eth.messages.EthPV63; +import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage; +import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage; +import org.hyperledger.besu.ethereum.mainnet.BodyValidation; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class GetReceiptsFromPeerTaskTest { + + @Test + public void testGetSubProtocol() { + GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(Collections.emptyList(), null); + Assertions.assertEquals(EthProtocol.get(), task.getSubProtocol()); + } + + @Test + public void testGetRequestMessage() { + BlockHeader blockHeader1 = mockBlockHeader(1); + TransactionReceipt receiptForBlock1 = + new TransactionReceipt(1, 123, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader1.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock1))); + + BlockHeader blockHeader2 = mockBlockHeader(2); + TransactionReceipt receiptForBlock2 = + new TransactionReceipt(1, 456, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader2.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock2))); + + BlockHeader blockHeader3 = mockBlockHeader(3); + TransactionReceipt receiptForBlock3 = + new TransactionReceipt(1, 789, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader3.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock3))); + + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask(List.of(blockHeader1, blockHeader2, blockHeader3), null); + + MessageData messageData = task.getRequestMessage(); + GetReceiptsMessage getReceiptsMessage = GetReceiptsMessage.readFrom(messageData); + + Assertions.assertEquals(EthPV63.GET_RECEIPTS, getReceiptsMessage.getCode()); + Iterable hashesInMessage = getReceiptsMessage.hashes(); + List expectedHashes = + List.of( + Hash.fromHexString(StringUtils.repeat("00", 31) + "11"), + Hash.fromHexString(StringUtils.repeat("00", 31) + "21"), + Hash.fromHexString(StringUtils.repeat("00", 31) + "31")); + List actualHashes = new ArrayList<>(); + hashesInMessage.forEach(actualHashes::add); + + Assertions.assertEquals(3, actualHashes.size()); + Assertions.assertEquals( + expectedHashes.stream().sorted().toList(), actualHashes.stream().sorted().toList()); + } + + @Test + public void testParseResponseWithNullResponseMessage() { + GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(Collections.emptyList(), null); + Assertions.assertThrows( + InvalidPeerTaskResponseException.class, () -> task.processResponse(null)); + } + + @Test + public void testParseResponseForInvalidResponse() { + BlockHeader blockHeader1 = mockBlockHeader(1); + TransactionReceipt receiptForBlock1 = + new TransactionReceipt(1, 123, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader1.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock1))); + + BlockHeader blockHeader2 = mockBlockHeader(2); + TransactionReceipt receiptForBlock2 = + new TransactionReceipt(1, 456, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader2.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock2))); + + BlockHeader blockHeader3 = mockBlockHeader(3); + TransactionReceipt receiptForBlock3 = + new TransactionReceipt(1, 789, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader3.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock3))); + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask(List.of(blockHeader1, blockHeader2, blockHeader3), null); + ReceiptsMessage receiptsMessage = + ReceiptsMessage.create( + List.of( + List.of(receiptForBlock1), + List.of(receiptForBlock2), + List.of(receiptForBlock3), + List.of( + new TransactionReceipt(1, 101112, Collections.emptyList(), Optional.empty())))); + + Assertions.assertThrows( + InvalidPeerTaskResponseException.class, () -> task.processResponse(receiptsMessage)); + } + + @Test + public void testParseResponse() throws InvalidPeerTaskResponseException { + BlockHeader blockHeader1 = mockBlockHeader(1); + TransactionReceipt receiptForBlock1 = + new TransactionReceipt(1, 123, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader1.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock1))); + + BlockHeader blockHeader2 = mockBlockHeader(2); + TransactionReceipt receiptForBlock2 = + new TransactionReceipt(1, 456, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader2.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock2))); + + BlockHeader blockHeader3 = mockBlockHeader(3); + TransactionReceipt receiptForBlock3 = + new TransactionReceipt(1, 789, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader3.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock3))); + + BlockHeader blockHeader4 = mockBlockHeader(4); + Mockito.when(blockHeader4.getReceiptsRoot()).thenReturn(Hash.EMPTY_TRIE_HASH); + + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask( + List.of(blockHeader1, blockHeader2, blockHeader3, blockHeader4), null); + + ReceiptsMessage receiptsMessage = + ReceiptsMessage.create( + List.of( + List.of(receiptForBlock1), List.of(receiptForBlock2), List.of(receiptForBlock3))); + + Map> resultMap = task.processResponse(receiptsMessage); + + Assertions.assertEquals(4, resultMap.size()); + Assertions.assertEquals(Collections.emptyList(), resultMap.get(blockHeader4)); + Assertions.assertEquals(List.of(receiptForBlock1), resultMap.get(blockHeader1)); + Assertions.assertEquals(List.of(receiptForBlock2), resultMap.get(blockHeader2)); + Assertions.assertEquals(List.of(receiptForBlock3), resultMap.get(blockHeader3)); + } + + @Test + public void testParseResponseForOnlyPrefilledEmptyTrieReceiptsRoots() + throws InvalidPeerTaskResponseException { + BlockHeader blockHeader1 = mockBlockHeader(1); + Mockito.when(blockHeader1.getReceiptsRoot()).thenReturn(Hash.EMPTY_TRIE_HASH); + + GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(List.of(blockHeader1), null); + + ReceiptsMessage receiptsMessage = ReceiptsMessage.create(Collections.emptyList()); + + Map> resultMap = task.processResponse(receiptsMessage); + + Assertions.assertEquals(1, resultMap.size()); + Assertions.assertEquals(Collections.emptyList(), resultMap.get(blockHeader1)); + } + + @Test + public void testGetPeerRequirementFilter() { + BlockHeader blockHeader1 = mockBlockHeader(1); + TransactionReceipt receiptForBlock1 = + new TransactionReceipt(1, 123, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader1.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock1))); + + BlockHeader blockHeader2 = mockBlockHeader(2); + TransactionReceipt receiptForBlock2 = + new TransactionReceipt(1, 456, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader2.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock2))); + + BlockHeader blockHeader3 = mockBlockHeader(3); + TransactionReceipt receiptForBlock3 = + new TransactionReceipt(1, 789, Collections.emptyList(), Optional.empty()); + Mockito.when(blockHeader3.getReceiptsRoot()) + .thenReturn(BodyValidation.receiptsRoot(List.of(receiptForBlock3))); + + ProtocolSchedule protocolSchedule = Mockito.mock(ProtocolSchedule.class); + Mockito.when(protocolSchedule.anyMatch(Mockito.any())).thenReturn(false); + + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask( + List.of(blockHeader1, blockHeader2, blockHeader3), protocolSchedule); + + EthPeer failForIncorrectProtocol = mockPeer("incorrectProtocol", 5); + EthPeer failForShortChainHeight = mockPeer("incorrectProtocol", 1); + EthPeer successfulCandidate = mockPeer(EthProtocol.NAME, 5); + + Assertions.assertFalse(task.getPeerRequirementFilter().test(failForIncorrectProtocol)); + Assertions.assertFalse(task.getPeerRequirementFilter().test(failForShortChainHeight)); + Assertions.assertTrue(task.getPeerRequirementFilter().test(successfulCandidate)); + } + + @Test + public void testIsSuccessForPartialSuccess() { + GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(Collections.emptyList(), null); + + Assertions.assertFalse(task.isSuccess(Collections.emptyMap())); + } + + @Test + public void testIsSuccessForFullSuccess() { + GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(Collections.emptyList(), null); + + Map> map = new HashMap<>(); + map.put(mockBlockHeader(1), null); + + Assertions.assertTrue(task.isSuccess(map)); + } + + private BlockHeader mockBlockHeader(final long blockNumber) { + BlockHeader blockHeader = Mockito.mock(BlockHeader.class); + Mockito.when(blockHeader.getNumber()).thenReturn(blockNumber); + // second to last hex digit indicates the blockNumber, last hex digit indicates the usage of the + // hash + Mockito.when(blockHeader.getHash()) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "1")); + + return blockHeader; + } + + private EthPeer mockPeer(final String protocol, final long chainHeight) { + EthPeer ethPeer = Mockito.mock(EthPeer.class); + ChainState chainState = Mockito.mock(ChainState.class); + + Mockito.when(ethPeer.getProtocolName()).thenReturn(protocol); + Mockito.when(ethPeer.chainState()).thenReturn(chainState); + Mockito.when(chainState.getEstimatedHeight()).thenReturn(chainHeight); + + return ethPeer; + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java index 43f03100a75..56e0461f706 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java @@ -22,13 +22,19 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil; import org.hyperledger.besu.ethereum.core.Difficulty; +import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; @@ -44,8 +50,15 @@ import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.plugin.services.storage.DataStorageFormat; +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; @@ -55,12 +68,16 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.platform.commons.util.ReflectionUtils; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class CheckPointSyncChainDownloaderTest { protected ProtocolSchedule protocolSchedule; protected EthProtocolManager ethProtocolManager; protected EthContext ethContext; + private PeerTaskExecutor peerTaskExecutor; protected ProtocolContext protocolContext; private SyncState syncState; @@ -100,6 +117,7 @@ public void setup(final DataStorageFormat dataStorageFormat) { localBlockchain = localBlockchainSetup.getBlockchain(); otherBlockchainSetup = BlockchainSetupUtil.forTesting(dataStorageFormat); otherBlockchain = otherBlockchainSetup.getBlockchain(); + otherBlockchainSetup.importFirstBlocks(30); protocolSchedule = localBlockchainSetup.getProtocolSchedule(); protocolContext = localBlockchainSetup.getProtocolContext(); ethProtocolManager = @@ -123,6 +141,41 @@ public void setup(final DataStorageFormat dataStorageFormat) { ethContext.getEthPeers(), true, Optional.of(checkpoint)); + + peerTaskExecutor = mock(PeerTaskExecutor.class); + + when(peerTaskExecutor.execute(any(GetReceiptsFromPeerTask.class))) + .thenAnswer( + new Answer>>>() { + @Override + public PeerTaskExecutorResult>> answer( + final InvocationOnMock invocationOnMock) throws Throwable { + GetReceiptsFromPeerTask task = + invocationOnMock.getArgument(0, GetReceiptsFromPeerTask.class); + + return processTask(task); + } + }); + } + + @SuppressWarnings("unchecked") + private PeerTaskExecutorResult>> processTask( + final GetReceiptsFromPeerTask task) throws IllegalAccessException { + Map> getReceiptsFromPeerTaskResult = new HashMap<>(); + List fields = + ReflectionUtils.findFields( + task.getClass(), + (field) -> field.getName().equals("blockHeaders"), + ReflectionUtils.HierarchyTraversalMode.TOP_DOWN); + fields.forEach((f) -> f.setAccessible(true)); + Collection blockHeaders = (Collection) fields.getFirst().get(task); + blockHeaders.forEach( + (bh) -> + getReceiptsFromPeerTaskResult.put( + bh, otherBlockchain.getTxReceipts(bh.getHash()).get())); + + return new PeerTaskExecutorResult<>( + Optional.of(getReceiptsFromPeerTaskResult), PeerTaskExecutorResponseCode.SUCCESS); } @AfterEach @@ -140,6 +193,7 @@ private ChainDownloader downloader( protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, new NoOpMetricsSystem(), new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()), @@ -148,9 +202,9 @@ private ChainDownloader downloader( @ParameterizedTest @ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class) - public void shouldSyncToPivotBlockInMultipleSegments(final DataStorageFormat storageFormat) { + public void shouldSyncToPivotBlockInMultipleSegments(final DataStorageFormat storageFormat) + throws IllegalAccessException { setup(storageFormat); - otherBlockchainSetup.importFirstBlocks(30); final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); @@ -161,6 +215,7 @@ public void shouldSyncToPivotBlockInMultipleSegments(final DataStorageFormat sto SynchronizerConfiguration.builder() .downloaderChainSegmentSize(5) .downloaderHeadersRequestSize(3) + .isPeerTaskSystemEnabled(false) .build(); final long pivotBlockNumber = 25; ethContext @@ -184,9 +239,9 @@ public void shouldSyncToPivotBlockInMultipleSegments(final DataStorageFormat sto @ParameterizedTest @ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class) - public void shouldSyncToPivotBlockInSingleSegment(final DataStorageFormat storageFormat) { + public void shouldSyncToPivotBlockInSingleSegment(final DataStorageFormat storageFormat) + throws IllegalAccessException { setup(storageFormat); - otherBlockchainSetup.importFirstBlocks(30); final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); @@ -194,7 +249,79 @@ public void shouldSyncToPivotBlockInSingleSegment(final DataStorageFormat storag RespondingEthPeer.blockchainResponder(otherBlockchain); final long pivotBlockNumber = 10; - final SynchronizerConfiguration syncConfig = SynchronizerConfiguration.builder().build(); + final SynchronizerConfiguration syncConfig = + SynchronizerConfiguration.builder().isPeerTaskSystemEnabled(false).build(); + ethContext + .getEthPeers() + .streamAvailablePeers() + .forEach( + ethPeer -> { + ethPeer.setCheckpointHeader( + otherBlockchainSetup.getBlocks().get((int) checkpoint.blockNumber()).getHeader()); + }); + final ChainDownloader downloader = downloader(syncConfig, pivotBlockNumber); + final CompletableFuture result = downloader.start(); + + peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone()); + + assertThat(result).isCompleted(); + assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber); + assertThat(localBlockchain.getChainHeadHeader()) + .isEqualTo(otherBlockchain.getBlockHeader(pivotBlockNumber).get()); + } + + @ParameterizedTest + @ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class) + public void shouldSyncToPivotBlockInMultipleSegmentsWithPeerTaskSystem( + final DataStorageFormat storageFormat) + throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException { + setup(storageFormat); + + final RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); + final RespondingEthPeer.Responder responder = + RespondingEthPeer.blockchainResponder(otherBlockchain); + + final SynchronizerConfiguration syncConfig = + SynchronizerConfiguration.builder() + .downloaderChainSegmentSize(5) + .downloaderHeadersRequestSize(3) + .isPeerTaskSystemEnabled(true) + .build(); + final long pivotBlockNumber = 25; + ethContext + .getEthPeers() + .streamAvailablePeers() + .forEach( + ethPeer -> { + ethPeer.setCheckpointHeader( + otherBlockchainSetup.getBlocks().get((int) checkpoint.blockNumber()).getHeader()); + }); + final ChainDownloader downloader = downloader(syncConfig, pivotBlockNumber); + final CompletableFuture result = downloader.start(); + + peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone()); + + assertThat(result).isCompleted(); + assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber); + assertThat(localBlockchain.getChainHeadHeader()) + .isEqualTo(otherBlockchain.getBlockHeader(pivotBlockNumber).get()); + } + + @ParameterizedTest + @ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class) + public void shouldSyncToPivotBlockInSingleSegmentWithPeerTaskSystem( + final DataStorageFormat storageFormat) throws IllegalAccessException { + setup(storageFormat); + + final RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); + final RespondingEthPeer.Responder responder = + RespondingEthPeer.blockchainResponder(otherBlockchain); + + final long pivotBlockNumber = 10; + final SynchronizerConfiguration syncConfig = + SynchronizerConfiguration.builder().isPeerTaskSystemEnabled(true).build(); ethContext .getEthPeers() .streamAvailablePeers() diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java index c9cfeda1191..4559b211e6e 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java @@ -18,47 +18,64 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockBody; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil; import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture; +import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask; +import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.plugin.services.storage.DataStorageFormat; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class DownloadReceiptsStepTest { private static ProtocolContext protocolContext; + private static ProtocolSchedule protocolSchedule; private static MutableBlockchain blockchain; + private PeerTaskExecutor peerTaskExecutor; private EthProtocolManager ethProtocolManager; - private DownloadReceiptsStep downloadReceiptsStep; @BeforeAll public static void setUpClass() { final BlockchainSetupUtil setupUtil = BlockchainSetupUtil.forTesting(DataStorageFormat.FOREST); setupUtil.importFirstBlocks(20); protocolContext = setupUtil.getProtocolContext(); + protocolSchedule = setupUtil.getProtocolSchedule(); blockchain = setupUtil.getBlockchain(); } @BeforeEach public void setUp() { + peerTaskExecutor = mock(PeerTaskExecutor.class); TransactionPool transactionPool = mock(TransactionPool.class); ethProtocolManager = EthProtocolManagerTestUtil.create( @@ -68,12 +85,17 @@ public void setUp() { protocolContext.getWorldStateArchive(), transactionPool, EthProtocolConfiguration.defaultConfig()); - downloadReceiptsStep = - new DownloadReceiptsStep(ethProtocolManager.ethContext(), new NoOpMetricsSystem()); } @Test public void shouldDownloadReceiptsForBlocks() { + DownloadReceiptsStep downloadReceiptsStep = + new DownloadReceiptsStep( + protocolSchedule, + ethProtocolManager.ethContext(), + peerTaskExecutor, + SynchronizerConfiguration.builder().isPeerTaskSystemEnabled(false).build(), + new NoOpMetricsSystem()); final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); final List blocks = asList(block(1), block(2), block(3), block(4)); @@ -90,6 +112,39 @@ public void shouldDownloadReceiptsForBlocks() { blockWithReceipts(4))); } + @Test + public void shouldDownloadReceiptsForBlocksUsingPeerTaskSystem() + throws ExecutionException, InterruptedException { + DownloadReceiptsStep downloadReceiptsStep = + new DownloadReceiptsStep( + protocolSchedule, + ethProtocolManager.ethContext(), + peerTaskExecutor, + SynchronizerConfiguration.builder().isPeerTaskSystemEnabled(true).build(), + new NoOpMetricsSystem()); + + final List blocks = asList(mockBlock(), mockBlock(), mockBlock(), mockBlock()); + Map> receiptsMap = new HashMap<>(); + blocks.forEach( + (b) -> receiptsMap.put(b.getHeader(), List.of(Mockito.mock(TransactionReceipt.class)))); + PeerTaskExecutorResult>> peerTaskResult = + new PeerTaskExecutorResult<>( + Optional.of(receiptsMap), PeerTaskExecutorResponseCode.SUCCESS); + Mockito.when(peerTaskExecutor.execute(Mockito.any(GetReceiptsFromPeerTask.class))) + .thenReturn(peerTaskResult); + + final CompletableFuture> result = downloadReceiptsStep.apply(blocks); + + assertThat(result.get().get(0).getBlock()).isEqualTo(blocks.get(0)); + assertThat(result.get().get(0).getReceipts().size()).isEqualTo(1); + assertThat(result.get().get(1).getBlock()).isEqualTo(blocks.get(1)); + assertThat(result.get().get(1).getReceipts().size()).isEqualTo(1); + assertThat(result.get().get(2).getBlock()).isEqualTo(blocks.get(2)); + assertThat(result.get().get(2).getReceipts().size()).isEqualTo(1); + assertThat(result.get().get(3).getBlock()).isEqualTo(blocks.get(3)); + assertThat(result.get().get(3).getReceipts().size()).isEqualTo(1); + } + private Block block(final long number) { final BlockHeader header = blockchain.getBlockHeader(number).get(); return new Block(header, blockchain.getBlockBody(header.getHash()).get()); @@ -100,4 +155,16 @@ private BlockWithReceipts blockWithReceipts(final long number) { final List receipts = blockchain.getTxReceipts(block.getHash()).get(); return new BlockWithReceipts(block, receipts); } + + private Block mockBlock() { + final Block block = Mockito.mock(Block.class); + final BlockHeader blockHeader = Mockito.mock(BlockHeader.class); + Mockito.when(block.getHeader()).thenAnswer((invocationOnMock) -> blockHeader); + Mockito.when(blockHeader.getReceiptsRoot()).thenReturn(Hash.fromHexStringLenient("DEADBEEF")); + final BlockBody blockBody = Mockito.mock(BlockBody.class); + Mockito.when(block.getBody()).thenAnswer((invocationOnMock) -> blockBody); + Mockito.when(blockBody.getTransactions()) + .thenAnswer((invocationOnMock) -> List.of(Mockito.mock(Transaction.class))); + return block; + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java index 37ca5be2e99..bc493ebd036 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java @@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -71,6 +72,7 @@ public class FastDownloaderFactoryTest { @Mock private ProtocolContext protocolContext; @Mock private MetricsSystem metricsSystem; @Mock private EthContext ethContext; + @Mock private PeerTaskExecutor peerTaskExecutor; @Mock private SyncState syncState; @Mock private Clock clock; @Mock private Path dataDirectory; @@ -114,6 +116,7 @@ public void shouldThrowIfSyncModeChangedWhileFastSyncIncomplete( protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -139,6 +142,7 @@ public void shouldNotThrowIfSyncModeChangedWhileFastSyncComplete( protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -167,6 +171,7 @@ public void shouldNotThrowWhenFastSyncModeRequested(final DataStorageFormat data protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -202,6 +207,7 @@ public void shouldClearWorldStateDuringFastSyncWhenStateQueDirectoryExists( protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -239,6 +245,7 @@ public void shouldCrashWhenStateQueueIsNotDirectory(final DataStorageFormat data protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java index 68caf2182c0..7af807c1c7b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java @@ -34,6 +34,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; @@ -536,6 +537,7 @@ private FastSyncActions createFastSyncActions( protocolSchedule, protocolContext, ethContext, + new PeerTaskExecutor(null, null, new NoOpMetricsSystem()), new SyncState(blockchain, ethContext.getEthPeers(), true, Optional.empty()), pivotBlockSelector, new NoOpMetricsSystem()); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java index 34014246d28..0e5b5ec2c7c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java @@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; @@ -110,6 +111,7 @@ private ChainDownloader downloader( protocolSchedule, protocolContext, ethContext, + new PeerTaskExecutor(null, null, new NoOpMetricsSystem()), syncState, new NoOpMetricsSystem(), new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()),