Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Integration of RecursivePeerRefreshState and PeerDiscoveryController #420

Merged
Merged
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
943f775
minimal changeset
smatthewenglish Jan 24, 2019
8e9bc8a
candidate.getPeer().getStatus().equals(PeerDiscoveryStatus.BONDED)
smatthewenglish Jan 24, 2019
9aa3c69
update
smatthewenglish Jan 25, 2019
9965a90
fix peerSeenTwice
smatthewenglish Jan 25, 2019
48aeb33
update++
smatthewenglish Jan 25, 2019
02e1dfc
update
smatthewenglish Jan 25, 2019
c1a161d
update
smatthewenglish Jan 25, 2019
a1fa8ae
update
smatthewenglish Jan 25, 2019
7b9f565
yaya
smatthewenglish Jan 25, 2019
6a38f17
yaya
smatthewenglish Jan 25, 2019
93cd731
Ensure round timers are cancelled and many other cleanups.
ajsutton Jan 31, 2019
fda9183
Shutdown peer refresh state executors on stop.
ajsutton Jan 31, 2019
d7ce507
Trigger new recursive peer search when refreshing peer table. Reduce…
ajsutton Jan 31, 2019
c18bfa5
tests passing
smatthewenglish Jan 31, 2019
221b6cd
cleaning up
smatthewenglish Jan 31, 2019
2f176bc
remove useless, inbhibitive artifact from nearestPeers method
smatthewenglish Jan 31, 2019
022e68d
add round cieling
smatthewenglish Jan 31, 2019
fa8a0cb
remove mock from test
smatthewenglish Jan 31, 2019
d186a6c
working
smatthewenglish Jan 31, 2019
91c0955
revision to RecursivePeerRefreshStateTest
smatthewenglish Feb 1, 2019
d7f2c8d
test
smatthewenglish Feb 1, 2019
2bde9c5
Use TimerUtil to schedule timeout instead of our own ScheduledExecuto…
ajsutton Feb 1, 2019
b293620
Remove isBootstrap from Interaction. Start adding tests for Recursiv…
ajsutton Feb 1, 2019
eb22790
update to bootstrapPeersRetriesSent
smatthewenglish Feb 1, 2019
e251e76
fics helper method
smatthewenglish Feb 1, 2019
ad70f09
cleaning up
smatthewenglish Feb 1, 2019
9a8fcb8
cleaning up tests
smatthewenglish Feb 1, 2019
5abdac1
implement shouldOnlyQueryClosestThreeNeighbours
smatthewenglish Feb 1, 2019
a4c4b78
update to tests
smatthewenglish Feb 1, 2019
3c6671c
shouldBondWithNewNeighboursWhenSomeRequestsTimeOut
smatthewenglish Feb 1, 2019
de17792
increase test coverage
smatthewenglish Feb 1, 2019
6ae84c7
spotless
smatthewenglish Feb 1, 2019
789e477
test update
smatthewenglish Feb 4, 2019
c60bbd5
adding tests
smatthewenglish Feb 4, 2019
892a342
shouldNotQueryNodeThatIsAlreadyQueried
smatthewenglish Feb 4, 2019
f74dc18
shouldBondWithPeersInNeighboursResponseReceivedAfterTimeout
smatthewenglish Feb 4, 2019
0219607
simplification
smatthewenglish Feb 4, 2019
d45da4f
add tests
smatthewenglish Feb 4, 2019
221104b
merge conflicts
smatthewenglish Feb 4, 2019
41a8988
Fix tests to handle whitelisting.
ajsutton Feb 4, 2019
6168fa0
Ensure late bonding or neighbours responses don't kick off a new round.
ajsutton Feb 4, 2019
84b60ee
Merge remote-tracking branch 'upstream/master' into iterative-integra…
smatthewenglish Feb 5, 2019
cda3725
cleaning up
smatthewenglish Feb 5, 2019
73e29e8
add for loop
smatthewenglish Feb 5, 2019
0bc7b56
respoins with pong
smatthewenglish Feb 5, 2019
00158e7
concis
smatthewenglish Feb 5, 2019
6f98946
failing test
smatthewenglish Feb 5, 2019
022f40f
test passing
smatthewenglish Feb 5, 2019
3113e80
update
smatthewenglish Feb 5, 2019
46409d7
fix to tableRefreshSingleNode
smatthewenglish Feb 5, 2019
734ab01
Merge remote-tracking branch 'upstream/master' into iterative-integra…
smatthewenglish Feb 5, 2019
47482cd
update coment
smatthewenglish Feb 6, 2019
913d572
Update ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/…
mbaxter Feb 6, 2019
d03888c
update
smatthewenglish Feb 6, 2019
d55d7dd
Merge remote-tracking branch 'upstream/master' into iterative-integra…
smatthewenglish Feb 6, 2019
5c49e1d
merge
smatthewenglish Feb 7, 2019
9ab6760
tableRefreshSingleNode
smatthewenglish Feb 11, 2019
88b90fc
deconstructedIncrementalUpdateBootstrapPeersList
smatthewenglish Feb 11, 2019
f5079c4
setKeccak256
smatthewenglish Feb 11, 2019
d660ca5
Update ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/…
mbaxter Feb 11, 2019
f7413b3
Update ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/…
mbaxter Feb 11, 2019
c5c7906
Update ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/…
mbaxter Feb 11, 2019
c03c932
Update ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/…
mbaxter Feb 11, 2019
8ce92f4
synchronized
smatthewenglish Feb 11, 2019
3503716
update
smatthewenglish Feb 11, 2019
38130c9
merge
smatthewenglish Feb 11, 2019
acfa899
merge
smatthewenglish Feb 11, 2019
bcae3d5
filtering out known peers
smatthewenglish Feb 11, 2019
eda6ae1
update
smatthewenglish Feb 11, 2019
c77f9a9
update
smatthewenglish Feb 12, 2019
b8e9aff
Merge remote-tracking branch 'upstream/master' into iterative-integra…
smatthewenglish Feb 12, 2019
65714cf
update II
smatthewenglish Feb 12, 2019
b764320
merge ii
smatthewenglish Feb 12, 2019
6d2bbbd
Merge branch 'master' into iterative-integration
smatthewenglish Feb 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal;

