Skip to content

Commit

Permalink
Improve finding peers (#7626)
Browse files Browse the repository at this point in the history
* add check before adding peer to peer table

Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
  • Loading branch information
pinges authored Sep 30, 2024
1 parent c3aa3f4 commit 9c80c9b
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeer;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import org.hyperledger.besu.ethereum.rlp.RLPOutput;
import org.hyperledger.besu.plugin.data.EnodeURL;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.tuweni.bytes.Bytes;
import org.ethereum.beacon.discovery.schema.NodeRecord;
Expand All @@ -37,9 +37,7 @@ public class DiscoveryPeer extends DefaultPeer {
private final Endpoint endpoint;

// Timestamps.
private long firstDiscovered = 0;
private long lastContacted = 0;
private long lastSeen = 0;
private final AtomicLong firstDiscovered = new AtomicLong(0L);
private long lastAttemptedConnection = 0;

private NodeRecord nodeRecord;
Expand Down Expand Up @@ -96,20 +94,11 @@ public void setStatus(final PeerDiscoveryStatus status) {
}

public long getFirstDiscovered() {
return firstDiscovered;
return firstDiscovered.get();
}

public PeerId setFirstDiscovered(final long firstDiscovered) {
this.firstDiscovered = firstDiscovered;
return this;
}

public long getLastContacted() {
return lastContacted;
}

public void setLastContacted(final long lastContacted) {
this.lastContacted = lastContacted;
public void setFirstDiscovered(final long firstDiscovered) {
this.firstDiscovered.compareAndExchange(0L, firstDiscovered);
}

public long getLastAttemptedConnection() {
Expand All @@ -120,14 +109,6 @@ public void setLastAttemptedConnection(final long lastAttemptedConnection) {
this.lastAttemptedConnection = lastAttemptedConnection;
}

public long getLastSeen() {
return lastSeen;
}

public void setLastSeen(final long lastSeen) {
this.lastSeen = lastSeen;
}

public Endpoint getEndpoint() {
return endpoint;
}
Expand Down Expand Up @@ -163,8 +144,6 @@ public String toString() {
sb.append("status=").append(status);
sb.append(", enode=").append(this.getEnodeURL());
sb.append(", firstDiscovered=").append(firstDiscovered);
sb.append(", lastContacted=").append(lastContacted);
sb.append(", lastSeen=").append(lastSeen);
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,7 @@ protected void handleOutgoingPacket(final DiscoveryPeer peer, final Packet packe
(res, err) -> {
if (err != null) {
handleOutgoingPacketError(err, peer, packet);
return;
}
peer.setLastContacted(System.currentTimeMillis());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ public enum PeerDiscoveryStatus {
* We have successfully bonded with this {@link DiscoveryPeer}, and we are able to exchange
* messages with them.
*/
BONDED,

/** We have requested the ENR record from this {@link DiscoveryPeer} */
ENR_REQUESTED;
BONDED;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
Expand All @@ -43,6 +44,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -321,7 +323,6 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
switch (packet.getType()) {
case PING:
if (peerPermissions.allowInboundBonding(peer)) {
peer.setLastSeen(System.currentTimeMillis());
final PingPacketData ping = packet.getPacketData(PingPacketData.class).get();
if (!PeerDiscoveryStatus.BONDED.equals(peer.getStatus())
&& (bondingPeers.getIfPresent(sender.getId()) == null)) {
Expand All @@ -338,7 +339,7 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
requestENR(peer);
}
bondingPeers.invalidate(peerId);
addToPeerTable(peer);
checkBeforeAddingToPeerTable(peer);
recursivePeerRefreshState.onBondingComplete(peer);
Optional.ofNullable(cachedEnrRequests.getIfPresent(peerId))
.ifPresent(cachedEnrRequest -> processEnrRequest(peer, cachedEnrRequest));
Expand Down Expand Up @@ -405,38 +406,45 @@ private List<DiscoveryPeer> getPeersFromNeighborsPacket(final Packet packet) {
.collect(Collectors.toList());
}

private boolean addToPeerTable(final DiscoveryPeer peer) {
final PeerTable.AddResult result = peerTable.tryAdd(peer);
if (result.getOutcome() != PeerTable.AddResult.AddOutcome.INVALID) {

// Reset the last seen timestamp.
final long now = System.currentTimeMillis();
if (peer.getFirstDiscovered() == 0) {
peer.setFirstDiscovered(now);
}
peer.setLastSeen(now);
private void checkBeforeAddingToPeerTable(final DiscoveryPeer peer) {
if (peerTable.isIpAddressInvalid(peer.getEndpoint())) {
return;
}

if (peer.getStatus() != PeerDiscoveryStatus.BONDED) {
peer.setStatus(PeerDiscoveryStatus.BONDED);
connectOnRlpxLayer(peer);
}
if (peer.getFirstDiscovered() == 0L) {
connectOnRlpxLayer(peer)
.whenComplete(
(pc, th) -> {
if (th == null || !(th.getCause() instanceof TimeoutException)) {
peer.setStatus(PeerDiscoveryStatus.BONDED);
peer.setFirstDiscovered(System.currentTimeMillis());
addToPeerTable(peer);
} else {
LOG.debug("Handshake timed out with peer {}", peer.getLoggableId(), th);
peerTable.invalidateIP(peer.getEndpoint());
}
});
} else {
peer.setStatus(PeerDiscoveryStatus.BONDED);
addToPeerTable(peer);
}
}

if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) {
// Bump peer.
peerTable.tryEvict(peer);
peerTable.tryAdd(peer);
} else if (result.getOutcome() == PeerTable.AddResult.AddOutcome.BUCKET_FULL) {
peerTable.tryEvict(result.getEvictionCandidate());
peerTable.tryAdd(peer);
}
public void addToPeerTable(final DiscoveryPeer peer) {
final PeerTable.AddResult result = peerTable.tryAdd(peer);

return true;
if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) {
// Bump peer.
peerTable.tryEvict(peer);
peerTable.tryAdd(peer);
} else if (result.getOutcome() == PeerTable.AddResult.AddOutcome.BUCKET_FULL) {
peerTable.tryEvict(result.getEvictionCandidate());
peerTable.tryAdd(peer);
}
return false;
}

void connectOnRlpxLayer(final DiscoveryPeer peer) {
rlpxAgent.connect(peer);
CompletableFuture<PeerConnection> connectOnRlpxLayer(final DiscoveryPeer peer) {
return rlpxAgent.connect(peer);
}

private Optional<PeerInteractionState> matchInteraction(final Packet packet) {
Expand Down Expand Up @@ -512,7 +520,6 @@ void bond(final DiscoveryPeer peer) {
return;
}

peer.setFirstDiscovered(System.currentTimeMillis());
peer.setStatus(PeerDiscoveryStatus.BONDING);
bondingPeers.put(peer.getId(), peer);

Expand Down Expand Up @@ -719,7 +726,7 @@ public void handleBondingRequest(final DiscoveryPeer peer) {

// Load the peer first from the table, then from bonding cache or use the instance that comes in.
private DiscoveryPeer resolvePeer(final DiscoveryPeer peer) {
if (peerTable.ipAddressIsInvalid(peer.getEndpoint())) {
if (peerTable.isIpAddressInvalid(peer.getEndpoint())) {
return null;
}
final Optional<DiscoveryPeer> maybeKnownPeer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Optional<DiscoveryPeer> get(final PeerId peer) {
* @see AddOutcome
*/
public AddResult tryAdd(final DiscoveryPeer peer) {
if (ipAddressIsInvalid(peer.getEndpoint())) {
if (isIpAddressInvalid(peer.getEndpoint())) {
return AddResult.invalid();
}
final Bytes id = peer.getId();
Expand Down Expand Up @@ -212,7 +212,7 @@ public Stream<DiscoveryPeer> streamAllPeers() {
return Arrays.stream(table).flatMap(e -> e.getPeers().stream());
}

boolean ipAddressIsInvalid(final Endpoint endpoint) {
public boolean isIpAddressInvalid(final Endpoint endpoint) {
final String key = getKey(endpoint);
if (invalidIPs.contains(key)) {
return true;
Expand All @@ -223,21 +223,21 @@ boolean ipAddressIsInvalid(final Endpoint endpoint) {
for (final Bucket bucket : table) {
bucket.getPeers().stream()
.filter(p -> p.getEndpoint().getHost().equals(endpoint.getHost()))
.forEach(p -> evictAndStore(p, bucket, key));
.forEach(bucket::evict);
}
return true;
} else {
return false;
}
}

private void evictAndStore(final DiscoveryPeer peer, final Bucket bucket, final String key) {
bucket.evict(peer);
public void invalidateIP(final Endpoint endpoint) {
final String key = getKey(endpoint);
invalidIPs.add(key);
}

private static String getKey(final Endpoint endpoint) {
return endpoint.getHost() + endpoint.getFunctionalTcpPort();
return endpoint.getHost() + ":" + endpoint.getFunctionalTcpPort();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private boolean satisfiesMapAdditionCriteria(final DiscoveryPeer discoPeer) {
return !oneTrueMap.containsKey(discoPeer.getId())
&& (initialPeers.contains(discoPeer) || !peerTable.get(discoPeer).isPresent())
&& !discoPeer.getId().equals(localPeer.getId())
&& !peerTable.ipAddressIsInvalid(discoPeer.getEndpoint());
&& !peerTable.isIpAddressInvalid(discoPeer.getEndpoint());
}

void onNeighboursReceived(final DiscoveryPeer peer, final List<DiscoveryPeer> peers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
Expand Down Expand Up @@ -174,10 +173,6 @@ public void subscribeIncomingConnect(final ConnectCallback callback) {
public CompletableFuture<PeerConnection> connect(final Peer peer) {
final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>();

if (peer instanceof DiscoveryPeer) {
((DiscoveryPeer) peer).setLastAttemptedConnection(System.currentTimeMillis());
}

final EnodeURL enode = peer.getEnodeURL();
new Bootstrap()
.group(workers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void neighborsPacketFromUnbondedPeerIsDropped() {
}

@Test
public void neighborsPacketLimited() {
public void neighborsPacketLimited() throws InterruptedException {
// Start 20 agents with no bootstrap peers.
final List<MockPeerDiscoveryAgent> otherAgents =
helper.startDiscoveryAgents(20, Collections.emptyList());
Expand All @@ -192,8 +192,9 @@ public void neighborsPacketLimited() {
.map(Optional::get)
.collect(Collectors.toList());

// Start another peer pointing to those 20 agents.
// Start another peer
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(otherPeers);

// We used to do a hasSize match but we had issues with duplicate peers getting added to the
// list. By moving to a contains we make sure that all the peers are loaded with tolerance for
// duplicates. If we fix the duplication problem we should use containsExactlyInAnyOrder to
Expand Down Expand Up @@ -222,7 +223,7 @@ public void neighborsPacketLimited() {
final List<IncomingPacket> incomingPackets =
testAgent.getIncomingPackets().stream()
.filter(p -> p.packet.getType().equals(PacketType.NEIGHBORS))
.collect(toList());
.toList();
assertThat(incomingPackets.size()).isEqualTo(1);
final IncomingPacket neighborsPacket = incomingPackets.get(0);
assertThat(neighborsPacket.fromAgent).isEqualTo(agent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@ public void pongSentUponPing() {
final List<IncomingPacket> otherAgentIncomingPongs =
otherAgent.getIncomingPackets().stream()
.filter(p -> p.packet.getType().equals(PacketType.PONG))
.collect(Collectors.toList());
.toList();
assertThat(otherAgentIncomingPongs.size()).isEqualTo(1);

assertThat(
otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).isPresent())
otherAgentIncomingPongs
.getFirst()
.packet
.getPacketData(PongPacketData.class)
.isPresent())
.isTrue();
final PongPacketData pong =
otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Arrays.asList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -44,6 +45,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -301,15 +303,12 @@ public MockPeerDiscoveryAgent build() {
final ForkId forkId = new ForkId(Bytes.EMPTY, Bytes.EMPTY);
when(mockForkIdManager.getForkIdForChainHead()).thenReturn(forkId);
when(mockForkIdManager.peerCheck(forkId)).thenReturn(true);
final RlpxAgent rlpxAgent = mock(RlpxAgent.class);
when(rlpxAgent.connect(any()))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException()));
final MockPeerDiscoveryAgent mockPeerDiscoveryAgent =
new MockPeerDiscoveryAgent(
nodeKey,
config,
peerPermissions,
agents,
natService,
mockForkIdManager,
mock(RlpxAgent.class));
nodeKey, config, peerPermissions, agents, natService, mockForkIdManager, rlpxAgent);
mockPeerDiscoveryAgent.getAdvertisedPeer().ifPresent(peer -> peer.setNodeRecord(nodeRecord));

return mockPeerDiscoveryAgent;
Expand Down
Loading

0 comments on commit 9c80c9b

Please sign in to comment.