Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Integration of RecursivePeerRefreshState and PeerDiscoveryController (#…
Browse files Browse the repository at this point in the history
…420)

* minimal changeset

* candidate.getPeer().getStatus().equals(PeerDiscoveryStatus.BONDED)

* update

* fix peerSeenTwice

* update++

* update

* update

* update

* yaya
g

* yaya
g

* Ensure round timers are cancelled and many other cleanups.

* Shutdown peer refresh state executors on stop.

* Trigger new recursive peer search when refreshing peer table.  Reduce round timeout to 5 seconds.

* tests passing

* cleaning up

* remove useless, inbhibitive artifact from nearestPeers method

* add round cieling

* remove mock from test

* working

* revision to RecursivePeerRefreshStateTest

* test

* Use TimerUtil to schedule timeout instead of our own ScheduledExecutorService.

* Remove isBootstrap from Interaction.  Start adding tests for RecursivePeerRefreshState.

* update to bootstrapPeersRetriesSent

* fics helper method

* cleaning up

* cleaning up tests

* implement shouldOnlyQueryClosestThreeNeighbours

* update to tests

* shouldBondWithNewNeighboursWhenSomeRequestsTimeOut

* increase test coverage

* spotless

* test update

* adding tests

* shouldNotQueryNodeThatIsAlreadyQueried

* shouldBondWithPeersInNeighboursResponseReceivedAfterTimeout

* simplification

* add tests

* Fix tests to handle whitelisting.

* Ensure late bonding or neighbours responses don't kick off a new round.

* cleaning up

* add for loop

* respoins with pong

* concis

* failing test

* test passing

* update

* fix to tableRefreshSingleNode

* update coment

* Update ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java

Co-Authored-By: s-matthew-english <s.matthew.english@gmail.com>

* update

* tableRefreshSingleNode

* deconstructedIncrementalUpdateBootstrapPeersList

* setKeccak256

* Update ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java

Co-Authored-By: s-matthew-english <s.matthew.english@gmail.com>

* Update ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java

Co-Authored-By: s-matthew-english <s.matthew.english@gmail.com>

* Update ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java

Co-Authored-By: s-matthew-english <s.matthew.english@gmail.com>

* Update ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java

Co-Authored-By: s-matthew-english <s.matthew.english@gmail.com>

* synchronized

* update

* merge

* filtering out known peers

* update

* update

* update II
  • Loading branch information
smatthewenglish authored Feb 12, 2019
1 parent 15784d4 commit c4ad4f6
Show file tree
Hide file tree
Showing 9 changed files with 952 additions and 710 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal;