import static java.util.Collections.emptyList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerTable.AddResult.Outcome;
Expand All @@ -39,6 +38,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -92,6 +92,7 @@ public class PeerDiscoveryController {

private static final Logger LOG = LogManager.getLogger();
private static final long REFRESH_CHECK_INTERVAL_MILLIS = MILLISECONDS.convert(30, SECONDS);
private static final int PEER_REFRESH_ROUND_TIMEOUT_IN_SECONDS = 5;
protected final TimerUtil timerUtil;
private final PeerTable peerTable;

Expand Down Expand Up @@ -123,6 +124,8 @@ public class PeerDiscoveryController {
// Observers for "peer bonded" discovery events.
private final Subscribers<Consumer<PeerBondedEvent>> peerBondedObservers;

private RecursivePeerRefreshState recursivePeerRefreshState;

public PeerDiscoveryController(
mbaxter marked this conversation as resolved.
Show resolved Hide resolved
final KeyPair keypair,
final DiscoveryPeer localPeer,
Expand Down Expand Up @@ -154,14 +157,29 @@ public CompletableFuture<?> start() {
}

bootstrapNodes.stream()
.filter(node -> peerTable.tryAdd(node).getOutcome() == Outcome.ADDED)
.filter(node -> whitelistIfPresentIsNodePermitted(node))
.forEach(node -> bond(node, true));
.filter(this::whitelistIfPresentIsNodePermitted)
.forEach(peerTable::tryAdd);

recursivePeerRefreshState =
new RecursivePeerRefreshState(
peerBlacklist,
nodeWhitelistController,
this::bond,
this::findNodes,
timerUtil,
PEER_REFRESH_ROUND_TIMEOUT_IN_SECONDS,
100);

final List<DiscoveryPeer> initialDiscoveryPeers =
bootstrapNodes.stream()
.filter(this::whitelistIfPresentIsNodePermitted)
.collect(Collectors.toList());
recursivePeerRefreshState.start(initialDiscoveryPeers, localPeer.getId());

final long timerId =
timerUtil.setPeriodic(
Math.min(REFRESH_CHECK_INTERVAL_MILLIS, tableRefreshIntervalMs),
() -> refreshTableIfRequired());
this::refreshTableIfRequired);
tableRefreshTimerId = OptionalLong.of(timerId);

return CompletableFuture.completedFuture(null);
Expand All @@ -181,7 +199,7 @@ public CompletableFuture<?> stop() {

private boolean whitelistIfPresentIsNodePermitted(final DiscoveryPeer sender) {
return nodeWhitelistController
.map(nodeWhitelistController1 -> nodeWhitelistController1.isPermitted(sender))
.map(nodeWhitelistController -> nodeWhitelistController.isPermitted(sender))
.orElse(true);
}

Expand Down Expand Up @@ -239,38 +257,17 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
return;
}
addToPeerTable(peer);

// If this was a bootstrap peer, let's ask it for nodes near to us.
if (interaction.isBootstrap()) {
findNodes(peer, localPeer.getId());
}
recursivePeerRefreshState.onBondingComplete(peer);
});
break;
}
case NEIGHBORS:
LOG.trace("Received NEIGHBORS packet from {}", sender.getEnodeURI());
matchInteraction(packet)
.ifPresent(
interaction -> {
// Extract the peers from the incoming packet.
final List<DiscoveryPeer> neighbors =
packet
.getPacketData(NeighborsPacketData.class)
.map(NeighborsPacketData::getNodes)
.orElse(emptyList());

for (final DiscoveryPeer neighbor : neighbors) {
// If the peer is not whitelisted, is blacklisted, is already known, or
// represents this node, skip bonding
if (!whitelistIfPresentIsNodePermitted(neighbor)
|| peerBlacklist.contains(neighbor)
|| peerTable.get(neighbor).isPresent()
|| neighbor.getId().equals(localPeer.getId())) {
continue;
}
bond(neighbor, false);
}
});
interaction ->
recursivePeerRefreshState.onNeighboursPacketReceived(
peer, packet.getPacketData(NeighborsPacketData.class).orElse(null)));
break;

