Skip to content

Commit

Permalink
Filter out disconnected peers when fetching available peers (hyperled…
Browse files Browse the repository at this point in the history
…ger#4269)

* Filter out disconnected peers when fetching available peers

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 authored and garyschulte committed Sep 7, 2022
1 parent 5e87ab2 commit ca34517
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Additions and Improvements
- Upgrade besu-native to 0.6.0 and use Blake2bf native implementation if available by default [#4264](https://github.com/hyperledger/besu/pull/4264)
- Better management of jemalloc presence/absence in startup script [#4237](https://github.com/hyperledger/besu/pull/4237)
- Filter out disconnected peers when fetching available peers [#4269](https://github.com/hyperledger/besu/pull/4269)

### Bug Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ public Stream<EthPeer> streamAllPeers() {
}

public Stream<EthPeer> streamAvailablePeers() {
return streamAllPeers().filter(EthPeer::readyForRequests);
return streamAllPeers()
.filter(EthPeer::readyForRequests)
.filter(peer -> !peer.isDisconnected());
}

public Stream<EthPeer> streamBestPeers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ public boolean attemptExecution() {
if (result.isDone()) {
return true;
}
final Optional<EthPeer> leastBusySuitablePeer = getLeastBusySuitablePeer();
if (!leastBusySuitablePeer.isPresent()) {
final Optional<EthPeer> maybePeer = getPeerToUse();
if (maybePeer.isEmpty()) {
// No peers have the required height.
result.completeExceptionally(new NoAvailablePeersException());
return true;
} else {
// At least one peer has the required height, but we not be able to use it if it's busy
final Optional<EthPeer> selectedPeer =
leastBusySuitablePeer.filter(EthPeer::hasAvailableRequestCapacity);
// At least one peer has the required height, but we are not able to use it if it's busy
final Optional<EthPeer> maybePeerWithCapacity =
maybePeer.filter(EthPeer::hasAvailableRequestCapacity);

selectedPeer.ifPresent(this::sendRequest);
return selectedPeer.isPresent();
maybePeerWithCapacity.ifPresent(this::sendRequest);
return maybePeerWithCapacity.isPresent();
}
}

Expand All @@ -79,8 +79,9 @@ private synchronized void sendRequest(final EthPeer peer) {
}
}

private Optional<EthPeer> getLeastBusySuitablePeer() {
return peer.isPresent()
private Optional<EthPeer> getPeerToUse() {
// return the assigned peer if still valid, otherwise switch to another peer
return peer.filter(p -> !p.isDisconnected()).isPresent()
? peer
: ethPeers
.streamAvailablePeers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private Stream<EthPeer> remainingPeersToTry() {
return getEthContext()
.getEthPeers()
.streamBestPeers()
.filter(peer -> !peer.isDisconnected() && !triedPeers.contains(peer));
.filter(peer -> !triedPeers.contains(peer));
}

private void refreshPeers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,29 @@ public void shouldReduceTheBlockSegmentSizeAfterEachRetry() {
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);

peerCountToTimeout.set(3);
final List<Block> requestedData = generateDataToBeRequested(10);

final CompleteBlocksTask task = createTask(requestedData);
final CompletableFuture<List<Block>> future = task.run();

final List<MessageData> messageCollector = new ArrayList<>();
final List<MessageData> messageCollector = new ArrayList<>(4);

respondingPeer.respond(
peerCountToTimeout.set(4);
// after 3 timeouts a peer is disconnected, so we need another peer to reach 4 retries
respondingPeer.respondTimes(
RespondingEthPeer.wrapResponderWithCollector(
RespondingEthPeer.emptyResponder(), messageCollector),
3);
final RespondingEthPeer respondingPeer2 =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
respondingPeer2.respond(
RespondingEthPeer.wrapResponderWithCollector(
RespondingEthPeer.emptyResponder(), messageCollector));

assertThat(batchSize(messageCollector.get(0))).isEqualTo(10);
assertThat(batchSize(messageCollector.get(1))).isEqualTo(5);
assertThat(batchSize(messageCollector.get(2))).isEqualTo(4);
assertThat(batchSize(messageCollector.get(3))).isEqualTo(3);
assertThat(future.isCompletedExceptionally()).isTrue();
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class);
}
Expand All @@ -110,21 +118,29 @@ public void shouldNotReduceTheBlockSegmentSizeIfOnlyOneBlockNeeded() {
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);

peerCountToTimeout.set(3);
final List<Block> requestedData = generateDataToBeRequested(1);

final EthTask<List<Block>> task = createTask(requestedData);
final CompletableFuture<List<Block>> future = task.run();

final List<MessageData> messageCollector = new ArrayList<>();
final List<MessageData> messageCollector = new ArrayList<>(4);

respondingPeer.respond(
peerCountToTimeout.set(4);
// after 3 timeouts a peer is disconnected, so we need another peer to reach 4 retries
respondingPeer.respondTimes(
RespondingEthPeer.wrapResponderWithCollector(
RespondingEthPeer.emptyResponder(), messageCollector),
3);
final RespondingEthPeer respondingPeer2 =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
respondingPeer2.respond(
RespondingEthPeer.wrapResponderWithCollector(
RespondingEthPeer.emptyResponder(), messageCollector));

assertThat(batchSize(messageCollector.get(0))).isEqualTo(1);
assertThat(batchSize(messageCollector.get(1))).isEqualTo(1);
assertThat(batchSize(messageCollector.get(2))).isEqualTo(1);
assertThat(batchSize(messageCollector.get(3))).isEqualTo(1);
assertThat(future.isCompletedExceptionally()).isTrue();
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.EthTaskException;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.EthTaskException.FailureReason;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
Expand Down Expand Up @@ -117,8 +118,7 @@ public void shouldFailIfPeerDisconnects() {
assertThat(failure.get()).isNotNull();
final Throwable error = ExceptionUtils.rootCause(failure.get());
assertThat(error).isInstanceOf(EthTaskException.class);
assertThat(((EthTaskException) error).reason())
.isEqualTo(EthTaskException.FailureReason.PEER_DISCONNECTED);
assertThat(((EthTaskException) error).reason()).isEqualTo(FailureReason.NO_AVAILABLE_PEERS);
}

@Test
Expand Down

0 comments on commit ca34517

Please sign in to comment.