Skip to content

Commit

Permalink
Pass PeerId along with message when subscribed to a pubsub topic
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Mar 5, 2024
1 parent 562ce10 commit a82c49a
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 16 deletions.
5 changes: 5 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/core/pubsub/PubsubApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ interface PubsubApi : PubsubSubscriberApi {
*/
interface MessageApi {

/**
* The identity of the peer who has propagated this message
*/
val propagator: PeerId

val originalMessage: PubsubMessage

/**
Expand Down
9 changes: 5 additions & 4 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.function.Consumer
// 1 MB default max message size
const val DEFAULT_MAX_PUBSUB_MESSAGE_SIZE = 1 shl 20

typealias PubsubMessageHandler = (PubsubMessage) -> CompletableFuture<ValidationResult>
typealias PubsubMessageHandler = (PeerId, PubsubMessage) -> CompletableFuture<ValidationResult>

open class DefaultPubsubMessage(override val protobufMessage: Rpc.Message) : AbstractPubsubMessage() {
override val messageId: MessageId = protobufMessage.from.toWBytes() + protobufMessage.seqno.toWBytes()
Expand All @@ -44,7 +44,8 @@ abstract class AbstractRouter(
protected val messageValidator: PubsubRouterMessageValidator
) : P2PServiceSemiDuplex(executor), PubsubRouter, PubsubRouterDebug {

protected var msgHandler: PubsubMessageHandler = { throw IllegalStateException("Message handler is not initialized for PubsubRouter") }
protected var msgHandler: PubsubMessageHandler =
{ _, _ -> throw IllegalStateException("Message handler is not initialized for PubsubRouter") }

protected open val peersTopics = mutableMultiBiMap<PeerHandler, Topic>()
protected open val subscribedTopics = linkedSetOf<Topic>()
Expand Down Expand Up @@ -216,7 +217,7 @@ abstract class AbstractRouter(
}
}

val validFuts = msgValid.map { it to msgHandler(it) }
val validFuts = msgValid.map { it to msgHandler(peer.peerId, it) }
val doneUndone = validFuts.groupBy { it.second.isDone }
val done = doneUndone.getOrDefault(true, emptyList())
val undone = doneUndone.getOrDefault(false, emptyList())
Expand Down Expand Up @@ -331,7 +332,7 @@ abstract class AbstractRouter(
return peer.writeAndFlush(msg)
}

override fun initHandler(handler: (PubsubMessage) -> CompletableFuture<ValidationResult>) {
override fun initHandler(handler: (PeerId, PubsubMessage) -> CompletableFuture<ValidationResult>) {
msgHandler = handler
}
}
10 changes: 5 additions & 5 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubApiImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
}

init {
router.initHandler { onNewMessage(it) }
router.initHandler { peerId, msg -> onNewMessage(peerId, msg) }
}

val subscriptions: MutableMap<Topic, MutableList<SubscriptionImpl>> = mutableMapOf()
Expand All @@ -74,11 +74,11 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
}
}

private fun onNewMessage(msg: PubsubMessage): CompletableFuture<ValidationResult> {
private fun onNewMessage(peerId: PeerId, msg: PubsubMessage): CompletableFuture<ValidationResult> {
val validationFuts = synchronized(this) {
msg.topics.mapNotNull { subscriptions[Topic(it)] }.flatten().distinct()
}.map {
it.receiver.apply(rpc2Msg(msg))
it.receiver.apply(rpc2Msg(peerId, msg))
}
return validationFuts.thenApplyAll {
if (it.isEmpty()) {
Expand All @@ -89,7 +89,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
}
}

private fun rpc2Msg(msg: PubsubMessage) = MessageImpl(msg)
private fun rpc2Msg(peerId: PeerId, msg: PubsubMessage) = MessageImpl(peerId, msg)

override fun subscribe(receiver: Validator, vararg topics: Topic): PubsubSubscription {
val subscription = SubscriptionImpl(topics, receiver)
Expand Down Expand Up @@ -138,7 +138,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
PublisherImpl(privKey, seqIdGenerator)
}

class MessageImpl(override val originalMessage: PubsubMessage) : MessageApi {
class MessageImpl(override val propagator: PeerId, override val originalMessage: PubsubMessage) : MessageApi {
private val msg = originalMessage.protobufMessage
override val data = msg.data.toByteArray().toByteBuf()
override val from = if (msg.hasFrom()) msg.from.toByteArray() else null
Expand Down
2 changes: 1 addition & 1 deletion libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ interface PubsubMessageRouter {
* All the messages received by the router are forwarded to the [handler] independently
* of any client subscriptions. Is it up to the client API to sort out subscriptions
*/
fun initHandler(handler: (PubsubMessage) -> CompletableFuture<ValidationResult>)
fun initHandler(handler: (PeerId, PubsubMessage) -> CompletableFuture<ValidationResult>)

/**
* Notifies the router that a client wants to receive messages on the following topics
Expand Down
4 changes: 2 additions & 2 deletions libp2p/src/test/java/io/libp2p/pubsub/GossipApiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public void testFastMessageId() throws Exception {

BlockingQueue<PubsubMessage> messages = new LinkedBlockingQueue<>();
router.initHandler(
m -> {
messages.add(m);
(__, msg) -> {
messages.add(msg);
return CompletableFuture.completedFuture(ValidationResult.Valid);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ class GossipV1_1Tests {
fun testAppValidatorScore() {
val test = TwoRoutersTest()
val validator = AtomicReference<CompletableFuture<ValidationResult>>(RESULT_VALID)
test.gossipRouter.initHandler { validator.get() }
test.gossipRouter.initHandler { _, _ -> validator.get() }

test.mockRouter.subscribe("topic1")
test.gossipRouter.subscribe("topic1")
Expand Down Expand Up @@ -860,7 +860,7 @@ class GossipV1_1Tests {
fun testValidatorIgnoreResult() {
val test = ManyRoutersTest(mockRouterCount = 2)
val validator = AtomicReference<CompletableFuture<ValidationResult>>(RESULT_VALID)
test.gossipRouter.initHandler { validator.get() }
test.gossipRouter.initHandler { _, _ -> validator.get() }
test.connectAll()
test.gossipRouter.subscribe("topic1")
test.routers.forEach { it.router.subscribe("topic1") }
Expand Down
4 changes: 2 additions & 2 deletions libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/TestRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class TestRouter(

val inboundMessages = LinkedBlockingQueue<PubsubMessage>()
var handlerValidationResult = RESULT_VALID
val routerHandler: (PubsubMessage) -> CompletableFuture<ValidationResult> = {
inboundMessages += it
val routerHandler: (PeerId, PubsubMessage) -> CompletableFuture<ValidationResult> = { _, msg ->
inboundMessages += msg
handlerValidationResult
}

Expand Down

0 comments on commit a82c49a

Please sign in to comment.