case FIND_NEIGHBORS:
Expand Down Expand Up @@ -341,24 +338,29 @@ private void refreshTableIfRequired() {
}
}

@VisibleForTesting
RecursivePeerRefreshState getRecursivePeerRefreshState() {
return recursivePeerRefreshState;
}

/**
* Refreshes the peer table by generating a random ID and interrogating the closest nodes for it.
* Currently the refresh process is NOT recursive.
*/
private void refreshTable() {
final BytesValue target = Peer.randomId();
peerTable.nearestPeers(Peer.randomId(), 16).forEach((peer) -> findNodes(peer, target));
final List<DiscoveryPeer> initialPeers = peerTable.nearestPeers(Peer.randomId(), 16);
recursivePeerRefreshState.start(initialPeers, target);
lastRefreshTime = System.currentTimeMillis();
}

/**
* Initiates a bonding PING-PONG cycle with a peer.
*
* @param peer The targeted peer.
* @param bootstrap Whether this is a bootstrap interaction.
*/
@VisibleForTesting
void bond(final DiscoveryPeer peer, final boolean bootstrap) {
void bond(final DiscoveryPeer peer) {
peer.setFirstDiscovered(System.currentTimeMillis());
peer.setStatus(PeerDiscoveryStatus.BONDING);

Expand All @@ -383,7 +385,7 @@ void bond(final DiscoveryPeer peer, final boolean bootstrap) {

// The filter condition will be updated as soon as the action is performed.
final PeerInteractionState ping =
new PeerInteractionState(action, PacketType.PONG, (packet) -> false, true, bootstrap);
new PeerInteractionState(action, PacketType.PONG, (packet) -> false, true);
dispatchInteraction(peer, ping);
}

Expand Down Expand Up @@ -414,7 +416,7 @@ private void findNodes(final DiscoveryPeer peer, final BytesValue target) {
sendPacket(peer, PacketType.FIND_NEIGHBORS, data);
};
final PeerInteractionState interaction =
new PeerInteractionState(action, PacketType.NEIGHBORS, packet -> true, true, false);
new PeerInteractionState(action, PacketType.NEIGHBORS, packet -> true, true);
dispatchInteraction(peer, interaction);
}

Expand Down Expand Up @@ -483,22 +485,18 @@ private class PeerInteractionState implements Predicate<Packet> {
private Predicate<Packet> filter;
/** Whether the action associated to this state is retryable or not. */
private final boolean retryable;
/** Whether this is an entry for a bootstrap peer. */
private final boolean bootstrap;
/** Timers associated with this entry. */
private OptionalLong timerId = OptionalLong.empty();

PeerInteractionState(
final Consumer<PeerInteractionState> action,
final PacketType expectedType,
final Predicate<Packet> filter,
final boolean retryable,
final boolean bootstrap) {
final boolean retryable) {
this.action = action;
this.expectedType = expectedType;
this.filter = filter;
this.retryable = retryable;
this.bootstrap = bootstrap;
}

@Override
Expand All @@ -510,10 +508,6 @@ void updateFilter(final Predicate<Packet> filter) {
this.filter = filter;
}

boolean isBootstrap() {
return bootstrap;
}

/**
* Executes the action associated with this state. Sets a "boomerang" timer to itself in case
* the action is retryable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import tech.pegasys.pantheon.crypto.Hash;
import tech.pegasys.pantheon.ethereum.p2p.discovery.DiscoveryPeer;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryStatus;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerId;
import tech.pegasys.pantheon.util.bytes.BytesValue;
Expand Down Expand Up @@ -180,7 +179,6 @@ private void buildBloomFilter() {
public List<DiscoveryPeer> nearestPeers(final BytesValue target, final int limit) {
final BytesValue keccak256 = Hash.keccak256(target);
return getAllPeers().stream()
.filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDED)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this status filtering being removed?

.sorted(comparingInt((peer) -> distance(peer.keccak256(), keccak256)))
.limit(limit)
.collect(toList());
Expand Down
Loading