From a82c49aea623150248af2c8aa4f937f44ca25b82 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Tue, 5 Mar 2024 14:55:34 +0000 Subject: [PATCH] Pass PeerId along with message when subscribed to a pubsub topic --- .../src/main/kotlin/io/libp2p/core/pubsub/PubsubApi.kt | 5 +++++ .../src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt | 9 +++++---- .../src/main/kotlin/io/libp2p/pubsub/PubsubApiImpl.kt | 10 +++++----- .../src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt | 2 +- .../src/test/java/io/libp2p/pubsub/GossipApiTest.java | 4 ++-- .../kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt | 4 ++-- .../testFixtures/kotlin/io/libp2p/pubsub/TestRouter.kt | 4 ++-- 7 files changed, 22 insertions(+), 16 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/core/pubsub/PubsubApi.kt b/libp2p/src/main/kotlin/io/libp2p/core/pubsub/PubsubApi.kt index 21b8dbffe..81e72e039 100644 --- a/libp2p/src/main/kotlin/io/libp2p/core/pubsub/PubsubApi.kt +++ b/libp2p/src/main/kotlin/io/libp2p/core/pubsub/PubsubApi.kt @@ -167,6 +167,11 @@ interface PubsubApi : PubsubSubscriberApi { */ interface MessageApi { + /** + * The identity of the peer who has propagated this message + */ + val propagator: PeerId + val originalMessage: PubsubMessage /** diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt index eb8033cea..d5c3a05ee 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt @@ -23,7 +23,7 @@ import java.util.function.Consumer // 1 MB default max message size const val DEFAULT_MAX_PUBSUB_MESSAGE_SIZE = 1 shl 20 -typealias PubsubMessageHandler = (PubsubMessage) -> CompletableFuture +typealias PubsubMessageHandler = (PeerId, PubsubMessage) -> CompletableFuture open class DefaultPubsubMessage(override val protobufMessage: Rpc.Message) : AbstractPubsubMessage() { override val messageId: MessageId = protobufMessage.from.toWBytes() + protobufMessage.seqno.toWBytes() @@ -44,7 +44,8 @@ abstract class AbstractRouter( protected val messageValidator: PubsubRouterMessageValidator ) : P2PServiceSemiDuplex(executor), PubsubRouter, PubsubRouterDebug { - protected var msgHandler: PubsubMessageHandler = { throw IllegalStateException("Message handler is not initialized for PubsubRouter") } + protected var msgHandler: PubsubMessageHandler = + { _, _ -> throw IllegalStateException("Message handler is not initialized for PubsubRouter") } protected open val peersTopics = mutableMultiBiMap() protected open val subscribedTopics = linkedSetOf() @@ -216,7 +217,7 @@ abstract class AbstractRouter( } } - val validFuts = msgValid.map { it to msgHandler(it) } + val validFuts = msgValid.map { it to msgHandler(peer.peerId, it) } val doneUndone = validFuts.groupBy { it.second.isDone } val done = doneUndone.getOrDefault(true, emptyList()) val undone = doneUndone.getOrDefault(false, emptyList()) @@ -331,7 +332,7 @@ abstract class AbstractRouter( return peer.writeAndFlush(msg) } - override fun initHandler(handler: (PubsubMessage) -> CompletableFuture) { + override fun initHandler(handler: (PeerId, PubsubMessage) -> CompletableFuture) { msgHandler = handler } } diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubApiImpl.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubApiImpl.kt index fddc263f2..cd8613c3e 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubApiImpl.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubApiImpl.kt @@ -62,7 +62,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi { } init { - router.initHandler { onNewMessage(it) } + router.initHandler { peerId, msg -> onNewMessage(peerId, msg) } } val subscriptions: MutableMap> = mutableMapOf() @@ -74,11 +74,11 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi { } } - private fun onNewMessage(msg: PubsubMessage): CompletableFuture { + private fun onNewMessage(peerId: PeerId, msg: PubsubMessage): CompletableFuture { val validationFuts = synchronized(this) { msg.topics.mapNotNull { subscriptions[Topic(it)] }.flatten().distinct() }.map { - it.receiver.apply(rpc2Msg(msg)) + it.receiver.apply(rpc2Msg(peerId, msg)) } return validationFuts.thenApplyAll { if (it.isEmpty()) { @@ -89,7 +89,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi { } } - private fun rpc2Msg(msg: PubsubMessage) = MessageImpl(msg) + private fun rpc2Msg(peerId: PeerId, msg: PubsubMessage) = MessageImpl(peerId, msg) override fun subscribe(receiver: Validator, vararg topics: Topic): PubsubSubscription { val subscription = SubscriptionImpl(topics, receiver) @@ -138,7 +138,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi { PublisherImpl(privKey, seqIdGenerator) } -class MessageImpl(override val originalMessage: PubsubMessage) : MessageApi { +class MessageImpl(override val propagator: PeerId, override val originalMessage: PubsubMessage) : MessageApi { private val msg = originalMessage.protobufMessage override val data = msg.data.toByteArray().toByteBuf() override val from = if (msg.hasFrom()) msg.from.toByteArray() else null diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt index ae3d94b16..381974dd8 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt @@ -77,7 +77,7 @@ interface PubsubMessageRouter { * All the messages received by the router are forwarded to the [handler] independently * of any client subscriptions. Is it up to the client API to sort out subscriptions */ - fun initHandler(handler: (PubsubMessage) -> CompletableFuture) + fun initHandler(handler: (PeerId, PubsubMessage) -> CompletableFuture) /** * Notifies the router that a client wants to receive messages on the following topics diff --git a/libp2p/src/test/java/io/libp2p/pubsub/GossipApiTest.java b/libp2p/src/test/java/io/libp2p/pubsub/GossipApiTest.java index d95485103..0ae0111b2 100644 --- a/libp2p/src/test/java/io/libp2p/pubsub/GossipApiTest.java +++ b/libp2p/src/test/java/io/libp2p/pubsub/GossipApiTest.java @@ -53,8 +53,8 @@ public void testFastMessageId() throws Exception { BlockingQueue messages = new LinkedBlockingQueue<>(); router.initHandler( - m -> { - messages.add(m); + (__, msg) -> { + messages.add(msg); return CompletableFuture.completedFuture(ValidationResult.Valid); }); diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index 3750803e4..6ebda9752 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -422,7 +422,7 @@ class GossipV1_1Tests { fun testAppValidatorScore() { val test = TwoRoutersTest() val validator = AtomicReference>(RESULT_VALID) - test.gossipRouter.initHandler { validator.get() } + test.gossipRouter.initHandler { _, _ -> validator.get() } test.mockRouter.subscribe("topic1") test.gossipRouter.subscribe("topic1") @@ -860,7 +860,7 @@ class GossipV1_1Tests { fun testValidatorIgnoreResult() { val test = ManyRoutersTest(mockRouterCount = 2) val validator = AtomicReference>(RESULT_VALID) - test.gossipRouter.initHandler { validator.get() } + test.gossipRouter.initHandler { _, _ -> validator.get() } test.connectAll() test.gossipRouter.subscribe("topic1") test.routers.forEach { it.router.subscribe("topic1") } diff --git a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/TestRouter.kt b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/TestRouter.kt index 60e325aed..ec8fbb4d5 100644 --- a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/TestRouter.kt +++ b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/TestRouter.kt @@ -40,8 +40,8 @@ class TestRouter( val inboundMessages = LinkedBlockingQueue() var handlerValidationResult = RESULT_VALID - val routerHandler: (PubsubMessage) -> CompletableFuture = { - inboundMessages += it + val routerHandler: (PeerId, PubsubMessage) -> CompletableFuture = { _, msg -> + inboundMessages += msg handlerValidationResult }