import static java.util.Collections.emptyList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerTable.AddResult.Outcome;
Expand All @@ -39,6 +38,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -92,6 +92,7 @@ public class PeerDiscoveryController {

private static final Logger LOG = LogManager.getLogger();
private static final long REFRESH_CHECK_INTERVAL_MILLIS = MILLISECONDS.convert(30, SECONDS);
private static final int PEER_REFRESH_ROUND_TIMEOUT_IN_SECONDS = 5;
protected final TimerUtil timerUtil;
private final PeerTable peerTable;

Expand Down Expand Up @@ -123,6 +124,8 @@ public class PeerDiscoveryController {
// Observers for "peer bonded" discovery events.
private final Subscribers<Consumer<PeerBondedEvent>> peerBondedObservers;

private RecursivePeerRefreshState recursivePeerRefreshState;

public PeerDiscoveryController(
final KeyPair keypair,
final DiscoveryPeer localPeer,
Expand Down Expand Up @@ -154,14 +157,31 @@ public CompletableFuture<?> start() {
}

bootstrapNodes.stream()
.filter(node -> peerTable.tryAdd(node).getOutcome() == Outcome.ADDED)
.filter(node -> whitelistIfPresentIsNodePermitted(node))
.forEach(node -> bond(node, true));
.filter(this::whitelistIfPresentIsNodePermitted)
.forEach(peerTable::tryAdd);

recursivePeerRefreshState =
new RecursivePeerRefreshState(
peerBlacklist,
nodeWhitelistController,
this::bond,
this::findNodes,
timerUtil,
localPeer.getId(),
peerTable,
PEER_REFRESH_ROUND_TIMEOUT_IN_SECONDS,
100);

final List<DiscoveryPeer> initialDiscoveryPeers =
bootstrapNodes.stream()
.filter(this::whitelistIfPresentIsNodePermitted)
.collect(Collectors.toList());
recursivePeerRefreshState.start(initialDiscoveryPeers, localPeer.getId());

final long timerId =
timerUtil.setPeriodic(
Math.min(REFRESH_CHECK_INTERVAL_MILLIS, tableRefreshIntervalMs),
() -> refreshTableIfRequired());
this::refreshTableIfRequired);
tableRefreshTimerId = OptionalLong.of(timerId);

return CompletableFuture.completedFuture(null);
Expand All @@ -181,7 +201,7 @@ public CompletableFuture<?> stop() {

private boolean whitelistIfPresentIsNodePermitted(final DiscoveryPeer sender) {
return nodeWhitelistController
.map(nodeWhitelistController1 -> nodeWhitelistController1.isPermitted(sender))
.map(nodeWhitelistController -> nodeWhitelistController.isPermitted(sender))
.orElse(true);
}

Expand Down Expand Up @@ -227,52 +247,27 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
final PingPacketData ping = packet.getPacketData(PingPacketData.class).get();
respondToPing(ping, packet.getHash(), peer);
}

break;
case PONG:
{
LOG.trace("Received PONG packet from {}", sender.getEnodeURI());
matchInteraction(packet)
.ifPresent(
interaction -> {
if (peerBlacklisted) {
return;
}
addToPeerTable(peer);

// If this was a bootstrap peer, let's ask it for nodes near to us.
if (interaction.isBootstrap()) {
findNodes(peer, localPeer.getId());
}
});
break;
}
case NEIGHBORS:
LOG.trace("Received NEIGHBORS packet from {}", sender.getEnodeURI());
LOG.trace("Received PONG packet from {}", sender.getEnodeURI());
matchInteraction(packet)
.ifPresent(
interaction -> {
// Extract the peers from the incoming packet.
final List<DiscoveryPeer> neighbors =
packet
.getPacketData(NeighborsPacketData.class)
.map(NeighborsPacketData::getNodes)
.orElse(emptyList());

for (final DiscoveryPeer neighbor : neighbors) {
// If the peer is not whitelisted, is blacklisted, is already known, or
// represents this node, skip bonding
if (!whitelistIfPresentIsNodePermitted(neighbor)
|| peerBlacklist.contains(neighbor)
|| peerTable.get(neighbor).isPresent()
|| neighbor.getId().equals(localPeer.getId())) {
continue;
}
bond(neighbor, false);
if (peerBlacklisted) {
return;
}
addToPeerTable(peer);
recursivePeerRefreshState.onBondingComplete(peer);
});
break;

case NEIGHBORS:
LOG.trace("Received NEIGHBORS packet from {}", sender.getEnodeURI());
matchInteraction(packet)
.ifPresent(
interaction ->
recursivePeerRefreshState.onNeighboursPacketReceived(
peer, packet.getPacketData(NeighborsPacketData.class).orElse(null)));
break;
case FIND_NEIGHBORS:
LOG.trace("Received FIND_NEIGHBORS packet from {}", sender.getEnodeURI());
if (!peerKnown || peerBlacklisted) {
Expand Down Expand Up @@ -341,24 +336,29 @@ private void refreshTableIfRequired() {
}
}

@VisibleForTesting
RecursivePeerRefreshState getRecursivePeerRefreshState() {
return recursivePeerRefreshState;
}

/**
* Refreshes the peer table by generating a random ID and interrogating the closest nodes for it.
* Currently the refresh process is NOT recursive.
*/
private void refreshTable() {
final BytesValue target = Peer.randomId();
peerTable.nearestPeers(Peer.randomId(), 16).forEach((peer) -> findNodes(peer, target));
final List<DiscoveryPeer> initialPeers = peerTable.nearestPeers(Peer.randomId(), 16);
recursivePeerRefreshState.start(initialPeers, target);
lastRefreshTime = System.currentTimeMillis();
}

/**
* Initiates a bonding PING-PONG cycle with a peer.
*
* @param peer The targeted peer.
* @param bootstrap Whether this is a bootstrap interaction.
*/
@VisibleForTesting
void bond(final DiscoveryPeer peer, final boolean bootstrap) {
void bond(final DiscoveryPeer peer) {
peer.setFirstDiscovered(System.currentTimeMillis());
peer.setStatus(PeerDiscoveryStatus.BONDING);

Expand All @@ -383,7 +383,7 @@ void bond(final DiscoveryPeer peer, final boolean bootstrap) {

// The filter condition will be updated as soon as the action is performed.
final PeerInteractionState ping =
new PeerInteractionState(action, PacketType.PONG, (packet) -> false, true, bootstrap);
new PeerInteractionState(action, PacketType.PONG, (packet) -> false, true);
dispatchInteraction(peer, ping);
}

Expand Down Expand Up @@ -414,7 +414,7 @@ private void findNodes(final DiscoveryPeer peer, final BytesValue target) {
sendPacket(peer, PacketType.FIND_NEIGHBORS, data);
};
final PeerInteractionState interaction =
new PeerInteractionState(action, PacketType.NEIGHBORS, packet -> true, true, false);
new PeerInteractionState(action, PacketType.NEIGHBORS, packet -> true, true);
dispatchInteraction(peer, interaction);
}

Expand Down Expand Up @@ -483,22 +483,18 @@ private class PeerInteractionState implements Predicate<Packet> {
private Predicate<Packet> filter;
/** Whether the action associated to this state is retryable or not. */
private final boolean retryable;
/** Whether this is an entry for a bootstrap peer. */
private final boolean bootstrap;
/** Timers associated with this entry. */
private OptionalLong timerId = OptionalLong.empty();

PeerInteractionState(
final Consumer<PeerInteractionState> action,
final PacketType expectedType,
final Predicate<Packet> filter,
final boolean retryable,
final boolean bootstrap) {
final boolean retryable) {
this.action = action;
this.expectedType = expectedType;
this.filter = filter;
this.retryable = retryable;
this.bootstrap = bootstrap;
}

@Override
Expand All @@ -510,10 +506,6 @@ void updateFilter(final Predicate<Packet> filter) {
this.filter = filter;
}

boolean isBootstrap() {
return bootstrap;
}

/**
* Executes the action associated with this state. Sets a "boomerang" timer to itself in case
* the action is retryable.
Expand Down
Loading

0 comments on commit c4ad4f6

Please sign in to comment.