Skip to content

Commit

Permalink
Merge branch 'main' into feature/fix-snapsync
Browse files Browse the repository at this point in the history
  • Loading branch information
matkt authored Oct 1, 2024
2 parents c570706 + d081c17 commit b490c6d
Show file tree
Hide file tree
Showing 19 changed files with 185 additions and 193 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Remove privacy test classes support [#7569](https://github.com/hyperledger/besu/pull/7569)
- Add Blob Transaction Metrics [#7622](https://github.com/hyperledger/besu/pull/7622)
- Implemented support for emptyBlockPeriodSeconds in QBFT [#6965](https://github.com/hyperledger/besu/pull/6965)
- LUKSO Cancun Hardfork [#7686](https://github.com/hyperledger/besu/pull/7686)
- Add configuration of Consolidation Request Contract Address via genesis configuration [#7647](https://github.com/hyperledger/besu/pull/7647)
- Interrupt pending transaction processing on block creation timeout [#7673](https://github.com/hyperledger/besu/pull/7673)

Expand Down
2 changes: 1 addition & 1 deletion config/src/main/resources/lukso.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"terminalTotalDifficulty": 0,
"terminalTotalDifficultyPassed": true,
"shanghaiTime": 1687969198,
"cancunTime": 1767182400,
"cancunTime": 1732119595,
"discovery": {
"bootnodes": [
"enode://c2bb19ce658cfdf1fecb45da599ee6c7bf36e5292efb3fb61303a0b2cd07f96c20ac9b376a464d687ac456675a2e4a44aec39a0509bcb4b6d8221eedec25aca2@34.147.73.193:30303",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeer;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.rlp.RLPInput;
import org.hyperledger.besu.ethereum.rlp.RLPOutput;
import org.hyperledger.besu.plugin.data.EnodeURL;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.tuweni.bytes.Bytes;
import org.ethereum.beacon.discovery.schema.NodeRecord;
Expand All @@ -37,9 +37,7 @@ public class DiscoveryPeer extends DefaultPeer {
private final Endpoint endpoint;

// Timestamps.
private long firstDiscovered = 0;
private long lastContacted = 0;
private long lastSeen = 0;
private final AtomicLong firstDiscovered = new AtomicLong(0L);
private long lastAttemptedConnection = 0;

private NodeRecord nodeRecord;
Expand Down Expand Up @@ -96,20 +94,11 @@ public void setStatus(final PeerDiscoveryStatus status) {
}

public long getFirstDiscovered() {
return firstDiscovered;
return firstDiscovered.get();
}

public PeerId setFirstDiscovered(final long firstDiscovered) {
this.firstDiscovered = firstDiscovered;
return this;
}

public long getLastContacted() {
return lastContacted;
}

public void setLastContacted(final long lastContacted) {
this.lastContacted = lastContacted;
public void setFirstDiscovered(final long firstDiscovered) {
this.firstDiscovered.compareAndExchange(0L, firstDiscovered);
}

public long getLastAttemptedConnection() {
Expand All @@ -120,14 +109,6 @@ public void setLastAttemptedConnection(final long lastAttemptedConnection) {
this.lastAttemptedConnection = lastAttemptedConnection;
}

public long getLastSeen() {
return lastSeen;
}

public void setLastSeen(final long lastSeen) {
this.lastSeen = lastSeen;
}

public Endpoint getEndpoint() {
return endpoint;
}
Expand Down Expand Up @@ -163,8 +144,6 @@ public String toString() {
sb.append("status=").append(status);
sb.append(", enode=").append(this.getEnodeURL());
sb.append(", firstDiscovered=").append(firstDiscovered);
sb.append(", lastContacted=").append(lastContacted);
sb.append(", lastSeen=").append(lastSeen);
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,7 @@ protected void handleOutgoingPacket(final DiscoveryPeer peer, final Packet packe
(res, err) -> {
if (err != null) {
handleOutgoingPacketError(err, peer, packet);
return;
}
peer.setLastContacted(System.currentTimeMillis());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ public enum PeerDiscoveryStatus {
* We have successfully bonded with this {@link DiscoveryPeer}, and we are able to exchange
* messages with them.
*/
BONDED,

/** We have requested the ENR record from this {@link DiscoveryPeer} */
ENR_REQUESTED;
BONDED;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
Expand All @@ -43,6 +44,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -321,7 +323,6 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
switch (packet.getType()) {
case PING:
if (peerPermissions.allowInboundBonding(peer)) {
peer.setLastSeen(System.currentTimeMillis());
final PingPacketData ping = packet.getPacketData(PingPacketData.class).get();
if (!PeerDiscoveryStatus.BONDED.equals(peer.getStatus())
&& (bondingPeers.getIfPresent(sender.getId()) == null)) {
Expand All @@ -338,7 +339,7 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
requestENR(peer);
}
bondingPeers.invalidate(peerId);
addToPeerTable(peer);
checkBeforeAddingToPeerTable(peer);
recursivePeerRefreshState.onBondingComplete(peer);
Optional.ofNullable(cachedEnrRequests.getIfPresent(peerId))
.ifPresent(cachedEnrRequest -> processEnrRequest(peer, cachedEnrRequest));
Expand Down Expand Up @@ -405,38 +406,45 @@ private List<DiscoveryPeer> getPeersFromNeighborsPacket(final Packet packet) {
.collect(Collectors.toList());
}

private boolean addToPeerTable(final DiscoveryPeer peer) {
final PeerTable.AddResult result = peerTable.tryAdd(peer);
if (result.getOutcome() != PeerTable.AddResult.AddOutcome.INVALID) {

// Reset the last seen timestamp.
final long now = System.currentTimeMillis();
if (peer.getFirstDiscovered() == 0) {
peer.setFirstDiscovered(now);
}
peer.setLastSeen(now);
private void checkBeforeAddingToPeerTable(final DiscoveryPeer peer) {
if (peerTable.isIpAddressInvalid(peer.getEndpoint())) {
return;
}

if (peer.getStatus() != PeerDiscoveryStatus.BONDED) {
peer.setStatus(PeerDiscoveryStatus.BONDED);
connectOnRlpxLayer(peer);
}
if (peer.getFirstDiscovered() == 0L) {
connectOnRlpxLayer(peer)
.whenComplete(
(pc, th) -> {
if (th == null || !(th.getCause() instanceof TimeoutException)) {
peer.setStatus(PeerDiscoveryStatus.BONDED);
peer.setFirstDiscovered(System.currentTimeMillis());
addToPeerTable(peer);
} else {
LOG.debug("Handshake timed out with peer {}", peer.getLoggableId(), th);
peerTable.invalidateIP(peer.getEndpoint());
}
});
} else {
peer.setStatus(PeerDiscoveryStatus.BONDED);
addToPeerTable(peer);
}
}

if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) {
// Bump peer.
peerTable.tryEvict(peer);
peerTable.tryAdd(peer);
} else if (result.getOutcome() == PeerTable.AddResult.AddOutcome.BUCKET_FULL) {
peerTable.tryEvict(result.getEvictionCandidate());
peerTable.tryAdd(peer);
}
public void addToPeerTable(final DiscoveryPeer peer) {
final PeerTable.AddResult result = peerTable.tryAdd(peer);

return true;
if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) {
// Bump peer.
peerTable.tryEvict(peer);
peerTable.tryAdd(peer);
} else if (result.getOutcome() == PeerTable.AddResult.AddOutcome.BUCKET_FULL) {
peerTable.tryEvict(result.getEvictionCandidate());
peerTable.tryAdd(peer);
}
return false;
}

void connectOnRlpxLayer(final DiscoveryPeer peer) {
rlpxAgent.connect(peer);
CompletableFuture<PeerConnection> connectOnRlpxLayer(final DiscoveryPeer peer) {
return rlpxAgent.connect(peer);
}

private Optional<PeerInteractionState> matchInteraction(final Packet packet) {
Expand Down Expand Up @@ -512,7 +520,6 @@ void bond(final DiscoveryPeer peer) {
return;
}

peer.setFirstDiscovered(System.currentTimeMillis());
peer.setStatus(PeerDiscoveryStatus.BONDING);
bondingPeers.put(peer.getId(), peer);

Expand Down Expand Up @@ -719,7 +726,7 @@ public void handleBondingRequest(final DiscoveryPeer peer) {

// Load the peer first from the table, then from bonding cache or use the instance that comes in.
private DiscoveryPeer resolvePeer(final DiscoveryPeer peer) {
if (peerTable.ipAddressIsInvalid(peer.getEndpoint())) {
if (peerTable.isIpAddressInvalid(peer.getEndpoint())) {
return null;
}
final Optional<DiscoveryPeer> maybeKnownPeer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Optional<DiscoveryPeer> get(final PeerId peer) {
* @see AddOutcome
*/
public AddResult tryAdd(final DiscoveryPeer peer) {
if (ipAddressIsInvalid(peer.getEndpoint())) {
if (isIpAddressInvalid(peer.getEndpoint())) {
return AddResult.invalid();
}
final Bytes id = peer.getId();
Expand Down Expand Up @@ -212,7 +212,7 @@ public Stream<DiscoveryPeer> streamAllPeers() {
return Arrays.stream(table).flatMap(e -> e.getPeers().stream());
}

boolean ipAddressIsInvalid(final Endpoint endpoint) {
public boolean isIpAddressInvalid(final Endpoint endpoint) {
final String key = getKey(endpoint);
if (invalidIPs.contains(key)) {
return true;
Expand All @@ -223,21 +223,21 @@ boolean ipAddressIsInvalid(final Endpoint endpoint) {
for (final Bucket bucket : table) {
bucket.getPeers().stream()
.filter(p -> p.getEndpoint().getHost().equals(endpoint.getHost()))
.forEach(p -> evictAndStore(p, bucket, key));
.forEach(bucket::evict);
}
return true;
} else {
return false;
}
}

private void evictAndStore(final DiscoveryPeer peer, final Bucket bucket, final String key) {
bucket.evict(peer);
public void invalidateIP(final Endpoint endpoint) {
final String key = getKey(endpoint);
invalidIPs.add(key);
}

private static String getKey(final Endpoint endpoint) {
return endpoint.getHost() + endpoint.getFunctionalTcpPort();
return endpoint.getHost() + ":" + endpoint.getFunctionalTcpPort();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private boolean satisfiesMapAdditionCriteria(final DiscoveryPeer discoPeer) {
return !oneTrueMap.containsKey(discoPeer.getId())
&& (initialPeers.contains(discoPeer) || !peerTable.get(discoPeer).isPresent())
&& !discoPeer.getId().equals(localPeer.getId())
&& !peerTable.ipAddressIsInvalid(discoPeer.getEndpoint());
&& !peerTable.isIpAddressInvalid(discoPeer.getEndpoint());
}

void onNeighboursReceived(final DiscoveryPeer peer, final List<DiscoveryPeer> peers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
Expand Down Expand Up @@ -174,10 +173,6 @@ public void subscribeIncomingConnect(final ConnectCallback callback) {
public CompletableFuture<PeerConnection> connect(final Peer peer) {
final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>();

if (peer instanceof DiscoveryPeer) {
((DiscoveryPeer) peer).setLastAttemptedConnection(System.currentTimeMillis());
}

final EnodeURL enode = peer.getEnodeURL();
new Bootstrap()
.group(workers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void neighborsPacketFromUnbondedPeerIsDropped() {
}

@Test
public void neighborsPacketLimited() {
public void neighborsPacketLimited() throws InterruptedException {
// Start 20 agents with no bootstrap peers.
final List<MockPeerDiscoveryAgent> otherAgents =
helper.startDiscoveryAgents(20, Collections.emptyList());
Expand All @@ -192,8 +192,9 @@ public void neighborsPacketLimited() {
.map(Optional::get)
.collect(Collectors.toList());

// Start another peer pointing to those 20 agents.
// Start another peer
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(otherPeers);

// We used to do a hasSize match but we had issues with duplicate peers getting added to the
// list. By moving to a contains we make sure that all the peers are loaded with tolerance for
// duplicates. If we fix the duplication problem we should use containsExactlyInAnyOrder to
Expand Down Expand Up @@ -222,7 +223,7 @@ public void neighborsPacketLimited() {
final List<IncomingPacket> incomingPackets =
testAgent.getIncomingPackets().stream()
.filter(p -> p.packet.getType().equals(PacketType.NEIGHBORS))
.collect(toList());
.toList();
assertThat(incomingPackets.size()).isEqualTo(1);
final IncomingPacket neighborsPacket = incomingPackets.get(0);
assertThat(neighborsPacket.fromAgent).isEqualTo(agent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@ public void pongSentUponPing() {
final List<IncomingPacket> otherAgentIncomingPongs =
otherAgent.getIncomingPackets().stream()
.filter(p -> p.packet.getType().equals(PacketType.PONG))
.collect(Collectors.toList());
.toList();
assertThat(otherAgentIncomingPongs.size()).isEqualTo(1);

assertThat(
otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).isPresent())
otherAgentIncomingPongs
.getFirst()
.packet
.getPacketData(PongPacketData.class)
.isPresent())
.isTrue();
final PongPacketData pong =
otherAgentIncomingPongs.get(0).packet.getPacketData(PongPacketData.class).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Arrays.asList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -44,6 +45,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -301,15 +303,12 @@ public MockPeerDiscoveryAgent build() {
final ForkId forkId = new ForkId(Bytes.EMPTY, Bytes.EMPTY);
when(mockForkIdManager.getForkIdForChainHead()).thenReturn(forkId);
when(mockForkIdManager.peerCheck(forkId)).thenReturn(true);
final RlpxAgent rlpxAgent = mock(RlpxAgent.class);
when(rlpxAgent.connect(any()))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException()));
final MockPeerDiscoveryAgent mockPeerDiscoveryAgent =
new MockPeerDiscoveryAgent(
nodeKey,
config,
peerPermissions,
agents,
natService,
mockForkIdManager,
mock(RlpxAgent.class));
nodeKey, config, peerPermissions, agents, natService, mockForkIdManager, rlpxAgent);
mockPeerDiscoveryAgent.getAdvertisedPeer().ifPresent(peer -> peer.setNodeRecord(nodeRecord));

return mockPeerDiscoveryAgent;
Expand Down
Loading

0 comments on commit b490c6d

Please sign in to comment.