From 4849c95eeec4c7c02044c87cd8a3f0d9f2828f4e Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Wed, 15 May 2024 16:58:41 +0300 Subject: [PATCH] Set topicID on outbound IHAVE and ignore inbound IHAVE for unknown topic --- .../kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt | 10 +++++++--- .../io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt | 11 ++++++----- .../pubsub/gossip/GossipRouterListLimitsTest.kt | 13 ++++++++----- .../kotlin/io/libp2p/tools/protobuf/RpcBuilder.kt | 5 +++-- 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index b1de2bd05..01901e85a 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -317,6 +317,10 @@ open class GossipRouter( } private fun handleIHave(msg: Rpc.ControlIHave, peer: PeerHandler) { + // we ignore IHAVE for unknown topics + if (msg.hasTopicID() && !mesh.containsKey(msg.topicID)) { + return + } val peerScore = score.score(peer.peerId) // we ignore IHAVE gossip from any peer whose score is below the gossip threshold if (peerScore < scoreParams.gossipThreshold) return @@ -544,7 +548,7 @@ open class GossipRouter( peers.shuffled(random) .take(max((params.gossipFactor * peers.size).toInt(), params.DLazy)) - .forEach { enqueueIhave(it, shuffledMessageIds) } + .forEach { enqueueIhave(it, shuffledMessageIds, topic) } } private fun graft(peer: PeerHandler, topic: Topic) { @@ -587,8 +591,8 @@ open class GossipRouter( private fun enqueueIwant(peer: PeerHandler, messageIds: List) = pendingRpcParts.getQueue(peer).addIWants(messageIds) - private fun enqueueIhave(peer: PeerHandler, messageIds: List) = - pendingRpcParts.getQueue(peer).addIHaves(messageIds) + private fun enqueueIhave(peer: PeerHandler, messageIds: List, topic: Topic) = + pendingRpcParts.getQueue(peer).addIHaves(messageIds, topic) data class AcceptRequestsWhitelistEntry(val whitelistedTill: Long, val messagesAccepted: Int = 0) { fun incrementMessageCount() = AcceptRequestsWhitelistEntry(whitelistedTill, messagesAccepted + 1) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt index cc5fe7893..0ceae0522 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt @@ -10,8 +10,8 @@ import pubsub.pb.Rpc interface GossipRpcPartsQueue : RpcPartsQueue { - fun addIHave(messageId: MessageId) - fun addIHaves(messageIds: Collection) = messageIds.forEach { addIHave(it) } + fun addIHave(messageId: MessageId, topic: Topic) + fun addIHaves(messageIds: Collection, topic: Topic) = messageIds.forEach { addIHave(it, topic) } fun addIWant(messageId: MessageId) fun addIWants(messageIds: Collection) = messageIds.forEach { addIWant(it) } @@ -37,7 +37,7 @@ open class DefaultGossipRpcPartsQueue( private val params: GossipParams ) : DefaultRpcPartsQueue(), GossipRpcPartsQueue { - protected data class IHavePart(val messageId: MessageId) : AbstractPart { + protected data class IHavePart(val messageId: MessageId, val topic: Topic) : AbstractPart { override fun appendToBuilder(builder: Rpc.RPC.Builder) { val ctrlBuilder = builder.controlBuilder val iHaveBuilder = if (ctrlBuilder.ihaveBuilderList.isEmpty()) { @@ -45,6 +45,7 @@ open class DefaultGossipRpcPartsQueue( } else { ctrlBuilder.getIhaveBuilder(0) } + iHaveBuilder.setTopicID(topic) iHaveBuilder.addMessageIDs(messageId.toProtobuf()) } } @@ -82,8 +83,8 @@ open class DefaultGossipRpcPartsQueue( } } - override fun addIHave(messageId: MessageId) { - addPart(IHavePart(messageId)) + override fun addIHave(messageId: MessageId, topic: Topic) { + addPart(IHavePart(messageId, topic)) } override fun addIWant(messageId: MessageId) { diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt index 6942cc979..81dc3d761 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt @@ -1,5 +1,6 @@ package io.libp2p.pubsub.gossip +import io.libp2p.pubsub.Topic import io.libp2p.pubsub.gossip.builders.GossipParamsBuilder import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import io.libp2p.tools.protobuf.RpcBuilder @@ -35,6 +36,8 @@ class GossipRouterListLimitsTest { private val routerWithLimits = GossipRouterBuilder(params = gossipParamsWithLimits).build() private val routerWithNoLimits = GossipRouterBuilder(params = gossipParamsNoLimits).build() + private val topic: Topic = "topic1" + @Test fun validateProtobufLists_validMessage() { val msg = fullMsgBuilder().build() @@ -96,7 +99,7 @@ class GossipRouterListLimitsTest { @Test fun validateProtobufLists_tooManyIHaves() { val builder = fullMsgBuilder() - builder.addIHaves(maxIHaveLength, 1) + builder.addIHaves(maxIHaveLength, 1, topic) val msg = builder.build() Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() @@ -105,7 +108,7 @@ class GossipRouterListLimitsTest { @Test fun validateProtobufLists_tooManyIHaveMsgIds() { val builder = fullMsgBuilder() - builder.addIHaves(1, maxIHaveLength) + builder.addIHaves(1, maxIHaveLength, topic) val msg = builder.build() Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() @@ -186,7 +189,7 @@ class GossipRouterListLimitsTest { @Test fun validateProtobufLists_maxIHaves() { val builder = fullMsgBuilder() - builder.addIHaves(maxIHaveLength - 1, 1) + builder.addIHaves(maxIHaveLength - 1, 1, topic) val msg = builder.build() Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() @@ -195,7 +198,7 @@ class GossipRouterListLimitsTest { @Test fun validateProtobufLists_maxIHaveMsgIds() { val builder = fullMsgBuilder() - builder.addIHaves(1, maxIHaveLength - 1) + builder.addIHaves(1, maxIHaveLength - 1, topic) val msg = builder.build() Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() @@ -256,7 +259,7 @@ class GossipRouterListLimitsTest { // Add some data to all possible fields builder.addSubscriptions(listSize) builder.addPublishMessages(listSize, listSize) - builder.addIHaves(listSize, listSize) + builder.addIHaves(listSize, listSize, topic) builder.addIWants(listSize, listSize) builder.addGrafts(listSize) builder.addPrunes(listSize, listSize) diff --git a/libp2p/src/testFixtures/kotlin/io/libp2p/tools/protobuf/RpcBuilder.kt b/libp2p/src/testFixtures/kotlin/io/libp2p/tools/protobuf/RpcBuilder.kt index a4c55bba6..4da90ef85 100644 --- a/libp2p/src/testFixtures/kotlin/io/libp2p/tools/protobuf/RpcBuilder.kt +++ b/libp2p/src/testFixtures/kotlin/io/libp2p/tools/protobuf/RpcBuilder.kt @@ -1,6 +1,7 @@ package io.libp2p.tools.protobuf import io.libp2p.etc.types.toProtobuf +import io.libp2p.pubsub.Topic import pubsub.pb.Rpc import kotlin.random.Random @@ -28,9 +29,9 @@ class RpcBuilder { } } - fun addIHaves(iHaveCount: Int, messageIdCount: Int) { + fun addIHaves(iHaveCount: Int, messageIdCount: Int, topic: Topic) { for (i in 0 until iHaveCount) { - val iHaveBuilder = Rpc.ControlIHave.newBuilder() + val iHaveBuilder = Rpc.ControlIHave.newBuilder().setTopicID(topic) for (j in 0 until messageIdCount) { iHaveBuilder.addMessageIDs(Random.nextBytes(6).toProtobuf()) }