Skip to content

Commit

Permalink
Drop messages that exceeds local message size limit
Browse files Browse the repository at this point in the history
Signed-off-by: Rodion Lim <rodion.lim@hotmail.com>
  • Loading branch information
rodionlim committed Aug 23, 2024
1 parent 58bb931 commit c790598
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 27 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
- Fix tracing in precompiled contracts when halting for out of gas [#7318](https://github.com/hyperledger/besu/issues/7318)
- Correctly release txpool save and restore lock in case of exceptions [#7473](https://github.com/hyperledger/besu/pull/7473)
- Fix for `eth_gasPrice` could not retrieve block error [#7482](https://github.com/hyperledger/besu/pull/7482)

- Correctly drops messages that exceeds local message size limit [#5455](https://github.com/hyperledger/besu/pull/7507)

## 24.8.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
import org.hyperledger.besu.testutil.TestClock;
import org.hyperledger.besu.util.number.ByteUnits;

import java.math.BigInteger;
import java.time.ZoneId;
Expand Down Expand Up @@ -154,7 +155,7 @@ public void setUp() {
.when(mockWorldStateArchive.getMutable(any(), anyBoolean()))
.thenReturn(Optional.of(mockWorldState));

blockBroadcaster = new BlockBroadcaster(mockEthContext);
blockBroadcaster = new BlockBroadcaster(mockEthContext, 10 * ByteUnits.MEGABYTE);
syncState = new SyncState(blockchain, mockEthPeers);
TransactionPoolConfiguration txPoolConfig =
ImmutableTransactionPoolConfiguration.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,13 @@ public RequestManager.ResponseStream send(
if (messageData.getSize() > maxMessageSize) {
// This is a bug or else a misconfiguration of the max message size.
LOG.error(
"Sending {} message to peer ({}) which exceeds local message size limit of {} bytes. Message code: {}, Message Size: {}",
"Dropping {} message to peer ({}) which exceeds local message size limit of {} bytes. Message code: {}, Message Size: {}",
protocolName,
this,
maxMessageSize,
messageData.getCode(),
messageData.getSize());
return null;
}

if (requestManagers.containsKey(protocolName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public EthProtocolManager(
this.ethMessages = ethMessages;
this.ethContext = ethContext;

this.blockBroadcaster = new BlockBroadcaster(ethContext);
this.blockBroadcaster =
new BlockBroadcaster(ethContext, ethereumWireProtocolConfiguration.getMaxMessageSize());

this.supportedCapabilities =
calculateCapabilities(synchronizerConfiguration, ethereumWireProtocolConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,20 @@ public int getCode() {
return MESSAGE_CODE;
}

public static NewBlockMessage create(final Block block, final Difficulty totalDifficulty) {
public static NewBlockMessage create(
final Block block, final Difficulty totalDifficulty, final int maxMessageSize)
throws IllegalArgumentException {
final NewBlockMessageData msgData = new NewBlockMessageData(block, totalDifficulty);
final BytesValueRLPOutput out = new BytesValueRLPOutput();
msgData.writeTo(out);
return new NewBlockMessage(out.encoded());
final Bytes data = out.encoded();
if (data.size() > maxMessageSize) {
throw new IllegalArgumentException(
String.format(
"Block message size %d is larger than allowed message size %d",
data.size(), maxMessageSize));
}
return new NewBlockMessage(data);
}

public static NewBlockMessage readFrom(final MessageData message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ public class BlockBroadcaster {
private static final Logger LOG = LoggerFactory.getLogger(BlockBroadcaster.class);

private final EthContext ethContext;
private final int maxMessageSize;
private final Subscribers<BlockPropagatedSubscriber> blockPropagatedSubscribers =
Subscribers.create();

public BlockBroadcaster(final EthContext ethContext) {
public BlockBroadcaster(final EthContext ethContext, final int maxMessageSize) {
this.ethContext = ethContext;
this.maxMessageSize = maxMessageSize;
}

public long subscribePropagateNewBlocks(final BlockPropagatedSubscriber callback) {
Expand All @@ -45,7 +47,13 @@ public void unsubscribePropagateNewBlocks(final long id) {

public void propagate(final Block block, final Difficulty totalDifficulty) {
blockPropagatedSubscribers.forEach(listener -> listener.accept(block, totalDifficulty));
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
final NewBlockMessage newBlockMessage;
try {
newBlockMessage = NewBlockMessage.create(block, totalDifficulty, this.maxMessageSize);
} catch (final IllegalArgumentException e) {
LOG.error("Failed to create block", e);
return;
}
ethContext
.getEthPeers()
.streamAvailablePeers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.util.number.ByteUnits;

import org.apache.tuweni.bytes.Bytes;
import org.junit.jupiter.api.Test;

public class NewBlockMessageTest {
private static final ProtocolSchedule protocolSchedule = ProtocolScheduleFixture.MAINNET;
private static final int maxMessageSize = 10 * ByteUnits.MEGABYTE;

@Test
public void roundTripNewBlockMessage() {
final Difficulty totalDifficulty = Difficulty.of(98765);
final BlockDataGenerator blockGenerator = new BlockDataGenerator();
final Block blockForInsertion = blockGenerator.block();

final NewBlockMessage msg = NewBlockMessage.create(blockForInsertion, totalDifficulty);
final NewBlockMessage msg =
NewBlockMessage.create(blockForInsertion, totalDifficulty, maxMessageSize);
assertThat(msg.getCode()).isEqualTo(EthPV62.NEW_BLOCK);
assertThat(msg.totalDifficulty(protocolSchedule)).isEqualTo(totalDifficulty);
final Block extractedBlock = msg.block(protocolSchedule);
Expand Down Expand Up @@ -73,4 +76,14 @@ public void readFromMessageWithWrongCodeThrows() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> NewBlockMessage.readFrom(rawMsg));
}

@Test
public void createBlockMessageLargerThanLimitThrows() {
final Difficulty totalDifficulty = Difficulty.of(98765);
final BlockDataGenerator blockGenerator = new BlockDataGenerator();
final Block newBlock = blockGenerator.block();

assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> NewBlockMessage.create(newBlock, totalDifficulty, 1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;
import org.hyperledger.besu.testutil.TestClock;
import org.hyperledger.besu.util.number.ByteUnits;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -95,6 +96,7 @@ public abstract class AbstractBlockPropagationManagerTest {
protected SyncState syncState;
protected final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final Hash finalizedHash = Hash.fromHexStringLenient("0x1337");
private final int maxMessageSize = 10 * ByteUnits.MEGABYTE;

protected void setup(final DataStorageFormat dataStorageFormat) {
blockchainUtil = BlockchainSetupUtil.forTesting(dataStorageFormat);
Expand Down Expand Up @@ -222,11 +224,14 @@ public void importsAnnouncedNewBlocks_aheadOfChainInOrder() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage nextAnnouncement =
NewBlockMessage.create(
nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get());
nextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(),
maxMessageSize);
final NewBlockMessage nextNextAnnouncement =
NewBlockMessage.create(
nextNextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get());
getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get(),
maxMessageSize);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

// Broadcast first message
Expand Down Expand Up @@ -256,11 +261,14 @@ public void importsAnnouncedNewBlocks_aheadOfChainOutOfOrder() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage nextAnnouncement =
NewBlockMessage.create(
nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get());
nextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(),
maxMessageSize);
final NewBlockMessage nextNextAnnouncement =
NewBlockMessage.create(
nextNextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get());
getFullBlockchain().getTotalDifficultyByHash(nextNextBlock.getHash()).get(),
maxMessageSize);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

// Broadcast second message first
Expand Down Expand Up @@ -299,15 +307,19 @@ public void importsMixedOutOfOrderMessages() {
block1.getHash(), block1.getHeader().getNumber())));
final NewBlockMessage block2Msg =
NewBlockMessage.create(
block2, getFullBlockchain().getTotalDifficultyByHash(block2.getHash()).get());
block2,
getFullBlockchain().getTotalDifficultyByHash(block2.getHash()).get(),
maxMessageSize);
final NewBlockHashesMessage block3Msg =
NewBlockHashesMessage.create(
Collections.singletonList(
new NewBlockHashesMessage.NewBlockHash(
block3.getHash(), block3.getHeader().getNumber())));
final NewBlockMessage block4Msg =
NewBlockMessage.create(
block4, getFullBlockchain().getTotalDifficultyByHash(block4.getHash()).get());
block4,
getFullBlockchain().getTotalDifficultyByHash(block4.getHash()).get(),
maxMessageSize);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

// Broadcast older blocks
Expand Down Expand Up @@ -362,7 +374,9 @@ public void handlesDuplicateAnnouncements() {
nextBlock.getHash(), nextBlock.getHeader().getNumber())));
final NewBlockMessage newBlock =
NewBlockMessage.create(
nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get());
nextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(),
maxMessageSize);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

// Broadcast first message
Expand Down Expand Up @@ -413,7 +427,9 @@ public void handlesPendingDuplicateAnnouncements() {
nextBlock.getHash(), nextBlock.getHeader().getNumber())));
final NewBlockMessage newBlock =
NewBlockMessage.create(
nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get());
nextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(),
maxMessageSize);

// Broadcast messages
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, newBlock);
Expand Down Expand Up @@ -467,7 +483,9 @@ public void ignoresFutureNewBlockAnnouncement() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage futureAnnouncement =
NewBlockMessage.create(
futureBlock, getFullBlockchain().getTotalDifficultyByHash(futureBlock.getHash()).get());
futureBlock,
getFullBlockchain().getTotalDifficultyByHash(futureBlock.getHash()).get(),
maxMessageSize);

// Broadcast
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, futureAnnouncement);
Expand Down Expand Up @@ -522,7 +540,8 @@ public void ignoresOldNewBlockAnnouncement() {

// Setup peer and messages
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage oldAnnouncement = NewBlockMessage.create(oldBlock, Difficulty.ZERO);
final NewBlockMessage oldAnnouncement =
NewBlockMessage.create(oldBlock, Difficulty.ZERO, maxMessageSize);

// Broadcast
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, oldAnnouncement);
Expand Down Expand Up @@ -559,7 +578,7 @@ public void purgesOldBlocks() {
blockPropagationManager.start();
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage blockAnnouncementMsg =
NewBlockMessage.create(blockToPurge, Difficulty.ZERO);
NewBlockMessage.create(blockToPurge, Difficulty.ZERO, maxMessageSize);

// Broadcast
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, blockAnnouncementMsg);
Expand Down Expand Up @@ -597,7 +616,8 @@ public void updatesChainHeadWhenNewBlockMessageReceived() {
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHeader().getParentHash()).get();
final Difficulty totalDifficulty =
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get();
final NewBlockMessage nextAnnouncement = NewBlockMessage.create(nextBlock, totalDifficulty);
final NewBlockMessage nextAnnouncement =
NewBlockMessage.create(nextBlock, totalDifficulty, maxMessageSize);

// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, nextAnnouncement);
Expand Down Expand Up @@ -735,7 +755,8 @@ public void verifyBroadcastBlockInvocation() {

final Difficulty totalDifficulty =
getFullBlockchain().getTotalDifficultyByHash(block.getHash()).get();
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
final NewBlockMessage newBlockMessage =
NewBlockMessage.create(block, totalDifficulty, maxMessageSize);

// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, newBlockMessage);
Expand Down Expand Up @@ -933,7 +954,9 @@ public void shouldNotListenToNewBlockAnnouncementsWhenTTDReachedAndFinal() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final NewBlockMessage nextAnnouncement =
NewBlockMessage.create(
nextBlock, getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get());
nextBlock,
getFullBlockchain().getTotalDifficultyByHash(nextBlock.getHash()).get(),
maxMessageSize);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

syncState.setReachedTerminalDifficulty(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.util.number.ByteUnits;

import java.util.Collections;
import java.util.stream.Stream;
Expand All @@ -38,6 +39,8 @@

public class BlockBroadcasterTest {

final int maxMessageSize = 10 * ByteUnits.MEGABYTE;

@Test
public void blockPropagationUnitTest() throws PeerConnection.PeerNotConnected {
final EthPeer ethPeer = mock(EthPeer.class);
Expand All @@ -47,10 +50,10 @@ public void blockPropagationUnitTest() throws PeerConnection.PeerNotConnected {
final EthContext ethContext = mock(EthContext.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext);
final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext, maxMessageSize);
final Block block = generateBlock();
final NewBlockMessage newBlockMessage =
NewBlockMessage.create(block, block.getHeader().getDifficulty());
NewBlockMessage.create(block, block.getHeader().getDifficulty(), maxMessageSize);

blockBroadcaster.propagate(block, Difficulty.ZERO);

Expand All @@ -70,10 +73,10 @@ public void blockPropagationUnitTestSeenUnseen() throws PeerConnection.PeerNotCo
final EthContext ethContext = mock(EthContext.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext);
final BlockBroadcaster blockBroadcaster = new BlockBroadcaster(ethContext, maxMessageSize);
final Block block = generateBlock();
final NewBlockMessage newBlockMessage =
NewBlockMessage.create(block, block.getHeader().getDifficulty());
NewBlockMessage.create(block, block.getHeader().getDifficulty(), maxMessageSize);

blockBroadcaster.propagate(block, Difficulty.ZERO);

Expand Down

0 comments on commit c790598

Please sign in to comment.