Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change maxPrunePeers and maxPeersPerPruneMessage usage #336

Merged
merged 7 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,17 @@ data class GossipParams(
val maxGraftMessages: Int? = null,

/**
* [maxPrunePeers] controls the number of peers to include in prune Peer eXchange.
* [maxPeersSentInPruneMsg] controls the number of peers to include in prune Peer eXchange.
* When we prune a peer that's eligible for PX (has a good score, etc), we will try to
* send them signed peer records for up to [maxPrunePeers] other peers that we
* send them signed peer records for up to [maxPeersSentInPruneMsg] other peers that we
* know of.
*/
val maxPrunePeers: Int = 16,
val maxPeersSentInPruneMsg: Int = 16,

/**
* [maxPeersPerPruneMessage] is the maximum number of peers allowed in an incoming prune message
* [maxPeersAcceptedInPruneMsg] is the maximum number of peers allowed in an incoming prune message
*/
val maxPeersPerPruneMessage: Int? = null,
val maxPeersAcceptedInPruneMsg: Int = 16,

/**
* [pruneBackoff] controls the backoff time for pruned peers. This is how long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ open class GossipRouter(
params.maxIWantMessageIds?.let { iWantMessageIdCount <= it } ?: true &&
params.maxGraftMessages?.let { (msg.control?.graftCount ?: 0) <= it } ?: true &&
params.maxPruneMessages?.let { (msg.control?.pruneCount ?: 0) <= it } ?: true &&
params.maxPeersPerPruneMessage?.let { msg.control?.pruneList?.none { p -> p.peersCount > it } } ?: true
params.maxPeersAcceptedInPruneMsg.let { msg.control?.pruneList?.none { p -> p.peersCount > it } } ?: true
}

private fun processControlMessage(controlMsg: Any, receivedFrom: PeerHandler) {
Expand Down Expand Up @@ -349,7 +349,7 @@ open class GossipRouter(
}

private fun processPrunePeers(peersList: List<Rpc.PeerInfo>) {
peersList.shuffled(random).take(params.maxPrunePeers)
peersList.shuffled(random).take(params.maxPeersAcceptedInPruneMsg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: that take() call seems to have no effect (as a message exceeding the limit should be dropped before), but I would probably leave it just to be safe in case of future modifications

.map { PeerId(it.peerID.toByteArray()) to it.signedPeerRecord.toByteArray() }
.filter { (id, _) -> !isConnected(id) }
.forEach { (id, record) -> params.connectCallback(id, record) }
Expand Down Expand Up @@ -572,6 +572,7 @@ open class GossipRouter(
val peerQueue = pendingRpcParts.getQueue(peer)
if (peer.getPeerProtocol() == PubsubProtocol.Gossip_V_1_1 && this.protocol == PubsubProtocol.Gossip_V_1_1) {
val backoffPeers = (getTopicPeers(topic) - peer)
.take(params.maxPeersSentInPruneMsg)
.filter { score.score(it.peerId) >= 0 }
.map { it.peerId }
peerQueue.addPrune(topic, params.pruneBackoff.seconds, backoffPeers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ class GossipParamsBuilder {

private var seenTTL: Duration? = null

private var maxPrunePeers: Int? = null
private var maxPeersSentInPruneMsg: Int? = null

private var maxPeersPerPruneMessage: Int? = null
private var maxPeersAcceptedInPruneMsg: Int? = null

private var pruneBackoff: Duration? = null

Expand Down Expand Up @@ -81,8 +81,8 @@ class GossipParamsBuilder {
this.gossipHistoryLength = source.gossipHistoryLength
this.heartbeatInterval = source.heartbeatInterval
this.seenTTL = source.seenTTL
this.maxPrunePeers = source.maxPrunePeers
this.maxPeersPerPruneMessage = source.maxPeersPerPruneMessage
this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg
this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg
this.pruneBackoff = source.pruneBackoff
this.floodPublish = source.floodPublish
this.gossipFactor = source.gossipFactor
Expand Down Expand Up @@ -126,9 +126,9 @@ class GossipParamsBuilder {

fun seenTTL(value: Duration): GossipParamsBuilder = apply { seenTTL = value }

fun maxPrunePeers(value: Int): GossipParamsBuilder = apply { maxPrunePeers = value }
fun maxPeersSentInPruneMsg(value: Int): GossipParamsBuilder = apply { maxPeersSentInPruneMsg = value }

fun maxPeersPerPruneMessage(value: Int): GossipParamsBuilder = apply { maxPeersPerPruneMessage = value }
fun maxPeersAcceptedInPruneMsg(value: Int): GossipParamsBuilder = apply { maxPeersAcceptedInPruneMsg = value }

fun pruneBackoff(value: Duration): GossipParamsBuilder = apply { pruneBackoff = value }

Expand Down Expand Up @@ -201,8 +201,8 @@ class GossipParamsBuilder {
maxIWantMessageIds = maxIWantMessageIds,
iWantFollowupTime = iWantFollowupTime!!,
maxGraftMessages = maxGraftMessages,
maxPrunePeers = maxPrunePeers!!,
maxPeersPerPruneMessage = maxPeersPerPruneMessage,
maxPeersSentInPruneMsg = maxPeersSentInPruneMsg!!,
maxPeersAcceptedInPruneMsg = maxPeersAcceptedInPruneMsg!!,
pruneBackoff = pruneBackoff!!,
maxPruneMessages = maxPruneMessages,
gossipRetransmission = gossipRetransmission!!,
Expand Down Expand Up @@ -232,7 +232,7 @@ class GossipParamsBuilder {
check(gossipHistoryLength != null, { "gossipHistoryLength must not be null" })
check(heartbeatInterval != null, { "heartbeatInterval must not be null" })
check(seenTTL != null, { "seenTTL must not be null" })
check(maxPrunePeers != null, { "maxPrunePeers must not be null" })
check(maxPeersSentInPruneMsg != null, { "maxPeersSentInPruneMsg must not be null" })
check(pruneBackoff != null, { "pruneBackoff must not be null" })
check(floodPublish != null, { "floodPublish must not be null" })
check(gossipFactor != null, { "gossipFactor must not be null" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class GossipRouterListLimitsTest {
private val maxIWantMessageIds = 14
private val maxGraftMessages = 15
private val maxPruneMessages = 16
private val maxPeersPerPruneMessage = 17
private val maxPeersAcceptedInPruneMsg = 17

private val gossipParamsWithLimits = GossipParamsBuilder()
.maxPublishedMessages(maxPublishedMessages)
Expand All @@ -25,7 +25,7 @@ class GossipRouterListLimitsTest {
.maxIWantMessageIds(maxIWantMessageIds)
.maxGraftMessages(maxGraftMessages)
.maxPruneMessages(maxPruneMessages)
.maxPeersPerPruneMessage(maxPeersPerPruneMessage)
.maxPeersAcceptedInPruneMsg(maxPeersAcceptedInPruneMsg)
.build()

private val gossipParamsNoLimits = GossipParamsBuilder()
Expand All @@ -44,7 +44,7 @@ class GossipRouterListLimitsTest {

@Test
fun validateProtobufLists_validMessageWithLargeLists_noLimits() {
val msg = fullMsgBuilder(20).build()
val msg = fullMsgBuilder(16).build()

Assertions.assertThat(routerWithNoLimits.validateMessageListLimits(msg)).isTrue()
}
Expand Down Expand Up @@ -148,9 +148,9 @@ class GossipRouterListLimitsTest {
}

@Test
fun validateProtobufLists_tooManyPrunePeers() {
fun validateProtobufLists_tooManyPeersToAcceptInPruneMsg() {
val builder = fullMsgBuilder()
builder.addPrunes(1, maxPeersPerPruneMessage + 1)
builder.addPrunes(1, maxPeersAcceptedInPruneMsg + 1)
val msg = builder.build()

Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse()
Expand Down Expand Up @@ -238,9 +238,9 @@ class GossipRouterListLimitsTest {
}

@Test
fun validateProtobufLists_maxPrunePeers() {
fun validateProtobufLists_maxPeersAcceptedInPruneMsg() {
val builder = fullMsgBuilder()
builder.addPrunes(1, maxPeersPerPruneMessage - 1)
builder.addPrunes(1, maxPeersAcceptedInPruneMsg - 1)
val msg = builder.build()

Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package io.libp2p.pubsub.gossip

import com.google.common.util.concurrent.AtomicDouble
import com.google.protobuf.ByteString
import io.libp2p.core.PeerId
import io.libp2p.core.pubsub.MessageApi
import io.libp2p.core.pubsub.RESULT_IGNORE
Expand Down Expand Up @@ -102,7 +103,7 @@ class GossipV1_1Tests {
class TwoRoutersTest(
val coreParams: GossipParams = GossipParams(),
val scoreParams: GossipScoreParams = GossipScoreParams(),
mockRouterFactory: DeterministicFuzzRouterFactory = createMockFuzzRouterFactory()
val mockRouterFactory: DeterministicFuzzRouterFactory = createMockFuzzRouterFactory()
) {
val fuzz = DeterministicFuzz()
val gossipRouterBuilderFactory = { GossipRouterBuilder(params = coreParams, scoreParams = scoreParams) }
Expand Down Expand Up @@ -1140,4 +1141,75 @@ class GossipV1_1Tests {
assertEquals(5, iWandIds1.size)
assertEquals(5, iWandIds1.distinct().size)
}

@Test
fun testMaxPeersSentInPruneMsg() {
val test = TwoRoutersTest()

val topic = "topic1"
test.mockRouter.subscribe(topic)
test.gossipRouter.subscribe(topic)

for (i in 0..20) {
val router = test.fuzz.createTestRouter(test.mockRouterFactory)
(router.router as MockRouter).subscribe(topic)
test.router1.connectSemiDuplex(router, null, LogLevel.ERROR)
}

// 2 heartbeats - the topic should be GRAFTed
test.fuzz.timeController.addTime(2.seconds)
test.mockRouter.waitForMessage { it.hasControl() && it.control.graftCount > 0 }

test.gossipRouter.unsubscribe(topic)
test.fuzz.timeController.addTime(2.seconds)
assertEquals(
1,
test.mockRouter.inboundMessages.count {
it.hasControl() && it.control.pruneCount == 1 &&
it.control.getPrune(0).peersCount == test.gossipRouter.params.maxPeersSentInPruneMsg
}
)
}

@Test
fun testMaxPeersAcceptedInPruneMsg() {
val test = TwoRoutersTest()
val topic = "topic1"

test.mockRouter.subscribe(topic)
test.gossipRouter.subscribe(topic)

// 2 heartbeats - the topic should be GRAFTed
test.fuzz.timeController.addTime(2.seconds)

fun createPruneMessage(peersCount: Int): Rpc.RPC {
val peerInfos = List(peersCount) {
Rpc.PeerInfo.newBuilder()
.setPeerID(PeerId.random().bytes.toProtobuf())
.setSignedPeerRecord(ByteString.EMPTY)
.build()
}
return Rpc.RPC.newBuilder().setControl(
Rpc.ControlMessage.newBuilder().addPrune(
Rpc.ControlPrune.newBuilder()
.setTopicID(topic)
.addAllPeers(peerInfos)
)
).build()
}

test.mockRouter.sendToSingle(
createPruneMessage(test.gossipRouter.params.maxPeersAcceptedInPruneMsg + 1)
)

// prune message should be dropped because too many peers
assertEquals(1, test.gossipRouter.mesh[topic]!!.size)

test.mockRouter.sendToSingle(
createPruneMessage(test.gossipRouter.params.maxPeersAcceptedInPruneMsg)
)

// prune message should now be processed
assertEquals(0, test.gossipRouter.mesh[topic]!!.size)
}
}